referrerpolicy=no-referrer-when-downgrade

sc_network/bitswap/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Substrate is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Substrate is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Substrate. If not, see <https://www.gnu.org/licenses/>.
17
18//! Bitswap server for Substrate.
19//!
20//! Supports querying indexed transactions by hash over the standard bitswap protocol (v1.2.0).
21//! CIDs must reference a supported 256-bit transaction hash.
22
23use crate::{
24	request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
25	types::ProtocolName,
26	MAX_RESPONSE_SIZE,
27};
28
29use cid::{Error as CidError, Version as CidVersion};
30use futures::StreamExt;
31use log::{debug, error, trace};
32use prost::Message;
33use sc_client_api::BlockBackend;
34use sc_network_types::PeerId;
35use schema::bitswap::{
36	message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
37	Message as BitswapMessage,
38};
39use sp_core::H256;
40use sp_runtime::traits::Block as BlockT;
41use std::{io, sync::Arc, time::Duration};
42use unsigned_varint::encode as varint_encode;
43
44/// Bitswap client.
45mod client;
46pub(crate) mod schema;
47
48pub use cid::Cid;
49pub use client::{
50	request_bitswap_blocks, request_bitswap_blocks_unverified, BitswapError, FetchOutcome,
51	BLAKE2B_256_MULTIHASH_CODE, KECCAK_256_MULTIHASH_CODE, SHA2_256_MULTIHASH_CODE,
52};
53
54pub(crate) use schema::bitswap::Message as BitswapProtoMessage;
55
56pub(crate) const LOG_TARGET: &str = "sub-libp2p::bitswap";
57
58// Use the network-wide response cap for Bitswap messages.
59const MAX_PACKET_SIZE: u64 = MAX_RESPONSE_SIZE;
60
61/// Max number of queued responses before denying requests.
62const MAX_REQUEST_QUEUE: usize = 20;
63
64/// Max number of blocks per wantlist.
65pub const MAX_WANTED_BLOCKS: usize = 16;
66
67/// Bitswap protocol name.
68pub(crate) const PROTOCOL_NAME: &str = "/ipfs/bitswap/1.2.0";
69
70/// IPFS raw multicodec used for indexed transaction payload bytes.
71pub const RAW_CODEC: u64 = 0x55;
72
73/// Check if a CID is supported by the bitswap protocol — CIDv1, 32-byte digest, with a
74/// supported multihash code (Blake2b-256, SHA2-256, or Keccak-256).
75pub fn is_cid_supported(cid: &Cid) -> bool {
76	cid.version() != CidVersion::V0 &&
77		cid.hash().size() == 32 &&
78		is_supported_multihash_code(cid.hash().code())
79}
80
81/// Return `true` if `code` is a supported multihash code.
82pub(crate) fn is_supported_multihash_code(code: u64) -> bool {
83	matches!(code, BLAKE2B_256_MULTIHASH_CODE | SHA2_256_MULTIHASH_CODE | KECCAK_256_MULTIHASH_CODE)
84}
85
86/// CID metadata without the actual content bytes.
87#[derive(PartialEq, Eq, Clone, Debug)]
88pub(crate) struct Prefix {
89	/// The version of CID.
90	pub version: CidVersion,
91	/// The codec of CID.
92	pub codec: u64,
93	/// The multihash type of CID.
94	pub mh_type: u64,
95	/// The multihash length of CID.
96	pub mh_len: u8,
97}
98
99impl From<&Cid> for Prefix {
100	fn from(cid: &Cid) -> Self {
101		Self {
102			version: cid.version(),
103			codec: cid.codec(),
104			mh_type: cid.hash().code(),
105			mh_len: cid.hash().size(),
106		}
107	}
108}
109
110impl Prefix {
111	/// Convert the prefix to encoded bytes.
112	pub(crate) fn to_bytes(&self) -> Vec<u8> {
113		let mut res = Vec::with_capacity(4);
114		let mut buf = varint_encode::u64_buffer();
115		let version = varint_encode::u64(self.version.into(), &mut buf);
116		res.extend_from_slice(version);
117		let mut buf = varint_encode::u64_buffer();
118		let codec = varint_encode::u64(self.codec, &mut buf);
119		res.extend_from_slice(codec);
120		let mut buf = varint_encode::u64_buffer();
121		let mh_type = varint_encode::u64(self.mh_type, &mut buf);
122		res.extend_from_slice(mh_type);
123		let mut buf = varint_encode::u64_buffer();
124		let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf);
125		res.extend_from_slice(mh_len);
126		res
127	}
128}
129
130/// Bitswap request handler.
131pub(crate) struct BitswapRequestHandler<B> {
132	client: Arc<dyn BlockBackend<B> + Send + Sync>,
133	request_receiver: async_channel::Receiver<IncomingRequest>,
134}
135
136impl<B: BlockT> BitswapRequestHandler<B> {
137	/// Create a new [`BitswapRequestHandler`].
138	pub(crate) fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
139		let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
140
141		let config = ProtocolConfig {
142			name: ProtocolName::from(PROTOCOL_NAME),
143			fallback_names: vec![],
144			max_request_size: MAX_PACKET_SIZE,
145			max_response_size: MAX_PACKET_SIZE,
146			request_timeout: Duration::from_secs(15),
147			inbound_queue: Some(tx),
148		};
149
150		(Self { client, request_receiver }, config)
151	}
152
153	/// Run [`BitswapRequestHandler`].
154	pub(crate) async fn run(mut self) {
155		while let Some(request) = self.request_receiver.next().await {
156			let IncomingRequest { peer, payload, pending_response } = request;
157
158			match self.handle_message(&peer, &payload) {
159				Ok(response) => {
160					let response = OutgoingResponse {
161						result: Ok(response),
162						reputation_changes: Vec::new(),
163						sent_feedback: None,
164					};
165
166					match pending_response.send(response) {
167						Ok(()) => {
168							trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
169						},
170						Err(_) => debug!(
171							target: LOG_TARGET,
172							"Failed to handle bitswap request from {peer}: {}",
173							RequestHandlerError::SendResponse,
174						),
175					}
176				},
177				Err(err) => {
178					error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}");
179
180					// TODO: adjust reputation?
181
182					let response = OutgoingResponse {
183						result: Err(()),
184						reputation_changes: vec![],
185						sent_feedback: None,
186					};
187
188					if pending_response.send(response).is_err() {
189						debug!(
190							target: LOG_TARGET,
191							"Failed to handle bitswap request from {peer}: {}",
192							RequestHandlerError::SendResponse,
193						);
194					}
195				},
196			}
197		}
198	}
199
200	/// Handle received Bitswap request
201	fn handle_message(
202		&mut self,
203		peer: &PeerId,
204		payload: &[u8],
205	) -> Result<Vec<u8>, RequestHandlerError> {
206		let request = schema::bitswap::Message::decode(payload)?;
207
208		trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
209
210		let mut response = BitswapMessage::default();
211
212		let wantlist = match request.wantlist {
213			Some(wantlist) => wantlist,
214			None => {
215				debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer);
216				return Err(RequestHandlerError::InvalidWantList);
217			},
218		};
219
220		if wantlist.entries.len() > MAX_WANTED_BLOCKS {
221			trace!(target: LOG_TARGET, "Ignored request: too many entries");
222			return Err(RequestHandlerError::TooManyEntries);
223		}
224
225		for entry in wantlist.entries {
226			let cid = match Cid::read_bytes(entry.block.as_slice()) {
227				Ok(cid) => cid,
228				Err(e) => {
229					trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
230					continue;
231				},
232			};
233
234			if !is_cid_supported(&cid) {
235				trace!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
236				continue;
237			}
238
239			let mut hash = H256::default();
240			hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
241			let transaction = match self.client.indexed_transaction(hash) {
242				Ok(ex) => ex,
243				Err(e) => {
244					error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
245					None
246				},
247			};
248
249			match transaction {
250				Some(transaction) => {
251					trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
252
253					if entry.want_type == WantType::Block as i32 {
254						let prefix: Prefix = (&cid).into();
255						response
256							.payload
257							.push(MessageBlock { prefix: prefix.to_bytes(), data: transaction });
258					} else {
259						response.block_presences.push(BlockPresence {
260							r#type: BlockPresenceType::Have as i32,
261							cid: cid.to_bytes(),
262						});
263					}
264				},
265				None => {
266					trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
267
268					if entry.send_dont_have {
269						response.block_presences.push(BlockPresence {
270							r#type: BlockPresenceType::DontHave as i32,
271							cid: cid.to_bytes(),
272						});
273					}
274				},
275			}
276		}
277
278		Ok(response.encode_to_vec())
279	}
280}
281
282/// Bitswap protocol error.
283#[derive(Debug, thiserror::Error)]
284enum RequestHandlerError {
285	/// Protobuf decoding error.
286	#[error("Failed to decode request: {0}.")]
287	DecodeProto(#[from] prost::DecodeError),
288
289	/// Protobuf encoding error.
290	#[error("Failed to encode response: {0}.")]
291	EncodeProto(#[from] prost::EncodeError),
292
293	/// Client backend error.
294	#[error(transparent)]
295	Client(#[from] sp_blockchain::Error),
296
297	/// Error parsing CID
298	#[error(transparent)]
299	BadCid(#[from] CidError),
300
301	/// Packet read error.
302	#[error(transparent)]
303	Read(#[from] io::Error),
304
305	/// Error sending response.
306	#[error("Failed to send response.")]
307	SendResponse,
308
309	/// Message doesn't have a WANT list.
310	#[error("Invalid WANT list.")]
311	InvalidWantList,
312
313	/// Too many blocks requested.
314	#[error("Too many block entries in the request.")]
315	TooManyEntries,
316}
317
318#[cfg(test)]
319mod tests {
320	use super::*;
321	use futures::channel::oneshot;
322	use litep2p::types::multihash::Code as LiteP2pCode;
323	use sc_block_builder::BlockBuilderBuilder;
324	use schema::bitswap::{
325		message::{wantlist::Entry, Wantlist},
326		Message as BitswapMessage,
327	};
328	use sp_consensus::BlockOrigin;
329	use sp_runtime::codec::Encode;
330	use substrate_test_runtime::ExtrinsicBuilder;
331	use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder};
332
333	#[tokio::test]
334	async fn undecodable_message() {
335		let client = substrate_test_runtime_client::new();
336		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
337
338		tokio::spawn(async move { bitswap.run().await });
339
340		let (tx, rx) = oneshot::channel();
341		config
342			.inbound_queue
343			.unwrap()
344			.send(IncomingRequest {
345				peer: PeerId::random(),
346				payload: vec![0x13, 0x37, 0x13, 0x38],
347				pending_response: tx,
348			})
349			.await
350			.unwrap();
351
352		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
353			assert_eq!(result, Err(()));
354			assert_eq!(reputation_changes, Vec::new());
355			assert!(sent_feedback.is_none());
356		} else {
357			panic!("invalid event received");
358		}
359	}
360
361	#[tokio::test]
362	async fn empty_want_list() {
363		let client = substrate_test_runtime_client::new();
364		let (bitswap, mut config) = BitswapRequestHandler::new(Arc::new(client));
365
366		tokio::spawn(async move { bitswap.run().await });
367
368		let (tx, rx) = oneshot::channel();
369		config
370			.inbound_queue
371			.as_mut()
372			.unwrap()
373			.send(IncomingRequest {
374				peer: PeerId::random(),
375				payload: BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec(),
376				pending_response: tx,
377			})
378			.await
379			.unwrap();
380
381		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
382			assert_eq!(result, Err(()));
383			assert_eq!(reputation_changes, Vec::new());
384			assert!(sent_feedback.is_none());
385		} else {
386			panic!("invalid event received");
387		}
388
389		// Empty WANT list should not cause an error
390		let (tx, rx) = oneshot::channel();
391		config
392			.inbound_queue
393			.unwrap()
394			.send(IncomingRequest {
395				peer: PeerId::random(),
396				payload: BitswapMessage {
397					wantlist: Some(Default::default()),
398					..Default::default()
399				}
400				.encode_to_vec(),
401				pending_response: tx,
402			})
403			.await
404			.unwrap();
405
406		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
407			assert_eq!(result, Ok(BitswapMessage::default().encode_to_vec()));
408			assert_eq!(reputation_changes, Vec::new());
409			assert!(sent_feedback.is_none());
410		} else {
411			panic!("invalid event received");
412		}
413	}
414
415	#[tokio::test]
416	async fn too_long_want_list() {
417		let client = substrate_test_runtime_client::new();
418		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
419
420		tokio::spawn(async move { bitswap.run().await });
421
422		let (tx, rx) = oneshot::channel();
423		config
424			.inbound_queue
425			.unwrap()
426			.send(IncomingRequest {
427				peer: PeerId::random(),
428				payload: BitswapMessage {
429					wantlist: Some(Wantlist {
430						entries: (0..MAX_WANTED_BLOCKS + 1)
431							.map(|_| Entry::default())
432							.collect::<Vec<_>>(),
433						full: false,
434					}),
435					..Default::default()
436				}
437				.encode_to_vec(),
438				pending_response: tx,
439			})
440			.await
441			.unwrap();
442
443		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
444			assert_eq!(result, Err(()));
445			assert_eq!(reputation_changes, Vec::new());
446			assert!(sent_feedback.is_none());
447		} else {
448			panic!("invalid event received");
449		}
450	}
451
452	#[tokio::test]
453	async fn transaction_not_found() {
454		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
455
456		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
457		tokio::spawn(async move { bitswap.run().await });
458
459		let (tx, rx) = oneshot::channel();
460		config
461			.inbound_queue
462			.unwrap()
463			.send(IncomingRequest {
464				peer: PeerId::random(),
465				payload: BitswapMessage {
466					wantlist: Some(Wantlist {
467						entries: vec![Entry {
468							block: cid::Cid::new_v1(
469								0x70,
470								cid::multihash::Multihash::wrap(
471									u64::from(LiteP2pCode::Blake2b256),
472									&[0u8; 32],
473								)
474								.unwrap(),
475							)
476							.to_bytes(),
477							..Default::default()
478						}],
479						full: false,
480					}),
481					..Default::default()
482				}
483				.encode_to_vec(),
484				pending_response: tx,
485			})
486			.await
487			.unwrap();
488
489		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
490			assert_eq!(result, Ok(vec![]));
491			assert_eq!(reputation_changes, Vec::new());
492			assert!(sent_feedback.is_none());
493		} else {
494			panic!("invalid event received");
495		}
496	}
497
498	#[tokio::test]
499	async fn transaction_found() {
500		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
501		let mut block_builder = BlockBuilderBuilder::new(&client)
502			.on_parent_block(client.chain_info().genesis_hash)
503			.with_parent_block_number(0)
504			.build()
505			.unwrap();
506
507		// encoded extrinsic: [161, .. , 2, 6, 16, 19, 55, 19, 56]
508		let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
509		let pattern_index = ext.encoded_size() - 4;
510
511		block_builder.push(ext.clone()).unwrap();
512		let block = block_builder.build().unwrap().block;
513
514		client.import(BlockOrigin::File, block).await.unwrap();
515
516		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
517
518		tokio::spawn(async move { bitswap.run().await });
519
520		let (tx, rx) = oneshot::channel();
521		config
522			.inbound_queue
523			.unwrap()
524			.send(IncomingRequest {
525				peer: PeerId::random(),
526				payload: BitswapMessage {
527					wantlist: Some(Wantlist {
528						entries: vec![Entry {
529							block: cid::Cid::new_v1(
530								0x70,
531								cid::multihash::Multihash::wrap(
532									u64::from(LiteP2pCode::Blake2b256),
533									&sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]),
534								)
535								.unwrap(),
536							)
537							.to_bytes(),
538							..Default::default()
539						}],
540						full: false,
541					}),
542					..Default::default()
543				}
544				.encode_to_vec(),
545				pending_response: tx,
546			})
547			.await
548			.unwrap();
549
550		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
551			assert_eq!(reputation_changes, Vec::new());
552			assert!(sent_feedback.is_none());
553
554			let response =
555				schema::bitswap::Message::decode(&result.expect("fetch to succeed")[..]).unwrap();
556			assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]);
557		} else {
558			panic!("invalid event received");
559		}
560	}
561
562	#[tokio::test]
563	async fn transaction_not_found_sends_dont_have_when_requested() {
564		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
565		let (mut bitswap, _config) = BitswapRequestHandler::new(Arc::new(client));
566		let cid = cid::Cid::new_v1(
567			0x70,
568			cid::multihash::Multihash::wrap(u64::from(LiteP2pCode::Blake2b256), &[0u8; 32])
569				.unwrap(),
570		);
571		let request = BitswapMessage {
572			wantlist: Some(Wantlist {
573				entries: vec![Entry {
574					block: cid.to_bytes(),
575					send_dont_have: true,
576					..Default::default()
577				}],
578				full: false,
579			}),
580			..Default::default()
581		}
582		.encode_to_vec();
583
584		let response = BitswapMessage::decode(
585			bitswap.handle_message(&PeerId::random(), &request).unwrap().as_slice(),
586		)
587		.unwrap();
588
589		assert!(response.payload.is_empty());
590		assert_eq!(response.block_presences.len(), 1);
591		assert_eq!(response.block_presences[0].cid, cid.to_bytes());
592		assert_eq!(response.block_presences[0].r#type, BlockPresenceType::DontHave as i32);
593	}
594
595	#[tokio::test]
596	async fn transaction_found_sends_have_for_want_have() {
597		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
598		let mut block_builder = BlockBuilderBuilder::new(&client)
599			.on_parent_block(client.chain_info().genesis_hash)
600			.with_parent_block_number(0)
601			.build()
602			.unwrap();
603
604		let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
605		let pattern_index = ext.encoded_size() - 4;
606		let cid = cid::Cid::new_v1(
607			0x70,
608			cid::multihash::Multihash::wrap(
609				u64::from(LiteP2pCode::Blake2b256),
610				&sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]),
611			)
612			.unwrap(),
613		);
614
615		block_builder.push(ext).unwrap();
616		let block = block_builder.build().unwrap().block;
617		client.import(BlockOrigin::File, block).await.unwrap();
618
619		let (mut bitswap, _config) = BitswapRequestHandler::new(Arc::new(client));
620		let request = BitswapMessage {
621			wantlist: Some(Wantlist {
622				entries: vec![Entry {
623					block: cid.to_bytes(),
624					want_type: WantType::Have as i32,
625					..Default::default()
626				}],
627				full: false,
628			}),
629			..Default::default()
630		}
631		.encode_to_vec();
632
633		let response = BitswapMessage::decode(
634			bitswap.handle_message(&PeerId::random(), &request).unwrap().as_slice(),
635		)
636		.unwrap();
637
638		assert!(response.payload.is_empty());
639		assert_eq!(response.block_presences.len(), 1);
640		assert_eq!(response.block_presences[0].cid, cid.to_bytes());
641		assert_eq!(response.block_presences[0].r#type, BlockPresenceType::Have as i32);
642	}
643
644	#[test]
645	fn is_cid_supported_accepts_all_three_supported_hashings() {
646		use cid::multihash::Multihash;
647		for multihash_code in
648			[BLAKE2B_256_MULTIHASH_CODE, SHA2_256_MULTIHASH_CODE, KECCAK_256_MULTIHASH_CODE]
649		{
650			let digest = [9u8; 32];
651			let mh = Multihash::<64>::wrap(multihash_code, &digest).unwrap();
652			let cid = Cid::new_v1(RAW_CODEC, mh);
653			assert!(is_cid_supported(&cid), "{multihash_code} CID should be supported");
654		}
655	}
656
657	#[test]
658	fn is_cid_supported_rejects_unknown_multihash_code() {
659		use cid::multihash::Multihash;
660		let digest = [9u8; 32];
661		let mh = Multihash::<64>::wrap(0x99, &digest).unwrap();
662		let cid = Cid::new_v1(RAW_CODEC, mh);
663		assert!(!is_cid_supported(&cid));
664	}
665}