sc_network_sync/
block_request_handler.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3
4// Substrate is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Substrate is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Helper for handling (i.e. answering) block requests from a remote peer via the
18//! `crate::request_responses::RequestResponsesBehaviour`.
19
20use crate::{
21	block_relay_protocol::{BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer},
22	schema::v1::{
23		block_request::FromBlock as FromBlockSchema, BlockRequest as BlockRequestSchema,
24		BlockResponse as BlockResponseSchema, BlockResponse, Direction,
25	},
26	service::network::NetworkServiceHandle,
27	LOG_TARGET,
28};
29
30use codec::{Decode, DecodeAll, Encode};
31use futures::{channel::oneshot, stream::StreamExt};
32use log::debug;
33use prost::Message;
34use schnellru::{ByLength, LruMap};
35
36use sc_client_api::BlockBackend;
37use sc_network::{
38	config::ProtocolId,
39	request_responses::{IfDisconnected, IncomingRequest, OutgoingResponse, RequestFailure},
40	service::traits::RequestResponseConfig,
41	types::ProtocolName,
42	NetworkBackend, MAX_RESPONSE_SIZE,
43};
44use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock};
45use sc_network_types::PeerId;
46use sp_blockchain::HeaderBackend;
47use sp_runtime::{
48	generic::BlockId,
49	traits::{Block as BlockT, Header, One, Zero},
50};
51
52use std::{
53	cmp::min,
54	hash::{Hash, Hasher},
55	sync::Arc,
56	time::Duration,
57};
58
59/// Maximum blocks per response.
60pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128;
61
62const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
63const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
64
65mod rep {
66	use sc_network::ReputationChange as Rep;
67
68	/// Reputation change when a peer sent us the same request multiple times.
69	pub const SAME_REQUEST: Rep = Rep::new_fatal("Same block request multiple times");
70
71	/// Reputation change when a peer sent us the same "small" request multiple times.
72	pub const SAME_SMALL_REQUEST: Rep =
73		Rep::new(-(1 << 10), "same small block request multiple times");
74}
75
76/// Generates a `RequestResponseProtocolConfig` for the block request protocol,
77/// refusing incoming requests.
78pub fn generate_protocol_config<
79	Hash: AsRef<[u8]>,
80	B: BlockT,
81	N: NetworkBackend<B, <B as BlockT>::Hash>,
82>(
83	protocol_id: &ProtocolId,
84	genesis_hash: Hash,
85	fork_id: Option<&str>,
86	inbound_queue: async_channel::Sender<IncomingRequest>,
87) -> N::RequestResponseProtocolConfig {
88	N::request_response_config(
89		generate_protocol_name(genesis_hash, fork_id).into(),
90		std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
91		1024 * 1024,
92		MAX_RESPONSE_SIZE,
93		Duration::from_secs(20),
94		Some(inbound_queue),
95	)
96}
97
98/// Generate the block protocol name from the genesis hash and fork id.
99fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
100	let genesis_hash = genesis_hash.as_ref();
101	if let Some(fork_id) = fork_id {
102		format!("/{}/{}/sync/2", array_bytes::bytes2hex("", genesis_hash), fork_id)
103	} else {
104		format!("/{}/sync/2", array_bytes::bytes2hex("", genesis_hash))
105	}
106}
107
108/// Generate the legacy block protocol name from chain specific protocol identifier.
109fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
110	format!("/{}/sync/2", protocol_id.as_ref())
111}
112
113/// The key of [`BlockRequestHandler::seen_requests`].
114#[derive(Eq, PartialEq, Clone)]
115struct SeenRequestsKey<B: BlockT> {
116	peer: PeerId,
117	from: BlockId<B>,
118	max_blocks: usize,
119	direction: Direction,
120	attributes: BlockAttributes,
121	support_multiple_justifications: bool,
122}
123
124#[allow(clippy::derived_hash_with_manual_eq)]
125impl<B: BlockT> Hash for SeenRequestsKey<B> {
126	fn hash<H: Hasher>(&self, state: &mut H) {
127		self.peer.hash(state);
128		self.max_blocks.hash(state);
129		self.direction.hash(state);
130		self.attributes.hash(state);
131		self.support_multiple_justifications.hash(state);
132		match self.from {
133			BlockId::Hash(h) => h.hash(state),
134			BlockId::Number(n) => n.hash(state),
135		}
136	}
137}
138
139/// The value of [`BlockRequestHandler::seen_requests`].
140enum SeenRequestsValue {
141	/// First time we have seen the request.
142	First,
143	/// We have fulfilled the request `n` times.
144	Fulfilled(usize),
145}
146
147/// The full block server implementation of [`BlockServer`]. It handles
148/// the incoming block requests from a remote peer.
149pub struct BlockRequestHandler<B: BlockT, Client> {
150	client: Arc<Client>,
151	request_receiver: async_channel::Receiver<IncomingRequest>,
152	/// Maps from request to number of times we have seen this request.
153	///
154	/// This is used to check if a peer is spamming us with the same request.
155	seen_requests: LruMap<SeenRequestsKey<B>, SeenRequestsValue>,
156}
157
158impl<B, Client> BlockRequestHandler<B, Client>
159where
160	B: BlockT,
161	Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static,
162{
163	/// Create a new [`BlockRequestHandler`].
164	pub fn new<N: NetworkBackend<B, <B as BlockT>::Hash>>(
165		network: NetworkServiceHandle,
166		protocol_id: &ProtocolId,
167		fork_id: Option<&str>,
168		client: Arc<Client>,
169		num_peer_hint: usize,
170	) -> BlockRelayParams<B, N> {
171		// Reserve enough request slots for one request per peer when we are at the maximum
172		// number of peers.
173		let capacity = std::cmp::max(num_peer_hint, 1);
174		let (tx, request_receiver) = async_channel::bounded(capacity);
175
176		let protocol_config = generate_protocol_config::<_, B, N>(
177			protocol_id,
178			client
179				.block_hash(0u32.into())
180				.ok()
181				.flatten()
182				.expect("Genesis block exists; qed"),
183			fork_id,
184			tx,
185		);
186
187		let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
188		let seen_requests = LruMap::new(capacity);
189
190		BlockRelayParams {
191			server: Box::new(Self { client, request_receiver, seen_requests }),
192			downloader: Arc::new(FullBlockDownloader::new(
193				protocol_config.protocol_name().clone(),
194				network,
195			)),
196			request_response_config: protocol_config,
197		}
198	}
199
200	/// Run [`BlockRequestHandler`].
201	async fn process_requests(&mut self) {
202		while let Some(request) = self.request_receiver.next().await {
203			let IncomingRequest { peer, payload, pending_response } = request;
204
205			match self.handle_request(payload, pending_response, &peer) {
206				Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
207				Err(e) => debug!(
208					target: LOG_TARGET,
209					"Failed to handle block request from {}: {}", peer, e,
210				),
211			}
212		}
213	}
214
215	fn handle_request(
216		&mut self,
217		payload: Vec<u8>,
218		pending_response: oneshot::Sender<OutgoingResponse>,
219		peer: &PeerId,
220	) -> Result<(), HandleRequestError> {
221		let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;
222
223		let from_block_id = match request.from_block.ok_or(HandleRequestError::MissingFromField)? {
224			FromBlockSchema::Hash(ref h) => {
225				let h = Decode::decode(&mut h.as_ref())?;
226				BlockId::<B>::Hash(h)
227			},
228			FromBlockSchema::Number(ref n) => {
229				let n = Decode::decode(&mut n.as_ref())?;
230				BlockId::<B>::Number(n)
231			},
232		};
233
234		let max_blocks = if request.max_blocks == 0 {
235			MAX_BLOCKS_IN_RESPONSE
236		} else {
237			min(request.max_blocks as usize, MAX_BLOCKS_IN_RESPONSE)
238		};
239
240		let direction =
241			i32::try_into(request.direction).map_err(|_| HandleRequestError::ParseDirection)?;
242
243		let attributes = BlockAttributes::from_be_u32(request.fields)?;
244
245		let support_multiple_justifications = request.support_multiple_justifications;
246
247		let key = SeenRequestsKey {
248			peer: *peer,
249			max_blocks,
250			direction,
251			from: from_block_id,
252			attributes,
253			support_multiple_justifications,
254		};
255
256		let mut reputation_change = None;
257
258		let small_request = attributes
259			.difference(BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION)
260			.is_empty();
261
262		match self.seen_requests.get(&key) {
263			Some(SeenRequestsValue::First) => {},
264			Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
265				*requests = requests.saturating_add(1);
266
267				if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
268					reputation_change = Some(if small_request {
269						rep::SAME_SMALL_REQUEST
270					} else {
271						rep::SAME_REQUEST
272					});
273				}
274			},
275			None => {
276				self.seen_requests.insert(key.clone(), SeenRequestsValue::First);
277			},
278		}
279
280		debug!(
281			target: LOG_TARGET,
282			"Handling block request from {peer}: Starting at `{from_block_id:?}` with \
283			maximum blocks of `{max_blocks}`, reputation_change: `{reputation_change:?}`, \
284			small_request `{small_request:?}`, direction `{direction:?}` and \
285			attributes `{attributes:?}`.",
286		);
287
288		let maybe_block_response = if reputation_change.is_none() || small_request {
289			let block_response = self.get_block_response(
290				attributes,
291				from_block_id,
292				direction,
293				max_blocks,
294				support_multiple_justifications,
295			)?;
296
297			// If any of the blocks contains any data, we can consider it as successful request.
298			if block_response
299				.blocks
300				.iter()
301				.any(|b| !b.header.is_empty() || !b.body.is_empty() || b.is_empty_justification)
302			{
303				if let Some(value) = self.seen_requests.get(&key) {
304					// If this is the first time we have processed this request, we need to change
305					// it to `Fulfilled`.
306					if let SeenRequestsValue::First = value {
307						*value = SeenRequestsValue::Fulfilled(1);
308					}
309				}
310			}
311
312			Some(block_response)
313		} else {
314			None
315		};
316
317		debug!(
318			target: LOG_TARGET,
319			"Sending result of block request from {peer} starting at `{from_block_id:?}`: \
320			blocks: {:?}, data: {:?}",
321			maybe_block_response.as_ref().map(|res| res.blocks.len()),
322			maybe_block_response.as_ref().map(|res| res.encoded_len()),
323		);
324
325		let result = if let Some(block_response) = maybe_block_response {
326			let mut data = Vec::with_capacity(block_response.encoded_len());
327			block_response.encode(&mut data)?;
328			Ok(data)
329		} else {
330			Err(())
331		};
332
333		pending_response
334			.send(OutgoingResponse {
335				result,
336				reputation_changes: reputation_change.into_iter().collect(),
337				sent_feedback: None,
338			})
339			.map_err(|_| HandleRequestError::SendResponse)
340	}
341
342	fn get_block_response(
343		&self,
344		attributes: BlockAttributes,
345		mut block_id: BlockId<B>,
346		direction: Direction,
347		max_blocks: usize,
348		support_multiple_justifications: bool,
349	) -> Result<BlockResponse, HandleRequestError> {
350		let get_header = attributes.contains(BlockAttributes::HEADER);
351		let get_body = attributes.contains(BlockAttributes::BODY);
352		let get_indexed_body = attributes.contains(BlockAttributes::INDEXED_BODY);
353		let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
354
355		let mut blocks = Vec::new();
356
357		let mut total_size: usize = 0;
358
359		let client_header_from_block_id =
360			|block_id: BlockId<B>| -> Result<Option<B::Header>, HandleRequestError> {
361				if let Some(hash) = self.client.block_hash_from_id(&block_id)? {
362					return self.client.header(hash).map_err(Into::into)
363				}
364				Ok(None)
365			};
366
367		while let Some(header) = client_header_from_block_id(block_id).unwrap_or_default() {
368			let number = *header.number();
369			let hash = header.hash();
370			let parent_hash = *header.parent_hash();
371			let justifications =
372				if get_justification { self.client.justifications(hash)? } else { None };
373
374			let (justifications, justification, is_empty_justification) =
375				if support_multiple_justifications {
376					let justifications = match justifications {
377						Some(v) => v.encode(),
378						None => Vec::new(),
379					};
380					(justifications, Vec::new(), false)
381				} else {
382					// For now we keep compatibility by selecting precisely the GRANDPA one, and not
383					// just the first one. When sending we could have just taken the first one,
384					// since we don't expect there to be any other kind currently, but when
385					// receiving we need to add the engine ID tag.
386					// The ID tag is hardcoded here to avoid depending on the GRANDPA crate, and
387					// will be removed once we remove the backwards compatibility.
388					// See: https://github.com/paritytech/substrate/issues/8172
389					let justification =
390						justifications.and_then(|just| just.into_justification(*b"FRNK"));
391
392					let is_empty_justification =
393						justification.as_ref().map(|j| j.is_empty()).unwrap_or(false);
394
395					let justification = justification.unwrap_or_default();
396
397					(Vec::new(), justification, is_empty_justification)
398				};
399
400			let body = if get_body {
401				match self.client.block_body(hash)? {
402					Some(mut extrinsics) =>
403						extrinsics.iter_mut().map(|extrinsic| extrinsic.encode()).collect(),
404					None => {
405						log::trace!(target: LOG_TARGET, "Missing data for block request.");
406						break
407					},
408				}
409			} else {
410				Vec::new()
411			};
412
413			let indexed_body = if get_indexed_body {
414				match self.client.block_indexed_body(hash)? {
415					Some(transactions) => transactions,
416					None => {
417						log::trace!(
418							target: LOG_TARGET,
419							"Missing indexed block data for block request."
420						);
421						// If the indexed body is missing we still continue returning headers.
422						// Ideally `None` should distinguish a missing body from the empty body,
423						// but the current protobuf based protocol does not allow it.
424						Vec::new()
425					},
426				}
427			} else {
428				Vec::new()
429			};
430
431			let block_data = crate::schema::v1::BlockData {
432				hash: hash.encode(),
433				header: if get_header { header.encode() } else { Vec::new() },
434				body,
435				receipt: Vec::new(),
436				message_queue: Vec::new(),
437				justification,
438				is_empty_justification,
439				justifications,
440				indexed_body,
441			};
442
443			let new_total_size = total_size +
444				block_data.body.iter().map(|ex| ex.len()).sum::<usize>() +
445				block_data.indexed_body.iter().map(|ex| ex.len()).sum::<usize>();
446
447			// Send at least one block, but make sure to not exceed the limit.
448			if !blocks.is_empty() && new_total_size > MAX_BODY_BYTES {
449				break
450			}
451
452			total_size = new_total_size;
453
454			blocks.push(block_data);
455
456			if blocks.len() >= max_blocks as usize {
457				break
458			}
459
460			match direction {
461				Direction::Ascending => block_id = BlockId::Number(number + One::one()),
462				Direction::Descending => {
463					if number.is_zero() {
464						break
465					}
466					block_id = BlockId::Hash(parent_hash)
467				},
468			}
469		}
470
471		Ok(BlockResponse { blocks })
472	}
473}
474
475#[async_trait::async_trait]
476impl<B, Client> BlockServer<B> for BlockRequestHandler<B, Client>
477where
478	B: BlockT,
479	Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static,
480{
481	async fn run(&mut self) {
482		self.process_requests().await;
483	}
484}
485
486#[derive(Debug, thiserror::Error)]
487enum HandleRequestError {
488	#[error("Failed to decode request: {0}.")]
489	DecodeProto(#[from] prost::DecodeError),
490	#[error("Failed to encode response: {0}.")]
491	EncodeProto(#[from] prost::EncodeError),
492	#[error("Failed to decode block hash: {0}.")]
493	DecodeScale(#[from] codec::Error),
494	#[error("Missing `BlockRequest::from_block` field.")]
495	MissingFromField,
496	#[error("Failed to parse BlockRequest::direction.")]
497	ParseDirection,
498	#[error(transparent)]
499	Client(#[from] sp_blockchain::Error),
500	#[error("Failed to send response.")]
501	SendResponse,
502}
503
504/// The full block downloader implementation of [`BlockDownloader].
505pub struct FullBlockDownloader {
506	protocol_name: ProtocolName,
507	network: NetworkServiceHandle,
508}
509
510impl FullBlockDownloader {
511	fn new(protocol_name: ProtocolName, network: NetworkServiceHandle) -> Self {
512		Self { protocol_name, network }
513	}
514
515	/// Extracts the blocks from the response schema.
516	fn blocks_from_schema<B: BlockT>(
517		&self,
518		request: &BlockRequest<B>,
519		response: BlockResponseSchema,
520	) -> Result<Vec<BlockData<B>>, String> {
521		response
522			.blocks
523			.into_iter()
524			.map(|block_data| {
525				Ok(BlockData::<B> {
526					hash: Decode::decode(&mut block_data.hash.as_ref())?,
527					header: if !block_data.header.is_empty() {
528						Some(Decode::decode(&mut block_data.header.as_ref())?)
529					} else {
530						None
531					},
532					body: if request.fields.contains(BlockAttributes::BODY) {
533						Some(
534							block_data
535								.body
536								.iter()
537								.map(|body| Decode::decode(&mut body.as_ref()))
538								.collect::<Result<Vec<_>, _>>()?,
539						)
540					} else {
541						None
542					},
543					indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) {
544						Some(block_data.indexed_body)
545					} else {
546						None
547					},
548					receipt: if !block_data.receipt.is_empty() {
549						Some(block_data.receipt)
550					} else {
551						None
552					},
553					message_queue: if !block_data.message_queue.is_empty() {
554						Some(block_data.message_queue)
555					} else {
556						None
557					},
558					justification: if !block_data.justification.is_empty() {
559						Some(block_data.justification)
560					} else if block_data.is_empty_justification {
561						Some(Vec::new())
562					} else {
563						None
564					},
565					justifications: if !block_data.justifications.is_empty() {
566						Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?)
567					} else {
568						None
569					},
570				})
571			})
572			.collect::<Result<_, _>>()
573			.map_err(|error: codec::Error| error.to_string())
574	}
575}
576
577#[async_trait::async_trait]
578impl<B: BlockT> BlockDownloader<B> for FullBlockDownloader {
579	async fn download_blocks(
580		&self,
581		who: PeerId,
582		request: BlockRequest<B>,
583	) -> Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled> {
584		// Build the request protobuf.
585		let bytes = BlockRequestSchema {
586			fields: request.fields.to_be_u32(),
587			from_block: match request.from {
588				FromBlock::Hash(h) => Some(FromBlockSchema::Hash(h.encode())),
589				FromBlock::Number(n) => Some(FromBlockSchema::Number(n.encode())),
590			},
591			direction: request.direction as i32,
592			max_blocks: request.max.unwrap_or(0),
593			support_multiple_justifications: true,
594		}
595		.encode_to_vec();
596
597		let (tx, rx) = oneshot::channel();
598		self.network.start_request(
599			who,
600			self.protocol_name.clone(),
601			bytes,
602			tx,
603			IfDisconnected::ImmediateError,
604		);
605		rx.await
606	}
607
608	fn block_response_into_blocks(
609		&self,
610		request: &BlockRequest<B>,
611		response: Vec<u8>,
612	) -> Result<Vec<BlockData<B>>, BlockResponseError> {
613		// Decode the response protobuf
614		let response_schema = BlockResponseSchema::decode(response.as_slice())
615			.map_err(|error| BlockResponseError::DecodeFailed(error.to_string()))?;
616
617		// Extract the block data from the protobuf
618		self.blocks_from_schema::<B>(request, response_schema)
619			.map_err(|error| BlockResponseError::ExtractionFailed(error.to_string()))
620	}
621}