1use crate::{
21 schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
22 LOG_TARGET,
23};
24
25use codec::{Decode, Encode};
26use futures::{channel::oneshot, stream::StreamExt};
27use log::{debug, trace};
28use prost::Message;
29use sc_network_types::PeerId;
30use schnellru::{ByLength, LruMap};
31
32use sc_client_api::{BlockBackend, ProofProvider};
33use sc_network::{
34 config::ProtocolId,
35 request_responses::{IncomingRequest, OutgoingResponse},
36 NetworkBackend, MAX_RESPONSE_SIZE,
37};
38use sp_runtime::traits::Block as BlockT;
39
40use std::{
41 hash::{Hash, Hasher},
42 sync::Arc,
43 time::Duration,
44};
45
/// Size limit, in bytes, handed to the client when it collects the state entries or the proof
/// for a single response (2 MiB).
const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024;

/// Number of times an already fulfilled request may be seen from the same peer before that
/// peer receives a reputation change instead of a response.
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
48
49mod rep {
50 use sc_network::ReputationChange as Rep;
51
52 pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
54}
55
56pub fn generate_protocol_config<
59 Hash: AsRef<[u8]>,
60 B: BlockT,
61 N: NetworkBackend<B, <B as BlockT>::Hash>,
62>(
63 protocol_id: &ProtocolId,
64 genesis_hash: Hash,
65 fork_id: Option<&str>,
66 inbound_queue: async_channel::Sender<IncomingRequest>,
67) -> N::RequestResponseProtocolConfig {
68 N::request_response_config(
69 generate_protocol_name(genesis_hash, fork_id).into(),
70 std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
71 1024 * 1024,
72 MAX_RESPONSE_SIZE,
73 Duration::from_secs(40),
74 Some(inbound_queue),
75 )
76}
77
78fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
80 let genesis_hash = genesis_hash.as_ref();
81 if let Some(fork_id) = fork_id {
82 format!("/{}/{}/state/2", array_bytes::bytes2hex("", genesis_hash), fork_id)
83 } else {
84 format!("/{}/state/2", array_bytes::bytes2hex("", genesis_hash))
85 }
86}
87
88fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
90 format!("/{}/state/2", protocol_id.as_ref())
91}
92
/// The key of the `seen_requests` cache kept by `StateRequestHandler`.
///
/// Two requests are considered the same when they come from the same peer, target the same
/// block and use the same start keys.
#[derive(Eq, PartialEq, Clone)]
struct SeenRequestsKey<B: BlockT> {
    /// Peer that sent the request.
    peer: PeerId,
    /// Block hash decoded from the request.
    block: B::Hash,
    /// Start keys copied from the request's `start` field.
    start: Vec<Vec<u8>>,
}
100
101#[allow(clippy::derived_hash_with_manual_eq)]
102impl<B: BlockT> Hash for SeenRequestsKey<B> {
103 fn hash<H: Hasher>(&self, state: &mut H) {
104 self.peer.hash(state);
105 self.block.hash(state);
106 self.start.hash(state);
107 }
108}
109
/// The value of the `seen_requests` cache kept by `StateRequestHandler`.
enum SeenRequestsValue {
    /// The request has been seen, but no response has been built for it yet.
    First,
    /// A response has been built for the request. The counter is set to `1` at that point and
    /// incremented every time the same request is seen again.
    Fulfilled(usize),
}
117
/// Handler for incoming state requests from remote peers.
pub struct StateRequestHandler<B: BlockT, Client> {
    /// Client used to read state entries and proofs and to look up the genesis hash.
    client: Arc<Client>,
    /// Receiving end of the queue through which incoming requests arrive.
    request_receiver: async_channel::Receiver<IncomingRequest>,
    /// Cache of requests seen so far, keyed by peer, block and start keys. It is used to
    /// detect peers that repeat the same request.
    seen_requests: LruMap<SeenRequestsKey<B>, SeenRequestsValue>,
}
127
128impl<B, Client> StateRequestHandler<B, Client>
129where
130 B: BlockT,
131 Client: BlockBackend<B> + ProofProvider<B> + Send + Sync + 'static,
132{
133 pub fn new<N: NetworkBackend<B, <B as BlockT>::Hash>>(
135 protocol_id: &ProtocolId,
136 fork_id: Option<&str>,
137 client: Arc<Client>,
138 num_peer_hint: usize,
139 ) -> (Self, N::RequestResponseProtocolConfig) {
140 let capacity = std::cmp::max(num_peer_hint, 1);
143 let (tx, request_receiver) = async_channel::bounded(capacity);
144
145 let protocol_config = generate_protocol_config::<_, B, N>(
146 protocol_id,
147 client
148 .block_hash(0u32.into())
149 .ok()
150 .flatten()
151 .expect("Genesis block exists; qed"),
152 fork_id,
153 tx,
154 );
155
156 let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
157 let seen_requests = LruMap::new(capacity);
158
159 (Self { client, request_receiver, seen_requests }, protocol_config)
160 }
161
162 pub async fn run(mut self) {
164 while let Some(request) = self.request_receiver.next().await {
165 let IncomingRequest { peer, payload, pending_response } = request;
166
167 match self.handle_request(payload, pending_response, &peer) {
168 Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
169 Err(e) => debug!(
170 target: LOG_TARGET,
171 "Failed to handle state request from {}: {}", peer, e,
172 ),
173 }
174 }
175 }
176
177 fn handle_request(
178 &mut self,
179 payload: Vec<u8>,
180 pending_response: oneshot::Sender<OutgoingResponse>,
181 peer: &PeerId,
182 ) -> Result<(), HandleRequestError> {
183 let request = StateRequest::decode(&payload[..])?;
184 let block: B::Hash = Decode::decode(&mut request.block.as_ref())?;
185
186 let key = SeenRequestsKey { peer: *peer, block, start: request.start.clone() };
187
188 let mut reputation_changes = Vec::new();
189
190 match self.seen_requests.get(&key) {
191 Some(SeenRequestsValue::First) => {},
192 Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
193 *requests = requests.saturating_add(1);
194
195 if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
196 reputation_changes.push(rep::SAME_REQUEST);
197 }
198 },
199 None => {
200 self.seen_requests.insert(key.clone(), SeenRequestsValue::First);
201 },
202 }
203
204 trace!(
205 target: LOG_TARGET,
206 "Handling state request from {}: Block {:?}, Starting at {:x?}, no_proof={}",
207 peer,
208 request.block,
209 &request.start,
210 request.no_proof,
211 );
212
213 let result = if reputation_changes.is_empty() {
214 let mut response = StateResponse::default();
215
216 if !request.no_proof {
217 let (proof, _count) = self.client.read_proof_collection(
218 block,
219 request.start.as_slice(),
220 MAX_RESPONSE_BYTES,
221 )?;
222 response.proof = proof.encode();
223 } else {
224 let entries = self.client.storage_collection(
225 block,
226 request.start.as_slice(),
227 MAX_RESPONSE_BYTES,
228 )?;
229 response.entries = entries
230 .into_iter()
231 .map(|(state, complete)| KeyValueStateEntry {
232 state_root: state.state_root,
233 entries: state
234 .key_values
235 .into_iter()
236 .map(|(key, value)| StateEntry { key, value })
237 .collect(),
238 complete,
239 })
240 .collect();
241 }
242
243 trace!(
244 target: LOG_TARGET,
245 "StateResponse contains {} keys, {}, proof nodes, from {:?} to {:?}",
246 response.entries.len(),
247 response.proof.len(),
248 response.entries.get(0).and_then(|top| top
249 .entries
250 .first()
251 .map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key))),
252 response.entries.get(0).and_then(|top| top
253 .entries
254 .last()
255 .map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key))),
256 );
257 if let Some(value) = self.seen_requests.get(&key) {
258 if let SeenRequestsValue::First = value {
261 *value = SeenRequestsValue::Fulfilled(1);
262 }
263 }
264
265 let mut data = Vec::with_capacity(response.encoded_len());
266 response.encode(&mut data)?;
267 Ok(data)
268 } else {
269 Err(())
270 };
271
272 pending_response
273 .send(OutgoingResponse { result, reputation_changes, sent_feedback: None })
274 .map_err(|_| HandleRequestError::SendResponse)
275 }
276}
277
/// Errors that can occur while handling a single state request.
#[derive(Debug, thiserror::Error)]
enum HandleRequestError {
    /// The request payload could not be decoded as a protobuf `StateRequest`.
    #[error("Failed to decode request: {0}.")]
    DecodeProto(#[from] prost::DecodeError),

    /// The `StateResponse` could not be encoded as protobuf.
    #[error("Failed to encode response: {0}.")]
    EncodeProto(#[from] prost::EncodeError),

    /// The block hash contained in the request could not be decoded.
    #[error("Failed to decode block hash: {0}.")]
    InvalidHash(#[from] codec::Error),

    /// The client failed while collecting the state entries or the proof.
    #[error(transparent)]
    Client(#[from] sp_blockchain::Error),

    /// The response could not be sent through the pending response channel.
    #[error("Failed to send response.")]
    SendResponse,
}