1use crate::{
24 request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
25 types::ProtocolName,
26 MAX_RESPONSE_SIZE,
27};
28
29use cid::{Error as CidError, Version as CidVersion};
30use futures::StreamExt;
31use log::{debug, error, trace};
32use prost::Message;
33use sc_client_api::BlockBackend;
34use sc_network_types::PeerId;
35use schema::bitswap::{
36 message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
37 Message as BitswapMessage,
38};
39use sp_core::H256;
40use sp_runtime::traits::Block as BlockT;
41use std::{io, sync::Arc, time::Duration};
42use unsigned_varint::encode as varint_encode;
43
44mod client;
46pub(crate) mod schema;
47
48pub use cid::Cid;
49pub use client::{
50 request_bitswap_blocks, request_bitswap_blocks_unverified, BitswapError, FetchOutcome,
51 BLAKE2B_256_MULTIHASH_CODE, KECCAK_256_MULTIHASH_CODE, SHA2_256_MULTIHASH_CODE,
52};
53
54pub(crate) use schema::bitswap::Message as BitswapProtoMessage;
55
56pub(crate) const LOG_TARGET: &str = "sub-libp2p::bitswap";
57
58const MAX_PACKET_SIZE: u64 = MAX_RESPONSE_SIZE;
60
61const MAX_REQUEST_QUEUE: usize = 20;
63
64pub const MAX_WANTED_BLOCKS: usize = 16;
66
67pub(crate) const PROTOCOL_NAME: &str = "/ipfs/bitswap/1.2.0";
69
70pub const RAW_CODEC: u64 = 0x55;
72
73pub fn is_cid_supported(cid: &Cid) -> bool {
76 cid.version() != CidVersion::V0 &&
77 cid.hash().size() == 32 &&
78 is_supported_multihash_code(cid.hash().code())
79}
80
81pub(crate) fn is_supported_multihash_code(code: u64) -> bool {
83 matches!(code, BLAKE2B_256_MULTIHASH_CODE | SHA2_256_MULTIHASH_CODE | KECCAK_256_MULTIHASH_CODE)
84}
85
86#[derive(PartialEq, Eq, Clone, Debug)]
88pub(crate) struct Prefix {
89 pub version: CidVersion,
91 pub codec: u64,
93 pub mh_type: u64,
95 pub mh_len: u8,
97}
98
99impl From<&Cid> for Prefix {
100 fn from(cid: &Cid) -> Self {
101 Self {
102 version: cid.version(),
103 codec: cid.codec(),
104 mh_type: cid.hash().code(),
105 mh_len: cid.hash().size(),
106 }
107 }
108}
109
110impl Prefix {
111 pub(crate) fn to_bytes(&self) -> Vec<u8> {
113 let mut res = Vec::with_capacity(4);
114 let mut buf = varint_encode::u64_buffer();
115 let version = varint_encode::u64(self.version.into(), &mut buf);
116 res.extend_from_slice(version);
117 let mut buf = varint_encode::u64_buffer();
118 let codec = varint_encode::u64(self.codec, &mut buf);
119 res.extend_from_slice(codec);
120 let mut buf = varint_encode::u64_buffer();
121 let mh_type = varint_encode::u64(self.mh_type, &mut buf);
122 res.extend_from_slice(mh_type);
123 let mut buf = varint_encode::u64_buffer();
124 let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf);
125 res.extend_from_slice(mh_len);
126 res
127 }
128}
129
130pub(crate) struct BitswapRequestHandler<B> {
132 client: Arc<dyn BlockBackend<B> + Send + Sync>,
133 request_receiver: async_channel::Receiver<IncomingRequest>,
134}
135
136impl<B: BlockT> BitswapRequestHandler<B> {
137 pub(crate) fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
139 let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
140
141 let config = ProtocolConfig {
142 name: ProtocolName::from(PROTOCOL_NAME),
143 fallback_names: vec![],
144 max_request_size: MAX_PACKET_SIZE,
145 max_response_size: MAX_PACKET_SIZE,
146 request_timeout: Duration::from_secs(15),
147 inbound_queue: Some(tx),
148 };
149
150 (Self { client, request_receiver }, config)
151 }
152
153 pub(crate) async fn run(mut self) {
155 while let Some(request) = self.request_receiver.next().await {
156 let IncomingRequest { peer, payload, pending_response } = request;
157
158 match self.handle_message(&peer, &payload) {
159 Ok(response) => {
160 let response = OutgoingResponse {
161 result: Ok(response),
162 reputation_changes: Vec::new(),
163 sent_feedback: None,
164 };
165
166 match pending_response.send(response) {
167 Ok(()) => {
168 trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
169 },
170 Err(_) => debug!(
171 target: LOG_TARGET,
172 "Failed to handle bitswap request from {peer}: {}",
173 RequestHandlerError::SendResponse,
174 ),
175 }
176 },
177 Err(err) => {
178 error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}");
179
180 let response = OutgoingResponse {
183 result: Err(()),
184 reputation_changes: vec![],
185 sent_feedback: None,
186 };
187
188 if pending_response.send(response).is_err() {
189 debug!(
190 target: LOG_TARGET,
191 "Failed to handle bitswap request from {peer}: {}",
192 RequestHandlerError::SendResponse,
193 );
194 }
195 },
196 }
197 }
198 }
199
200 fn handle_message(
202 &mut self,
203 peer: &PeerId,
204 payload: &[u8],
205 ) -> Result<Vec<u8>, RequestHandlerError> {
206 let request = schema::bitswap::Message::decode(payload)?;
207
208 trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
209
210 let mut response = BitswapMessage::default();
211
212 let wantlist = match request.wantlist {
213 Some(wantlist) => wantlist,
214 None => {
215 debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer);
216 return Err(RequestHandlerError::InvalidWantList);
217 },
218 };
219
220 if wantlist.entries.len() > MAX_WANTED_BLOCKS {
221 trace!(target: LOG_TARGET, "Ignored request: too many entries");
222 return Err(RequestHandlerError::TooManyEntries);
223 }
224
225 for entry in wantlist.entries {
226 let cid = match Cid::read_bytes(entry.block.as_slice()) {
227 Ok(cid) => cid,
228 Err(e) => {
229 trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
230 continue;
231 },
232 };
233
234 if !is_cid_supported(&cid) {
235 trace!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
236 continue;
237 }
238
239 let mut hash = H256::default();
240 hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
241 let transaction = match self.client.indexed_transaction(hash) {
242 Ok(ex) => ex,
243 Err(e) => {
244 error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
245 None
246 },
247 };
248
249 match transaction {
250 Some(transaction) => {
251 trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
252
253 if entry.want_type == WantType::Block as i32 {
254 let prefix: Prefix = (&cid).into();
255 response
256 .payload
257 .push(MessageBlock { prefix: prefix.to_bytes(), data: transaction });
258 } else {
259 response.block_presences.push(BlockPresence {
260 r#type: BlockPresenceType::Have as i32,
261 cid: cid.to_bytes(),
262 });
263 }
264 },
265 None => {
266 trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
267
268 if entry.send_dont_have {
269 response.block_presences.push(BlockPresence {
270 r#type: BlockPresenceType::DontHave as i32,
271 cid: cid.to_bytes(),
272 });
273 }
274 },
275 }
276 }
277
278 Ok(response.encode_to_vec())
279 }
280}
281
282#[derive(Debug, thiserror::Error)]
284enum RequestHandlerError {
285 #[error("Failed to decode request: {0}.")]
287 DecodeProto(#[from] prost::DecodeError),
288
289 #[error("Failed to encode response: {0}.")]
291 EncodeProto(#[from] prost::EncodeError),
292
293 #[error(transparent)]
295 Client(#[from] sp_blockchain::Error),
296
297 #[error(transparent)]
299 BadCid(#[from] CidError),
300
301 #[error(transparent)]
303 Read(#[from] io::Error),
304
305 #[error("Failed to send response.")]
307 SendResponse,
308
309 #[error("Invalid WANT list.")]
311 InvalidWantList,
312
313 #[error("Too many block entries in the request.")]
315 TooManyEntries,
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321 use futures::channel::oneshot;
322 use litep2p::types::multihash::Code as LiteP2pCode;
323 use sc_block_builder::BlockBuilderBuilder;
324 use schema::bitswap::{
325 message::{wantlist::Entry, Wantlist},
326 Message as BitswapMessage,
327 };
328 use sp_consensus::BlockOrigin;
329 use sp_runtime::codec::Encode;
330 use substrate_test_runtime::ExtrinsicBuilder;
331 use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder};
332
333 #[tokio::test]
334 async fn undecodable_message() {
335 let client = substrate_test_runtime_client::new();
336 let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
337
338 tokio::spawn(async move { bitswap.run().await });
339
340 let (tx, rx) = oneshot::channel();
341 config
342 .inbound_queue
343 .unwrap()
344 .send(IncomingRequest {
345 peer: PeerId::random(),
346 payload: vec![0x13, 0x37, 0x13, 0x38],
347 pending_response: tx,
348 })
349 .await
350 .unwrap();
351
352 if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
353 assert_eq!(result, Err(()));
354 assert_eq!(reputation_changes, Vec::new());
355 assert!(sent_feedback.is_none());
356 } else {
357 panic!("invalid event received");
358 }
359 }
360
361 #[tokio::test]
362 async fn empty_want_list() {
363 let client = substrate_test_runtime_client::new();
364 let (bitswap, mut config) = BitswapRequestHandler::new(Arc::new(client));
365
366 tokio::spawn(async move { bitswap.run().await });
367
368 let (tx, rx) = oneshot::channel();
369 config
370 .inbound_queue
371 .as_mut()
372 .unwrap()
373 .send(IncomingRequest {
374 peer: PeerId::random(),
375 payload: BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec(),
376 pending_response: tx,
377 })
378 .await
379 .unwrap();
380
381 if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
382 assert_eq!(result, Err(()));
383 assert_eq!(reputation_changes, Vec::new());
384 assert!(sent_feedback.is_none());
385 } else {
386 panic!("invalid event received");
387 }
388
389 let (tx, rx) = oneshot::channel();
391 config
392 .inbound_queue
393 .unwrap()
394 .send(IncomingRequest {
395 peer: PeerId::random(),
396 payload: BitswapMessage {
397 wantlist: Some(Default::default()),
398 ..Default::default()
399 }
400 .encode_to_vec(),
401 pending_response: tx,
402 })
403 .await
404 .unwrap();
405
406 if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
407 assert_eq!(result, Ok(BitswapMessage::default().encode_to_vec()));
408 assert_eq!(reputation_changes, Vec::new());
409 assert!(sent_feedback.is_none());
410 } else {
411 panic!("invalid event received");
412 }
413 }
414
415 #[tokio::test]
416 async fn too_long_want_list() {
417 let client = substrate_test_runtime_client::new();
418 let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
419
420 tokio::spawn(async move { bitswap.run().await });
421
422 let (tx, rx) = oneshot::channel();
423 config
424 .inbound_queue
425 .unwrap()
426 .send(IncomingRequest {
427 peer: PeerId::random(),
428 payload: BitswapMessage {
429 wantlist: Some(Wantlist {
430 entries: (0..MAX_WANTED_BLOCKS + 1)
431 .map(|_| Entry::default())
432 .collect::<Vec<_>>(),
433 full: false,
434 }),
435 ..Default::default()
436 }
437 .encode_to_vec(),
438 pending_response: tx,
439 })
440 .await
441 .unwrap();
442
443 if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
444 assert_eq!(result, Err(()));
445 assert_eq!(reputation_changes, Vec::new());
446 assert!(sent_feedback.is_none());
447 } else {
448 panic!("invalid event received");
449 }
450 }
451
452 #[tokio::test]
453 async fn transaction_not_found() {
454 let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
455
456 let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
457 tokio::spawn(async move { bitswap.run().await });
458
459 let (tx, rx) = oneshot::channel();
460 config
461 .inbound_queue
462 .unwrap()
463 .send(IncomingRequest {
464 peer: PeerId::random(),
465 payload: BitswapMessage {
466 wantlist: Some(Wantlist {
467 entries: vec![Entry {
468 block: cid::Cid::new_v1(
469 0x70,
470 cid::multihash::Multihash::wrap(
471 u64::from(LiteP2pCode::Blake2b256),
472 &[0u8; 32],
473 )
474 .unwrap(),
475 )
476 .to_bytes(),
477 ..Default::default()
478 }],
479 full: false,
480 }),
481 ..Default::default()
482 }
483 .encode_to_vec(),
484 pending_response: tx,
485 })
486 .await
487 .unwrap();
488
489 if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
490 assert_eq!(result, Ok(vec![]));
491 assert_eq!(reputation_changes, Vec::new());
492 assert!(sent_feedback.is_none());
493 } else {
494 panic!("invalid event received");
495 }
496 }
497
498 #[tokio::test]
499 async fn transaction_found() {
500 let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
501 let mut block_builder = BlockBuilderBuilder::new(&client)
502 .on_parent_block(client.chain_info().genesis_hash)
503 .with_parent_block_number(0)
504 .build()
505 .unwrap();
506
507 let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
509 let pattern_index = ext.encoded_size() - 4;
510
511 block_builder.push(ext.clone()).unwrap();
512 let block = block_builder.build().unwrap().block;
513
514 client.import(BlockOrigin::File, block).await.unwrap();
515
516 let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
517
518 tokio::spawn(async move { bitswap.run().await });
519
520 let (tx, rx) = oneshot::channel();
521 config
522 .inbound_queue
523 .unwrap()
524 .send(IncomingRequest {
525 peer: PeerId::random(),
526 payload: BitswapMessage {
527 wantlist: Some(Wantlist {
528 entries: vec![Entry {
529 block: cid::Cid::new_v1(
530 0x70,
531 cid::multihash::Multihash::wrap(
532 u64::from(LiteP2pCode::Blake2b256),
533 &sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]),
534 )
535 .unwrap(),
536 )
537 .to_bytes(),
538 ..Default::default()
539 }],
540 full: false,
541 }),
542 ..Default::default()
543 }
544 .encode_to_vec(),
545 pending_response: tx,
546 })
547 .await
548 .unwrap();
549
550 if let Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) = rx.await {
551 assert_eq!(reputation_changes, Vec::new());
552 assert!(sent_feedback.is_none());
553
554 let response =
555 schema::bitswap::Message::decode(&result.expect("fetch to succeed")[..]).unwrap();
556 assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]);
557 } else {
558 panic!("invalid event received");
559 }
560 }
561
562 #[tokio::test]
563 async fn transaction_not_found_sends_dont_have_when_requested() {
564 let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
565 let (mut bitswap, _config) = BitswapRequestHandler::new(Arc::new(client));
566 let cid = cid::Cid::new_v1(
567 0x70,
568 cid::multihash::Multihash::wrap(u64::from(LiteP2pCode::Blake2b256), &[0u8; 32])
569 .unwrap(),
570 );
571 let request = BitswapMessage {
572 wantlist: Some(Wantlist {
573 entries: vec![Entry {
574 block: cid.to_bytes(),
575 send_dont_have: true,
576 ..Default::default()
577 }],
578 full: false,
579 }),
580 ..Default::default()
581 }
582 .encode_to_vec();
583
584 let response = BitswapMessage::decode(
585 bitswap.handle_message(&PeerId::random(), &request).unwrap().as_slice(),
586 )
587 .unwrap();
588
589 assert!(response.payload.is_empty());
590 assert_eq!(response.block_presences.len(), 1);
591 assert_eq!(response.block_presences[0].cid, cid.to_bytes());
592 assert_eq!(response.block_presences[0].r#type, BlockPresenceType::DontHave as i32);
593 }
594
595 #[tokio::test]
596 async fn transaction_found_sends_have_for_want_have() {
597 let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
598 let mut block_builder = BlockBuilderBuilder::new(&client)
599 .on_parent_block(client.chain_info().genesis_hash)
600 .with_parent_block_number(0)
601 .build()
602 .unwrap();
603
604 let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
605 let pattern_index = ext.encoded_size() - 4;
606 let cid = cid::Cid::new_v1(
607 0x70,
608 cid::multihash::Multihash::wrap(
609 u64::from(LiteP2pCode::Blake2b256),
610 &sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]),
611 )
612 .unwrap(),
613 );
614
615 block_builder.push(ext).unwrap();
616 let block = block_builder.build().unwrap().block;
617 client.import(BlockOrigin::File, block).await.unwrap();
618
619 let (mut bitswap, _config) = BitswapRequestHandler::new(Arc::new(client));
620 let request = BitswapMessage {
621 wantlist: Some(Wantlist {
622 entries: vec![Entry {
623 block: cid.to_bytes(),
624 want_type: WantType::Have as i32,
625 ..Default::default()
626 }],
627 full: false,
628 }),
629 ..Default::default()
630 }
631 .encode_to_vec();
632
633 let response = BitswapMessage::decode(
634 bitswap.handle_message(&PeerId::random(), &request).unwrap().as_slice(),
635 )
636 .unwrap();
637
638 assert!(response.payload.is_empty());
639 assert_eq!(response.block_presences.len(), 1);
640 assert_eq!(response.block_presences[0].cid, cid.to_bytes());
641 assert_eq!(response.block_presences[0].r#type, BlockPresenceType::Have as i32);
642 }
643
644 #[test]
645 fn is_cid_supported_accepts_all_three_supported_hashings() {
646 use cid::multihash::Multihash;
647 for multihash_code in
648 [BLAKE2B_256_MULTIHASH_CODE, SHA2_256_MULTIHASH_CODE, KECCAK_256_MULTIHASH_CODE]
649 {
650 let digest = [9u8; 32];
651 let mh = Multihash::<64>::wrap(multihash_code, &digest).unwrap();
652 let cid = Cid::new_v1(RAW_CODEC, mh);
653 assert!(is_cid_supported(&cid), "{multihash_code} CID should be supported");
654 }
655 }
656
657 #[test]
658 fn is_cid_supported_rejects_unknown_multihash_code() {
659 use cid::multihash::Multihash;
660 let digest = [9u8; 32];
661 let mh = Multihash::<64>::wrap(0x99, &digest).unwrap();
662 let cid = Cid::new_v1(RAW_CODEC, mh);
663 assert!(!is_cid_supported(&cid));
664 }
665}