sc_network/bitswap/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3
4// Substrate is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Substrate is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
16
//! Bitswap server for Substrate.
//!
//! Allows querying transactions by hash over the standard bitswap protocol.
//! Only bitswap 1.2.0 is supported.
//! The CID is expected to reference a 256-bit Blake2b transaction hash.
22
23use crate::{
24	request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
25	types::ProtocolName,
26	MAX_RESPONSE_SIZE,
27};
28
29use cid::{self, Version};
30use futures::StreamExt;
31use log::{debug, error, trace};
32use prost::Message;
33use sc_client_api::BlockBackend;
34use sc_network_types::PeerId;
35use schema::bitswap::{
36	message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
37	Message as BitswapMessage,
38};
39use sp_runtime::traits::Block as BlockT;
40use std::{io, sync::Arc, time::Duration};
41use unsigned_varint::encode as varint_encode;
42
43mod schema;
44
45const LOG_TARGET: &str = "bitswap";
46
47// Undocumented, but according to JS the bitswap messages have a max size of 512*1024 bytes
48// https://github.com/ipfs/js-ipfs-bitswap/blob/
49// d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16
50// We set it to the same value as max substrate protocol message
51const MAX_PACKET_SIZE: u64 = MAX_RESPONSE_SIZE;
52
53/// Max number of queued responses before denying requests.
54const MAX_REQUEST_QUEUE: usize = 20;
55
56/// Max number of blocks per wantlist
57const MAX_WANTED_BLOCKS: usize = 16;
58
59/// Bitswap protocol name
60const PROTOCOL_NAME: &'static str = "/ipfs/bitswap/1.2.0";
61
62/// Prefix represents all metadata of a CID, without the actual content.
63#[derive(PartialEq, Eq, Clone, Debug)]
64struct Prefix {
65	/// The version of CID.
66	pub version: Version,
67	/// The codec of CID.
68	pub codec: u64,
69	/// The multihash type of CID.
70	pub mh_type: u64,
71	/// The multihash length of CID.
72	pub mh_len: u8,
73}
74
75impl Prefix {
76	/// Convert the prefix to encoded bytes.
77	pub fn to_bytes(&self) -> Vec<u8> {
78		let mut res = Vec::with_capacity(4);
79		let mut buf = varint_encode::u64_buffer();
80		let version = varint_encode::u64(self.version.into(), &mut buf);
81		res.extend_from_slice(version);
82		let mut buf = varint_encode::u64_buffer();
83		let codec = varint_encode::u64(self.codec, &mut buf);
84		res.extend_from_slice(codec);
85		let mut buf = varint_encode::u64_buffer();
86		let mh_type = varint_encode::u64(self.mh_type, &mut buf);
87		res.extend_from_slice(mh_type);
88		let mut buf = varint_encode::u64_buffer();
89		let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf);
90		res.extend_from_slice(mh_len);
91		res
92	}
93}
94
95/// Bitswap request handler
96pub struct BitswapRequestHandler<B> {
97	client: Arc<dyn BlockBackend<B> + Send + Sync>,
98	request_receiver: async_channel::Receiver<IncomingRequest>,
99}
100
101impl<B: BlockT> BitswapRequestHandler<B> {
102	/// Create a new [`BitswapRequestHandler`].
103	pub fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
104		let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
105
106		let config = ProtocolConfig {
107			name: ProtocolName::from(PROTOCOL_NAME),
108			fallback_names: vec![],
109			max_request_size: MAX_PACKET_SIZE,
110			max_response_size: MAX_PACKET_SIZE,
111			request_timeout: Duration::from_secs(15),
112			inbound_queue: Some(tx),
113		};
114
115		(Self { client, request_receiver }, config)
116	}
117
118	/// Run [`BitswapRequestHandler`].
119	pub async fn run(mut self) {
120		while let Some(request) = self.request_receiver.next().await {
121			let IncomingRequest { peer, payload, pending_response } = request;
122
123			match self.handle_message(&peer, &payload) {
124				Ok(response) => {
125					let response = OutgoingResponse {
126						result: Ok(response),
127						reputation_changes: Vec::new(),
128						sent_feedback: None,
129					};
130
131					match pending_response.send(response) {
132						Ok(()) => {
133							trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
134						},
135						Err(_) => debug!(
136							target: LOG_TARGET,
137							"Failed to handle light client request from {peer}: {}",
138							BitswapError::SendResponse,
139						),
140					}
141				},
142				Err(err) => {
143					error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}");
144
145					// TODO: adjust reputation?
146
147					let response = OutgoingResponse {
148						result: Err(()),
149						reputation_changes: vec![],
150						sent_feedback: None,
151					};
152
153					if pending_response.send(response).is_err() {
154						debug!(
155							target: LOG_TARGET,
156							"Failed to handle bitswap request from {peer}: {}",
157							BitswapError::SendResponse,
158						);
159					}
160				},
161			}
162		}
163	}
164
165	/// Handle received Bitswap request
166	fn handle_message(
167		&mut self,
168		peer: &PeerId,
169		payload: &Vec<u8>,
170	) -> Result<Vec<u8>, BitswapError> {
171		let request = schema::bitswap::Message::decode(&payload[..])?;
172
173		trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
174
175		let mut response = BitswapMessage::default();
176
177		let wantlist = match request.wantlist {
178			Some(wantlist) => wantlist,
179			None => {
180				debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer);
181				return Err(BitswapError::InvalidWantList)
182			},
183		};
184
185		if wantlist.entries.len() > MAX_WANTED_BLOCKS {
186			trace!(target: LOG_TARGET, "Ignored request: too many entries");
187			return Err(BitswapError::TooManyEntries)
188		}
189
190		for entry in wantlist.entries {
191			let cid = match cid::Cid::read_bytes(entry.block.as_slice()) {
192				Ok(cid) => cid,
193				Err(e) => {
194					trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
195					continue
196				},
197			};
198
199			if cid.version() != cid::Version::V1 ||
200				cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) ||
201				cid.hash().size() != 32
202			{
203				debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
204				continue
205			}
206
207			let mut hash = B::Hash::default();
208			hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
209			let transaction = match self.client.indexed_transaction(hash) {
210				Ok(ex) => ex,
211				Err(e) => {
212					error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
213					None
214				},
215			};
216
217			match transaction {
218				Some(transaction) => {
219					trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
220
221					if entry.want_type == WantType::Block as i32 {
222						let prefix = Prefix {
223							version: cid.version(),
224							codec: cid.codec(),
225							mh_type: cid.hash().code(),
226							mh_len: cid.hash().size(),
227						};
228						response
229							.payload
230							.push(MessageBlock { prefix: prefix.to_bytes(), data: transaction });
231					} else {
232						response.block_presences.push(BlockPresence {
233							r#type: BlockPresenceType::Have as i32,
234							cid: cid.to_bytes(),
235						});
236					}
237				},
238				None => {
239					trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
240
241					if entry.send_dont_have {
242						response.block_presences.push(BlockPresence {
243							r#type: BlockPresenceType::DontHave as i32,
244							cid: cid.to_bytes(),
245						});
246					}
247				},
248			}
249		}
250
251		Ok(response.encode_to_vec())
252	}
253}
254
255/// Bitswap protocol error.
256#[derive(Debug, thiserror::Error)]
257pub enum BitswapError {
258	/// Protobuf decoding error.
259	#[error("Failed to decode request: {0}.")]
260	DecodeProto(#[from] prost::DecodeError),
261
262	/// Protobuf encoding error.
263	#[error("Failed to encode response: {0}.")]
264	EncodeProto(#[from] prost::EncodeError),
265
266	/// Client backend error.
267	#[error(transparent)]
268	Client(#[from] sp_blockchain::Error),
269
270	/// Error parsing CID
271	#[error(transparent)]
272	BadCid(#[from] cid::Error),
273
274	/// Packet read error.
275	#[error(transparent)]
276	Read(#[from] io::Error),
277
278	/// Error sending response.
279	#[error("Failed to send response.")]
280	SendResponse,
281
282	/// Message doesn't have a WANT list.
283	#[error("Invalid WANT list.")]
284	InvalidWantList,
285
286	/// Too many blocks requested.
287	#[error("Too many block entries in the request.")]
288	TooManyEntries,
289}
290
#[cfg(test)]
mod tests {
	use super::*;
	use futures::channel::oneshot;
	use sc_block_builder::BlockBuilderBuilder;
	use schema::bitswap::{
		message::{wantlist::Entry, Wantlist},
		Message as BitswapMessage,
	};
	use sp_consensus::BlockOrigin;
	use sp_runtime::codec::Encode;
	use substrate_test_runtime::ExtrinsicBuilder;
	use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder};

	/// Sends `payload` to the handler on behalf of a random peer and waits for its response.
	async fn request(
		queue: &async_channel::Sender<IncomingRequest>,
		payload: Vec<u8>,
	) -> OutgoingResponse {
		let (pending_response, response) = oneshot::channel();
		queue
			.send(IncomingRequest { peer: PeerId::random(), payload, pending_response })
			.await
			.unwrap();

		match response.await {
			Ok(response) => response,
			Err(_) => panic!("invalid event received"),
		}
	}

	/// Checks that `response` carries neither reputation changes nor a feedback channel and
	/// hands back its result.
	fn result_of(response: OutgoingResponse) -> Result<Vec<u8>, ()> {
		let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
		assert_eq!(reputation_changes, Vec::new());
		assert!(sent_feedback.is_none());
		result
	}

	/// Encodes a bitswap message whose WANT list consists of `entries`.
	fn wantlist_message(entries: Vec<Entry>) -> Vec<u8> {
		BitswapMessage {
			wantlist: Some(Wantlist { entries, full: false }),
			..Default::default()
		}
		.encode_to_vec()
	}

	/// Builds a WANT list entry for the CIDv1 (codec `0x70`) wrapping `digest` as a
	/// Blake2b-256 multihash.
	fn want_blake2b(digest: &[u8]) -> Entry {
		let multihash = cid::multihash::Multihash::wrap(
			u64::from(cid::multihash::Code::Blake2b256),
			digest,
		)
		.unwrap();

		Entry { block: cid::Cid::new_v1(0x70, multihash).to_bytes(), ..Default::default() }
	}

	#[tokio::test]
	async fn undecodable_message() {
		let client = substrate_test_runtime_client::new();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(async move { bitswap.run().await });
		let queue = config.inbound_queue.unwrap();

		let response = request(&queue, vec![0x13, 0x37, 0x13, 0x38]).await;

		assert_eq!(result_of(response), Err(()));
	}

	#[tokio::test]
	async fn empty_want_list() {
		let client = substrate_test_runtime_client::new();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(async move { bitswap.run().await });
		let queue = config.inbound_queue.unwrap();

		// A message without any WANT list is rejected.
		let missing = BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec();
		let response = request(&queue, missing).await;
		assert_eq!(result_of(response), Err(()));

		// Empty WANT list should not cause an error
		let empty = BitswapMessage { wantlist: Some(Default::default()), ..Default::default() }
			.encode_to_vec();
		let response = request(&queue, empty).await;
		assert_eq!(result_of(response), Ok(BitswapMessage::default().encode_to_vec()));
	}

	#[tokio::test]
	async fn too_long_want_list() {
		let client = substrate_test_runtime_client::new();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(async move { bitswap.run().await });
		let queue = config.inbound_queue.unwrap();

		// One entry more than the handler accepts.
		let entries: Vec<Entry> =
			std::iter::repeat_with(Entry::default).take(MAX_WANTED_BLOCKS + 1).collect();
		let response = request(&queue, wantlist_message(entries)).await;

		assert_eq!(result_of(response), Err(()));
	}

	#[tokio::test]
	async fn transaction_not_found() {
		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(async move { bitswap.run().await });
		let queue = config.inbound_queue.unwrap();

		let payload = wantlist_message(vec![want_blake2b(&[0u8; 32])]);
		let response = request(&queue, payload).await;

		assert_eq!(result_of(response), Ok(vec![]));
	}

	#[tokio::test]
	async fn transaction_found() {
		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
		let mut block_builder = BlockBuilderBuilder::new(&client)
			.on_parent_block(client.chain_info().genesis_hash)
			.with_parent_block_number(0)
			.build()
			.unwrap();

		// encoded extrinsic: [161, .. , 2, 6, 16, 19, 55, 19, 56]
		let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
		let pattern_index = ext.encoded_size() - 4;

		block_builder.push(ext.clone()).unwrap();
		let block = block_builder.build().unwrap().block;

		client.import(BlockOrigin::File, block).await.unwrap();

		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(async move { bitswap.run().await });
		let queue = config.inbound_queue.unwrap();

		// The request references the hash of the last four bytes of the encoded extrinsic.
		let digest = sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]);
		let payload = wantlist_message(vec![want_blake2b(&digest)]);
		let response = request(&queue, payload).await;

		let encoded = result_of(response).expect("fetch to succeed");
		let response = schema::bitswap::Message::decode(&encoded[..]).unwrap();
		assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]);
	}
}