sc_network_sync/state_request_handler.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.

//! Helper for handling (i.e. answering) state requests from a remote peer via the
//! `crate::request_responses::RequestResponsesBehaviour`.
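//!
//! A minimal wiring sketch; `protocol_id`, `fork_id`, `client`, `num_peers_hint`, `net_config`,
//! `spawn_handle` and the network backend type `N` are assumed to be provided by the node setup:
//!
//! ```ignore
//! let (handler, protocol_config) =
//! 	StateRequestHandler::new::<N>(&protocol_id, fork_id, client.clone(), num_peers_hint);
//! // Register the protocol and answer incoming requests from a background task.
//! net_config.add_request_response_protocol(protocol_config);
//! spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
//! ```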

use crate::{
	schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
	LOG_TARGET,
};

use codec::{Decode, Encode};
use futures::{channel::oneshot, stream::StreamExt};
use log::{debug, trace};
use prost::Message;
use sc_network_types::PeerId;
use schnellru::{ByLength, LruMap};

use sc_client_api::{BlockBackend, ProofProvider};
use sc_network::{
	config::ProtocolId,
	request_responses::{IncomingRequest, OutgoingResponse},
	NetworkBackend, MAX_RESPONSE_SIZE,
};
use sp_runtime::traits::Block as BlockT;

use std::{
	hash::{Hash, Hasher},
	sync::Arc,
	time::Duration,
};

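// Soft limit applied while collecting proof nodes or storage entries for a single response; the
// hard cap enforced by the networking layer is `MAX_RESPONSE_SIZE` (see
// `generate_protocol_config` below).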
const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024; // Actual response may be bigger.
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;

mod rep {
	use sc_network::ReputationChange as Rep;

	/// Reputation change when a peer sent us the same request multiple times.
	pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
}

/// Generates a `RequestResponseProtocolConfig` for the state request protocol, handing incoming
/// requests over to the given `inbound_queue`.
pub fn generate_protocol_config<
	Hash: AsRef<[u8]>,
	B: BlockT,
	N: NetworkBackend<B, <B as BlockT>::Hash>,
>(
	protocol_id: &ProtocolId,
	genesis_hash: Hash,
	fork_id: Option<&str>,
	inbound_queue: async_channel::Sender<IncomingRequest>,
) -> N::RequestResponseProtocolConfig {
	N::request_response_config(
		generate_protocol_name(genesis_hash, fork_id).into(),
		std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
		1024 * 1024,
		MAX_RESPONSE_SIZE,
		Duration::from_secs(40),
		Some(inbound_queue),
	)
}

/// Generate the state protocol name from the genesis hash and fork id.
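/// For an illustrative two-byte genesis hash `0x1234`, this yields `"/1234/state/2"` without a
/// fork id and `"/1234/foo/state/2"` with fork id `"foo"`.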
fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
	let genesis_hash = genesis_hash.as_ref();
	if let Some(fork_id) = fork_id {
		format!("/{}/{}/state/2", array_bytes::bytes2hex("", genesis_hash), fork_id)
	} else {
		format!("/{}/state/2", array_bytes::bytes2hex("", genesis_hash))
	}
}

/// Generate the legacy state protocol name from chain specific protocol identifier.
fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
	format!("/{}/state/2", protocol_id.as_ref())
}

/// The key of [`StateRequestHandler::seen_requests`].
#[derive(Eq, PartialEq, Clone)]
struct SeenRequestsKey<B: BlockT> {
	peer: PeerId,
	block: B::Hash,
	start: Vec<Vec<u8>>,
}

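// `Hash` is implemented manually because `#[derive(Hash)]` would put an unnecessary `B: Hash`
// bound on the block type parameter; only the fields themselves need to be hashable.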
#[allow(clippy::derived_hash_with_manual_eq)]
impl<B: BlockT> Hash for SeenRequestsKey<B> {
	fn hash<H: Hasher>(&self, state: &mut H) {
		self.peer.hash(state);
		self.block.hash(state);
		self.start.hash(state);
	}
}

/// The value of [`StateRequestHandler::seen_requests`].
enum SeenRequestsValue {
	/// First time we have seen the request.
	First,
	/// We have fulfilled the request `n` times.
	Fulfilled(usize),
}

/// Handler for incoming state requests from a remote peer.
pub struct StateRequestHandler<B: BlockT, Client> {
	client: Arc<Client>,
	request_receiver: async_channel::Receiver<IncomingRequest>,
	/// Maps from request to number of times we have seen this request.
	///
	/// This is used to check if a peer is spamming us with the same request.
	seen_requests: LruMap<SeenRequestsKey<B>, SeenRequestsValue>,
}

impl<B, Client> StateRequestHandler<B, Client>
where
	B: BlockT,
	Client: BlockBackend<B> + ProofProvider<B> + Send + Sync + 'static,
{
	/// Create a new [`StateRequestHandler`].
	pub fn new<N: NetworkBackend<B, <B as BlockT>::Hash>>(
		protocol_id: &ProtocolId,
		fork_id: Option<&str>,
		client: Arc<Client>,
		num_peer_hint: usize,
	) -> (Self, N::RequestResponseProtocolConfig) {
		// Reserve enough request slots for one request per peer when we are at the maximum
		// number of peers.
		let capacity = std::cmp::max(num_peer_hint, 1);
		let (tx, request_receiver) = async_channel::bounded(capacity);

		let protocol_config = generate_protocol_config::<_, B, N>(
			protocol_id,
			client
				.block_hash(0u32.into())
				.ok()
				.flatten()
				.expect("Genesis block exists; qed"),
			fork_id,
			tx,
		);

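		// Keep roughly two cached requests per peer: enough to detect identical repeats while
		// bounding memory use via the LRU.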
		let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
		let seen_requests = LruMap::new(capacity);

		(Self { client, request_receiver, seen_requests }, protocol_config)
	}

	/// Run [`StateRequestHandler`].
	pub async fn run(mut self) {
		while let Some(request) = self.request_receiver.next().await {
			let IncomingRequest { peer, payload, pending_response } = request;

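			// Requests are answered one at a time; the bounded `request_receiver` provides
			// back-pressure towards the networking layer if we fall behind.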
			match self.handle_request(payload, pending_response, &peer) {
				Ok(()) => debug!(target: LOG_TARGET, "Handled state request from {}.", peer),
				Err(e) => debug!(
					target: LOG_TARGET,
					"Failed to handle state request from {}: {}", peer, e,
				),
			}
		}
	}

	fn handle_request(
		&mut self,
		payload: Vec<u8>,
		pending_response: oneshot::Sender<OutgoingResponse>,
		peer: &PeerId,
	) -> Result<(), HandleRequestError> {
		let request = StateRequest::decode(&payload[..])?;
		let block: B::Hash = Decode::decode(&mut request.block.as_ref())?;

		let key = SeenRequestsKey { peer: *peer, block, start: request.start.clone() };

		let mut reputation_changes = Vec::new();

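		// Spam protection: if the peer has already been served this exact request more than
		// `MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER` times, report it instead of answering again.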
		match self.seen_requests.get(&key) {
			Some(SeenRequestsValue::First) => {},
			Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
				*requests = requests.saturating_add(1);

				if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
					reputation_changes.push(rep::SAME_REQUEST);
				}
			},
			None => {
				self.seen_requests.insert(key.clone(), SeenRequestsValue::First);
			},
		}

		trace!(
			target: LOG_TARGET,
			"Handling state request from {}: Block {:?}, Starting at {:x?}, no_proof={}",
			peer,
			request.block,
			&request.start,
			request.no_proof,
		);

		let result = if reputation_changes.is_empty() {
			let mut response = StateResponse::default();

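			// `no_proof` requests get the raw key/value entries back; the default path returns a
			// compact trie proof which the requesting side can verify against the state root.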
			if !request.no_proof {
				let (proof, _count) = self.client.read_proof_collection(
					block,
					request.start.as_slice(),
					MAX_RESPONSE_BYTES,
				)?;
				response.proof = proof.encode();
			} else {
				let entries = self.client.storage_collection(
					block,
					request.start.as_slice(),
					MAX_RESPONSE_BYTES,
				)?;
				response.entries = entries
					.into_iter()
					.map(|(state, complete)| KeyValueStateEntry {
						state_root: state.state_root,
						entries: state
							.key_values
							.into_iter()
							.map(|(key, value)| StateEntry { key, value })
							.collect(),
						complete,
					})
					.collect();
			}

			trace!(
				target: LOG_TARGET,
				"StateResponse contains {} keys, {} bytes of proof, from {:?} to {:?}",
				response.entries.len(),
				response.proof.len(),
				response.entries.get(0).and_then(|top| top
					.entries
					.first()
					.map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key))),
				response.entries.get(0).and_then(|top| top
					.entries
					.last()
					.map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key))),
			);
			if let Some(value) = self.seen_requests.get(&key) {
				// If this is the first time we have processed this request, we need to change
				// it to `Fulfilled`.
				if let SeenRequestsValue::First = value {
					*value = SeenRequestsValue::Fulfilled(1);
				}
			}

			let mut data = Vec::with_capacity(response.encoded_len());
			response.encode(&mut data)?;
			Ok(data)
		} else {
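			// The peer spammed us with the same request; refuse to answer and only report the
			// accumulated reputation changes.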
			Err(())
		};

		pending_response
			.send(OutgoingResponse { result, reputation_changes, sent_feedback: None })
			.map_err(|_| HandleRequestError::SendResponse)
	}
}

#[derive(Debug, thiserror::Error)]
enum HandleRequestError {
	#[error("Failed to decode request: {0}.")]
	DecodeProto(#[from] prost::DecodeError),

	#[error("Failed to encode response: {0}.")]
	EncodeProto(#[from] prost::EncodeError),

	#[error("Failed to decode block hash: {0}.")]
	InvalidHash(#[from] codec::Error),

	#[error(transparent)]
	Client(#[from] sp_blockchain::Error),

	#[error("Failed to send response.")]
	SendResponse,
}
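
// Minimal sanity checks for the protocol name helpers; the genesis hash and protocol id used
// below are illustrative values, not taken from a real chain.
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn protocol_names_are_well_formed() {
		let genesis_hash = [0x12u8, 0x34];

		assert_eq!(generate_protocol_name(genesis_hash, None), "/1234/state/2");
		assert_eq!(generate_protocol_name(genesis_hash, Some("foo")), "/1234/foo/state/2");
		assert_eq!(generate_legacy_protocol_name(&ProtocolId::from("dot")), "/dot/state/2");
	}
}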