referrerpolicy=no-referrer-when-downgrade

sc_network/bitswap/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Substrate is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Substrate is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Substrate. If not, see <https://www.gnu.org/licenses/>.
17
18//! Bitswap server for Substrate.
19//!
20//! Allows querying transactions by hash over standard bitswap protocol
21//! Only supports bitswap 1.2.0.
22//! CID is expected to reference 256-bit Blake2b transaction hash.
23
24use crate::{
25	request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
26	types::ProtocolName,
27	MAX_RESPONSE_SIZE,
28};
29
30use cid::{self, Version};
31use futures::StreamExt;
32use log::{debug, error, trace};
33use prost::Message;
34use sc_client_api::BlockBackend;
35use sc_network_types::PeerId;
36use schema::bitswap::{
37	message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
38	Message as BitswapMessage,
39};
40use sp_runtime::traits::Block as BlockT;
41use std::{io, sync::Arc, time::Duration};
42use unsigned_varint::encode as varint_encode;
43
44mod schema;
45
46const LOG_TARGET: &str = "bitswap";
47
48// Undocumented, but according to JS the bitswap messages have a max size of 512*1024 bytes
49// https://github.com/ipfs/js-ipfs-bitswap/blob/
50// d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16
51// We set it to the same value as max substrate protocol message
52const MAX_PACKET_SIZE: u64 = MAX_RESPONSE_SIZE;
53
54/// Max number of queued responses before denying requests.
55const MAX_REQUEST_QUEUE: usize = 20;
56
57/// Max number of blocks per wantlist
58const MAX_WANTED_BLOCKS: usize = 16;
59
60/// Bitswap protocol name
61const PROTOCOL_NAME: &'static str = "/ipfs/bitswap/1.2.0";
62
63/// Prefix represents all metadata of a CID, without the actual content.
64#[derive(PartialEq, Eq, Clone, Debug)]
65struct Prefix {
66	/// The version of CID.
67	pub version: Version,
68	/// The codec of CID.
69	pub codec: u64,
70	/// The multihash type of CID.
71	pub mh_type: u64,
72	/// The multihash length of CID.
73	pub mh_len: u8,
74}
75
76impl Prefix {
77	/// Convert the prefix to encoded bytes.
78	pub fn to_bytes(&self) -> Vec<u8> {
79		let mut res = Vec::with_capacity(4);
80		let mut buf = varint_encode::u64_buffer();
81		let version = varint_encode::u64(self.version.into(), &mut buf);
82		res.extend_from_slice(version);
83		let mut buf = varint_encode::u64_buffer();
84		let codec = varint_encode::u64(self.codec, &mut buf);
85		res.extend_from_slice(codec);
86		let mut buf = varint_encode::u64_buffer();
87		let mh_type = varint_encode::u64(self.mh_type, &mut buf);
88		res.extend_from_slice(mh_type);
89		let mut buf = varint_encode::u64_buffer();
90		let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf);
91		res.extend_from_slice(mh_len);
92		res
93	}
94}
95
96/// Bitswap request handler
97pub struct BitswapRequestHandler<B> {
98	client: Arc<dyn BlockBackend<B> + Send + Sync>,
99	request_receiver: async_channel::Receiver<IncomingRequest>,
100}
101
102impl<B: BlockT> BitswapRequestHandler<B> {
103	/// Create a new [`BitswapRequestHandler`].
104	pub fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
105		let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
106
107		let config = ProtocolConfig {
108			name: ProtocolName::from(PROTOCOL_NAME),
109			fallback_names: vec![],
110			max_request_size: MAX_PACKET_SIZE,
111			max_response_size: MAX_PACKET_SIZE,
112			request_timeout: Duration::from_secs(15),
113			inbound_queue: Some(tx),
114		};
115
116		(Self { client, request_receiver }, config)
117	}
118
119	/// Run [`BitswapRequestHandler`].
120	pub async fn run(mut self) {
121		while let Some(request) = self.request_receiver.next().await {
122			let IncomingRequest { peer, payload, pending_response } = request;
123
124			match self.handle_message(&peer, &payload) {
125				Ok(response) => {
126					let response = OutgoingResponse {
127						result: Ok(response),
128						reputation_changes: Vec::new(),
129						sent_feedback: None,
130					};
131
132					match pending_response.send(response) {
133						Ok(()) => {
134							trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
135						},
136						Err(_) => debug!(
137							target: LOG_TARGET,
138							"Failed to handle light client request from {peer}: {}",
139							BitswapError::SendResponse,
140						),
141					}
142				},
143				Err(err) => {
144					error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}");
145
146					// TODO: adjust reputation?
147
148					let response = OutgoingResponse {
149						result: Err(()),
150						reputation_changes: vec![],
151						sent_feedback: None,
152					};
153
154					if pending_response.send(response).is_err() {
155						debug!(
156							target: LOG_TARGET,
157							"Failed to handle bitswap request from {peer}: {}",
158							BitswapError::SendResponse,
159						);
160					}
161				},
162			}
163		}
164	}
165
166	/// Handle received Bitswap request
167	fn handle_message(
168		&mut self,
169		peer: &PeerId,
170		payload: &Vec<u8>,
171	) -> Result<Vec<u8>, BitswapError> {
172		let request = schema::bitswap::Message::decode(&payload[..])?;
173
174		trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
175
176		let mut response = BitswapMessage::default();
177
178		let wantlist = match request.wantlist {
179			Some(wantlist) => wantlist,
180			None => {
181				debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer);
182				return Err(BitswapError::InvalidWantList)
183			},
184		};
185
186		if wantlist.entries.len() > MAX_WANTED_BLOCKS {
187			trace!(target: LOG_TARGET, "Ignored request: too many entries");
188			return Err(BitswapError::TooManyEntries)
189		}
190
191		for entry in wantlist.entries {
192			let cid = match cid::Cid::read_bytes(entry.block.as_slice()) {
193				Ok(cid) => cid,
194				Err(e) => {
195					trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
196					continue
197				},
198			};
199
200			if cid.version() != cid::Version::V1 ||
201				cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) ||
202				cid.hash().size() != 32
203			{
204				debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
205				continue
206			}
207
208			let mut hash = B::Hash::default();
209			hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
210			let transaction = match self.client.indexed_transaction(hash) {
211				Ok(ex) => ex,
212				Err(e) => {
213					error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
214					None
215				},
216			};
217
218			match transaction {
219				Some(transaction) => {
220					trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
221
222					if entry.want_type == WantType::Block as i32 {
223						let prefix = Prefix {
224							version: cid.version(),
225							codec: cid.codec(),
226							mh_type: cid.hash().code(),
227							mh_len: cid.hash().size(),
228						};
229						response
230							.payload
231							.push(MessageBlock { prefix: prefix.to_bytes(), data: transaction });
232					} else {
233						response.block_presences.push(BlockPresence {
234							r#type: BlockPresenceType::Have as i32,
235							cid: cid.to_bytes(),
236						});
237					}
238				},
239				None => {
240					trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
241
242					if entry.send_dont_have {
243						response.block_presences.push(BlockPresence {
244							r#type: BlockPresenceType::DontHave as i32,
245							cid: cid.to_bytes(),
246						});
247					}
248				},
249			}
250		}
251
252		Ok(response.encode_to_vec())
253	}
254}
255
256/// Bitswap protocol error.
257#[derive(Debug, thiserror::Error)]
258pub enum BitswapError {
259	/// Protobuf decoding error.
260	#[error("Failed to decode request: {0}.")]
261	DecodeProto(#[from] prost::DecodeError),
262
263	/// Protobuf encoding error.
264	#[error("Failed to encode response: {0}.")]
265	EncodeProto(#[from] prost::EncodeError),
266
267	/// Client backend error.
268	#[error(transparent)]
269	Client(#[from] sp_blockchain::Error),
270
271	/// Error parsing CID
272	#[error(transparent)]
273	BadCid(#[from] cid::Error),
274
275	/// Packet read error.
276	#[error(transparent)]
277	Read(#[from] io::Error),
278
279	/// Error sending response.
280	#[error("Failed to send response.")]
281	SendResponse,
282
283	/// Message doesn't have a WANT list.
284	#[error("Invalid WANT list.")]
285	InvalidWantList,
286
287	/// Too many blocks requested.
288	#[error("Too many block entries in the request.")]
289	TooManyEntries,
290}
291
292#[cfg(test)]
293mod tests {
294	use super::*;
295	use futures::channel::oneshot;
296	use sc_block_builder::BlockBuilderBuilder;
297	use schema::bitswap::{
298		message::{wantlist::Entry, Wantlist},
299		Message as BitswapMessage,
300	};
301	use sp_consensus::BlockOrigin;
302	use sp_runtime::codec::Encode;
303	use substrate_test_runtime::ExtrinsicBuilder;
304	use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder};
305
306	#[tokio::test]
307	async fn undecodable_message() {
308		let client = substrate_test_runtime_client::new();
309		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
310
311		tokio::spawn(async move { bitswap.run().await });
312
313		let (tx, rx) = oneshot::channel();
314		config
315			.inbound_queue
316			.unwrap()
317			.send(IncomingRequest {
318				peer: PeerId::random(),
319				payload: vec![0x13, 0x37, 0x13, 0x38],
320				pending_response: tx,
321			})
322			.await
323			.unwrap();
324
325		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
326			assert_eq!(result, Err(()));
327			assert_eq!(reputation_changes, Vec::new());
328			assert!(sent_feedback.is_none());
329		} else {
330			panic!("invalid event received");
331		}
332	}
333
334	#[tokio::test]
335	async fn empty_want_list() {
336		let client = substrate_test_runtime_client::new();
337		let (bitswap, mut config) = BitswapRequestHandler::new(Arc::new(client));
338
339		tokio::spawn(async move { bitswap.run().await });
340
341		let (tx, rx) = oneshot::channel();
342		config
343			.inbound_queue
344			.as_mut()
345			.unwrap()
346			.send(IncomingRequest {
347				peer: PeerId::random(),
348				payload: BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec(),
349				pending_response: tx,
350			})
351			.await
352			.unwrap();
353
354		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
355			assert_eq!(result, Err(()));
356			assert_eq!(reputation_changes, Vec::new());
357			assert!(sent_feedback.is_none());
358		} else {
359			panic!("invalid event received");
360		}
361
362		// Empty WANT list should not cause an error
363		let (tx, rx) = oneshot::channel();
364		config
365			.inbound_queue
366			.unwrap()
367			.send(IncomingRequest {
368				peer: PeerId::random(),
369				payload: BitswapMessage {
370					wantlist: Some(Default::default()),
371					..Default::default()
372				}
373				.encode_to_vec(),
374				pending_response: tx,
375			})
376			.await
377			.unwrap();
378
379		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
380			assert_eq!(result, Ok(BitswapMessage::default().encode_to_vec()));
381			assert_eq!(reputation_changes, Vec::new());
382			assert!(sent_feedback.is_none());
383		} else {
384			panic!("invalid event received");
385		}
386	}
387
388	#[tokio::test]
389	async fn too_long_want_list() {
390		let client = substrate_test_runtime_client::new();
391		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
392
393		tokio::spawn(async move { bitswap.run().await });
394
395		let (tx, rx) = oneshot::channel();
396		config
397			.inbound_queue
398			.unwrap()
399			.send(IncomingRequest {
400				peer: PeerId::random(),
401				payload: BitswapMessage {
402					wantlist: Some(Wantlist {
403						entries: (0..MAX_WANTED_BLOCKS + 1)
404							.map(|_| Entry::default())
405							.collect::<Vec<_>>(),
406						full: false,
407					}),
408					..Default::default()
409				}
410				.encode_to_vec(),
411				pending_response: tx,
412			})
413			.await
414			.unwrap();
415
416		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
417			assert_eq!(result, Err(()));
418			assert_eq!(reputation_changes, Vec::new());
419			assert!(sent_feedback.is_none());
420		} else {
421			panic!("invalid event received");
422		}
423	}
424
425	#[tokio::test]
426	async fn transaction_not_found() {
427		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
428
429		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
430		tokio::spawn(async move { bitswap.run().await });
431
432		let (tx, rx) = oneshot::channel();
433		config
434			.inbound_queue
435			.unwrap()
436			.send(IncomingRequest {
437				peer: PeerId::random(),
438				payload: BitswapMessage {
439					wantlist: Some(Wantlist {
440						entries: vec![Entry {
441							block: cid::Cid::new_v1(
442								0x70,
443								cid::multihash::Multihash::wrap(
444									u64::from(cid::multihash::Code::Blake2b256),
445									&[0u8; 32],
446								)
447								.unwrap(),
448							)
449							.to_bytes(),
450							..Default::default()
451						}],
452						full: false,
453					}),
454					..Default::default()
455				}
456				.encode_to_vec(),
457				pending_response: tx,
458			})
459			.await
460			.unwrap();
461
462		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
463			assert_eq!(result, Ok(vec![]));
464			assert_eq!(reputation_changes, Vec::new());
465			assert!(sent_feedback.is_none());
466		} else {
467			panic!("invalid event received");
468		}
469	}
470
471	#[tokio::test]
472	async fn transaction_found() {
473		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
474		let mut block_builder = BlockBuilderBuilder::new(&client)
475			.on_parent_block(client.chain_info().genesis_hash)
476			.with_parent_block_number(0)
477			.build()
478			.unwrap();
479
480		// encoded extrinsic: [161, .. , 2, 6, 16, 19, 55, 19, 56]
481		let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
482		let pattern_index = ext.encoded_size() - 4;
483
484		block_builder.push(ext.clone()).unwrap();
485		let block = block_builder.build().unwrap().block;
486
487		client.import(BlockOrigin::File, block).await.unwrap();
488
489		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
490
491		tokio::spawn(async move { bitswap.run().await });
492
493		let (tx, rx) = oneshot::channel();
494		config
495			.inbound_queue
496			.unwrap()
497			.send(IncomingRequest {
498				peer: PeerId::random(),
499				payload: BitswapMessage {
500					wantlist: Some(Wantlist {
501						entries: vec![Entry {
502							block: cid::Cid::new_v1(
503								0x70,
504								cid::multihash::Multihash::wrap(
505									u64::from(cid::multihash::Code::Blake2b256),
506									&sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]),
507								)
508								.unwrap(),
509							)
510							.to_bytes(),
511							..Default::default()
512						}],
513						full: false,
514					}),
515					..Default::default()
516				}
517				.encode_to_vec(),
518				pending_response: tx,
519			})
520			.await
521			.unwrap();
522
523		if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
524			assert_eq!(reputation_changes, Vec::new());
525			assert!(sent_feedback.is_none());
526
527			let response =
528				schema::bitswap::Message::decode(&result.expect("fetch to succeed")[..]).unwrap();
529			assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]);
530		} else {
531			panic!("invalid event received");
532		}
533	}
534}