1use crate::{IfDisconnected, NetworkRequest, ProtocolName};
19
20use cid::{multihash::Multihash as CidMultihash, Cid, Version as CidVersion};
21use log::{debug, trace, warn};
22use prost::Message;
23use sc_network_types::PeerId;
24use std::collections::{HashMap, HashSet};
25
26use super::{
27 is_cid_supported,
28 schema::bitswap::{
29 message::{
30 wantlist::{Entry, WantType as ProtoWantType},
31 BlockPresence, BlockPresenceType, Wantlist,
32 },
33 Message as BitswapMessage,
34 },
35 Prefix, LOG_TARGET, MAX_WANTED_BLOCKS, PROTOCOL_NAME,
36};
37
38pub const BLAKE2B_256_MULTIHASH_CODE: u64 = 0xb220;
41pub const SHA2_256_MULTIHASH_CODE: u64 = 0x12;
43pub const KECCAK_256_MULTIHASH_CODE: u64 = 0x1b;
45
46#[derive(Debug)]
53pub enum FetchOutcome {
54 Block(Vec<u8>),
56 Missing,
62}
63
64type Multihash = CidMultihash<64>;
66
67fn validate_wantlist_size(len: usize) -> Result<(), BitswapError> {
69 if len == 0 {
70 return Err(BitswapError::DecodeError("empty wantlist".into()));
71 }
72 if len > MAX_WANTED_BLOCKS {
73 return Err(BitswapError::DecodeError(format!(
74 "wantlist too large: {len} > {MAX_WANTED_BLOCKS}",
75 )));
76 }
77 Ok(())
78}
79
80fn validate_cids(cids: &[Cid]) -> Result<(), BitswapError> {
82 validate_wantlist_size(cids.len())?;
83
84 let mut seen: HashSet<Cid> = HashSet::with_capacity(cids.len());
85 for cid in cids {
86 if !is_cid_supported(cid) {
87 return Err(BitswapError::UnsupportedHashing { multihash_code: cid.hash().code() });
88 }
89 if !seen.insert(*cid) {
90 return Err(BitswapError::DecodeError(format!("duplicate CID in wantlist: {cid}")));
91 }
92 }
93
94 Ok(())
95}
96
97pub async fn request_bitswap_blocks<N>(
108 network: &N,
109 peer: PeerId,
110 cids: &[Cid],
111) -> Result<HashMap<Cid, FetchOutcome>, BitswapError>
112where
113 N: NetworkRequest + ?Sized,
114{
115 validate_cids(cids)?;
116
117 let wanted: HashSet<Cid> = cids.iter().copied().collect();
118 let response = send_request(network, peer, cids).await?;
119 Ok(classify_response(response, &wanted, peer))
120}
121
122pub async fn request_bitswap_blocks_unverified<N>(
131 network: &N,
132 peer: PeerId,
133 cids: &[Cid],
134) -> Result<HashMap<Cid, FetchOutcome>, BitswapError>
135where
136 N: NetworkRequest + ?Sized,
137{
138 validate_cids(cids)?;
139
140 let response = send_request(network, peer, cids).await?;
141 Ok(classify_response_unverified(response, cids, peer))
142}
143
144async fn send_request<N>(
146 network: &N,
147 peer: PeerId,
148 cids: &[Cid],
149) -> Result<BitswapMessage, BitswapError>
150where
151 N: NetworkRequest + ?Sized,
152{
153 let entries: Vec<Entry> = cids
154 .iter()
155 .copied()
156 .map(|cid| Entry {
157 block: cid.to_bytes(),
158 want_type: ProtoWantType::Block as i32,
159 send_dont_have: true,
160 ..Default::default()
161 })
162 .collect();
163 let request =
164 BitswapMessage { wantlist: Some(Wantlist { entries, full: false }), ..Default::default() };
165
166 trace!(
167 target: LOG_TARGET,
168 "client: sending Bitswap wantlist for {} CIDs to {peer}, protocol {PROTOCOL_NAME}",
169 cids.len(),
170 );
171
172 let payload = match network
173 .request(
174 peer,
175 ProtocolName::from(PROTOCOL_NAME),
176 request.encode_to_vec(),
177 None,
178 IfDisconnected::TryConnect,
179 )
180 .await
181 {
182 Ok((payload, _)) => payload,
183 Err(err) => {
184 debug!(target: LOG_TARGET, "client: batch request to {peer} rejected by network: {err:?}");
185 return Err(BitswapError::RequestFailed(err.to_string()));
186 },
187 };
188
189 BitswapMessage::decode(&payload[..]).map_err(|err| {
190 debug!(target: LOG_TARGET, "client: failed to decode batch response from {peer}: {err}");
191 BitswapError::DecodeError(err.to_string())
192 })
193}
194
195fn classify_response(
201 response: BitswapMessage,
202 wanted: &HashSet<Cid>,
203 peer: PeerId,
204) -> HashMap<Cid, FetchOutcome> {
205 let mut result: HashMap<Cid, FetchOutcome> = HashMap::with_capacity(wanted.len());
206
207 for block in response.payload {
208 let Ok(cid) = cid_from_block_prefix(&block.prefix, &block.data).inspect_err(|err| {
209 debug!(target: LOG_TARGET, "client: malformed block prefix from {peer}: {err:?}");
210 }) else {
211 continue;
212 };
213 if !wanted.contains(&cid) {
214 debug!(target: LOG_TARGET, "client: {peer} returned unsolicited block for CID {cid}");
215 continue;
216 }
217 debug!(target: LOG_TARGET, "client: {peer} returned {} bytes for CID {cid}", block.data.len());
218 result.insert(cid, FetchOutcome::Block(block.data));
219 }
220
221 log_presences(response.block_presences, wanted, peer);
222
223 for cid in wanted {
224 result.entry(*cid).or_insert(FetchOutcome::Missing);
225 }
226
227 result
228}
229
230fn classify_response_unverified(
236 response: BitswapMessage,
237 cids: &[Cid],
238 peer: PeerId,
239) -> HashMap<Cid, FetchOutcome> {
240 let mut result: HashMap<Cid, FetchOutcome> = HashMap::with_capacity(cids.len());
241 let wanted_set: HashSet<Cid> = cids.iter().copied().collect();
242 let mut dont_have_cids: HashSet<Cid> = HashSet::with_capacity(cids.len());
243
244 for presence in response.block_presences {
245 let Ok(cid) = Cid::read_bytes(presence.cid.as_slice()).inspect_err(|err| {
246 debug!(target: LOG_TARGET, "client: malformed presence CID from {peer}: {err}");
247 }) else {
248 continue;
249 };
250 if !wanted_set.contains(&cid) {
251 debug!(target: LOG_TARGET, "client: {peer} returned unsolicited presence for CID {cid}");
252 continue;
253 }
254 if presence.r#type == BlockPresenceType::DontHave as i32 {
255 debug!(target: LOG_TARGET, "client: {peer} DONT_HAVE for CID {cid}");
256 dont_have_cids.insert(cid);
257 } else if presence.r#type == BlockPresenceType::Have as i32 {
258 debug!(target: LOG_TARGET, "client: {peer} HAVE for CID {cid}");
259 } else {
260 warn!(
261 target: LOG_TARGET,
262 "client: {peer} unexpected presence type {} for CID {cid}",
263 presence.r#type,
264 );
265 }
266 }
267
268 let mut expected_payload_order =
272 cids.iter().copied().filter(|cid| !dont_have_cids.contains(cid));
273
274 for block in response.payload {
275 let Some(expected_cid) = expected_payload_order.next() else {
276 debug!(target: LOG_TARGET, "client: {peer} returned more payload blocks than expected; dropping extras");
277 break;
278 };
279 let Ok(prefix) = decode_prefix(&block.prefix).inspect_err(|err| {
280 debug!(target: LOG_TARGET, "client: malformed block prefix from {peer}: {err:?}");
281 }) else {
282 break;
283 };
284 if !prefix_matches_cid(&prefix, &expected_cid) {
285 debug!(
286 target: LOG_TARGET,
287 "client: {peer} returned block with prefix {:?} but expected CID {expected_cid}; \
288 stopping payload attribution",
289 prefix,
290 );
291 break;
292 }
293 debug!(
294 target: LOG_TARGET,
295 "client: {peer} returned {} unverified bytes for CID {expected_cid}",
296 block.data.len(),
297 );
298 result.insert(expected_cid, FetchOutcome::Block(block.data.clone()));
299 }
300
301 for cid in cids {
302 result.entry(*cid).or_insert(FetchOutcome::Missing);
303 }
304
305 result
306}
307
308fn log_presences(presences: Vec<BlockPresence>, wanted: &HashSet<Cid>, peer: PeerId) {
310 for presence in presences {
311 let Ok(cid) = Cid::read_bytes(presence.cid.as_slice()).inspect_err(|err| {
312 debug!(target: LOG_TARGET, "client: malformed presence CID from {peer}: {err}");
313 }) else {
314 continue;
315 };
316 if !wanted.contains(&cid) {
317 debug!(target: LOG_TARGET, "client: {peer} returned unsolicited presence for CID {cid}");
318 continue;
319 }
320 if presence.r#type == BlockPresenceType::DontHave as i32 {
321 debug!(target: LOG_TARGET, "client: {peer} DONT_HAVE for CID {cid}");
322 } else if presence.r#type == BlockPresenceType::Have as i32 {
323 debug!(target: LOG_TARGET, "client: {peer} HAVE for CID {cid}");
324 } else {
325 debug!(
326 target: LOG_TARGET,
327 "client: {peer} unexpected presence type {} for CID {cid}",
328 presence.r#type,
329 );
330 }
331 }
332}
333
334fn prefix_matches_cid(prefix: &Prefix, cid: &Cid) -> bool {
336 prefix.version == cid.version() &&
337 prefix.codec == cid.codec() &&
338 prefix.mh_type == cid.hash().code() &&
339 prefix.mh_len == cid.hash().size()
340}
341
342fn cid_from_block_prefix(prefix: &[u8], data: &[u8]) -> Result<Cid, BitswapError> {
344 let prefix = decode_prefix(prefix)?;
345 if prefix.version != CidVersion::V1 {
346 return Err(BitswapError::UnsupportedCidVersion { version: prefix.version.into() });
347 }
348
349 let hash = hash_for_multihash_code(prefix.mh_type, data)
350 .ok_or(BitswapError::UnsupportedHashing { multihash_code: prefix.mh_type })?;
351 let multihash = Multihash::wrap(prefix.mh_type, &hash)
352 .map_err(|err| BitswapError::DecodeError(err.to_string()))?;
353 Ok(Cid::new_v1(prefix.codec, multihash))
354}
355
356fn hash_for_multihash_code(multihash_code: u64, data: &[u8]) -> Option<[u8; 32]> {
358 match multihash_code {
359 BLAKE2B_256_MULTIHASH_CODE => Some(sp_crypto_hashing::blake2_256(data)),
360 SHA2_256_MULTIHASH_CODE => Some(sp_crypto_hashing::sha2_256(data)),
361 KECCAK_256_MULTIHASH_CODE => Some(sp_crypto_hashing::keccak_256(data)),
362 _ => None,
363 }
364}
365
366fn decode_prefix(mut bytes: &[u8]) -> Result<Prefix, BitswapError> {
368 let mut read_varint = || -> Result<u64, BitswapError> {
369 let (v, rest) = unsigned_varint::decode::u64(bytes)
370 .map_err(|err| BitswapError::DecodeError(err.to_string()))?;
371 bytes = rest;
372 Ok(v)
373 };
374
375 let version = read_varint()?;
376 let codec = read_varint()?;
377 let mh_type = read_varint()?;
378 let mh_len = read_varint()?;
379
380 if !bytes.is_empty() {
381 return Err(BitswapError::DecodeError("bitswap block prefix had trailing bytes".into()));
382 }
383
384 let version = CidVersion::try_from(version)
385 .map_err(|_| BitswapError::UnsupportedCidVersion { version })?;
386 let mh_len = u8::try_from(mh_len).map_err(|_| {
387 BitswapError::DecodeError(format!("multihash length {mh_len} does not fit into u8"))
388 })?;
389
390 Ok(Prefix { version, codec, mh_type, mh_len })
391}
392
393#[derive(Debug)]
395pub enum BitswapError {
396 DecodeError(String),
398 RequestFailed(String),
400 UnsupportedHashing {
402 multihash_code: u64,
404 },
405 UnsupportedCidVersion {
407 version: u64,
409 },
410}
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415 use crate::{OutboundFailure, RequestFailure};
416 use futures::channel::oneshot;
417 use sc_network_types::PeerId;
418 use std::{collections::VecDeque, sync::Mutex};
419
420 use super::super::{
421 is_supported_multihash_code,
422 schema::bitswap::message::{Block as MessageBlock, BlockPresence, BlockPresenceType},
423 RAW_CODEC,
424 };
425
426 fn raw_cid_from_digest(multihash_code: u64, digest: [u8; 32]) -> Result<Cid, BitswapError> {
428 if !is_supported_multihash_code(multihash_code) {
429 return Err(BitswapError::UnsupportedHashing { multihash_code });
430 }
431 let multihash = CidMultihash::wrap(multihash_code, &digest)
432 .map_err(|e| BitswapError::DecodeError(e.to_string()))?;
433 Ok(Cid::new_v1(RAW_CODEC, multihash))
434 }
435
436 struct StubSender {
437 responses: Mutex<VecDeque<Result<Vec<u8>, RequestFailure>>>,
438 requests: Mutex<Vec<Vec<u8>>>,
439 }
440
441 impl StubSender {
442 fn new(responses: impl IntoIterator<Item = Result<Vec<u8>, RequestFailure>>) -> Self {
443 Self {
444 responses: Mutex::new(responses.into_iter().collect()),
445 requests: Mutex::new(Vec::new()),
446 }
447 }
448
449 fn pop_request(&self) -> BitswapMessage {
450 let bytes = self.requests.lock().unwrap().pop().expect("request should be recorded");
451 BitswapMessage::decode(bytes.as_slice()).expect("request should decode")
452 }
453 }
454
455 #[async_trait::async_trait]
456 impl NetworkRequest for StubSender {
457 async fn request(
458 &self,
459 _target: PeerId,
460 _protocol: ProtocolName,
461 request: Vec<u8>,
462 _fallback_request: Option<(Vec<u8>, ProtocolName)>,
463 _connect: IfDisconnected,
464 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
465 self.requests.lock().unwrap().push(request);
466 self.responses
467 .lock()
468 .unwrap()
469 .pop_front()
470 .expect("StubSender: no canned response queued")
471 .map(|bytes| (bytes, ProtocolName::from(PROTOCOL_NAME)))
472 }
473
474 fn start_request(
475 &self,
476 _peer: PeerId,
477 _protocol: ProtocolName,
478 payload: Vec<u8>,
479 _fallback_request: Option<(Vec<u8>, ProtocolName)>,
480 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
481 _connect: IfDisconnected,
482 ) {
483 self.requests.lock().unwrap().push(payload);
484 let resp = self
485 .responses
486 .lock()
487 .unwrap()
488 .pop_front()
489 .expect("StubSender: no canned response queued");
490 let _ = tx.send(resp.map(|bytes| (bytes, ProtocolName::from(PROTOCOL_NAME))));
491 }
492 }
493
494 fn prefix_for(multihash_code: u64) -> Vec<u8> {
495 Prefix { version: CidVersion::V1, codec: RAW_CODEC, mh_type: multihash_code, mh_len: 32 }
496 .to_bytes()
497 }
498
499 fn cid_for_data(multihash_code: u64, data: &[u8]) -> Cid {
500 raw_cid_from_digest(multihash_code, hash_for_multihash_code(multihash_code, data).unwrap())
501 .unwrap()
502 }
503
504 fn cid_for_digest(multihash_code: u64, digest: [u8; 32]) -> Cid {
505 raw_cid_from_digest(multihash_code, digest).unwrap()
506 }
507
508 fn encode_response(blocks: &[(u64, Vec<u8>)], presences: &[(Cid, i32)]) -> Vec<u8> {
509 let payload = blocks
510 .iter()
511 .map(|(multihash_code, data)| MessageBlock {
512 prefix: prefix_for(*multihash_code),
513 data: data.clone(),
514 })
515 .collect();
516 let block_presences = presences
517 .iter()
518 .map(|(cid, ptype)| BlockPresence { cid: cid.to_bytes(), r#type: *ptype })
519 .collect();
520 BitswapMessage { payload, block_presences, ..Default::default() }.encode_to_vec()
521 }
522
523 #[tokio::test]
524 async fn request_bitswap_blocks_returns_blocks_for_all_wanted() {
525 let data_a = b"hash-a-payload".to_vec();
526 let data_b = b"hash-b-payload".to_vec();
527 let data_c = b"hash-c-payload".to_vec();
528 let cid_a = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_a);
529 let cid_b = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_b);
530 let cid_c = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_c);
531
532 let response = encode_response(
533 &[
534 (BLAKE2B_256_MULTIHASH_CODE, data_a.clone()),
535 (BLAKE2B_256_MULTIHASH_CODE, data_b.clone()),
536 (BLAKE2B_256_MULTIHASH_CODE, data_c.clone()),
537 ],
538 &[],
539 );
540 let stub = StubSender::new([Ok(response)]);
541
542 let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_a, cid_b, cid_c])
543 .await
544 .expect("request_bitswap_blocks should succeed");
545
546 assert_eq!(result.len(), 3);
547 assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Block(d)) if *d == data_a));
548 assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Block(d)) if *d == data_b));
549 assert!(matches!(result.get(&cid_c), Some(FetchOutcome::Block(d)) if *d == data_c));
550 }
551
552 #[tokio::test]
553 async fn request_bitswap_blocks_dont_have_is_surfaced_as_missing() {
554 let data_a = b"a".to_vec();
555 let data_b = b"b".to_vec();
556 let cid_a = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_a);
557 let cid_b = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_b);
558 let cid_c = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, b"c-not-served");
559
560 let response = encode_response(
561 &[
562 (BLAKE2B_256_MULTIHASH_CODE, data_a.clone()),
563 (BLAKE2B_256_MULTIHASH_CODE, data_b.clone()),
564 ],
565 &[(cid_c, BlockPresenceType::DontHave as i32)],
566 );
567 let stub = StubSender::new([Ok(response)]);
568
569 let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_a, cid_b, cid_c])
570 .await
571 .unwrap();
572
573 assert_eq!(result.len(), 3);
574 assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Block(_))));
575 assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Block(_))));
576 assert!(matches!(result.get(&cid_c), Some(FetchOutcome::Missing)));
577 }
578
579 #[tokio::test]
580 async fn request_bitswap_blocks_corrupted_data_dropped_as_unsolicited() {
581 let real_data = b"real-payload".to_vec();
582 let wanted_cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &real_data);
583 let corrupted_data = b"i-am-not-the-real-payload".to_vec();
584 let response = encode_response(&[(BLAKE2B_256_MULTIHASH_CODE, corrupted_data)], &[]);
585 let stub = StubSender::new([Ok(response)]);
586
587 let result = request_bitswap_blocks(&stub, PeerId::random(), &[wanted_cid]).await.unwrap();
588
589 assert_eq!(result.len(), 1);
590 assert!(matches!(result.get(&wanted_cid), Some(FetchOutcome::Missing)));
591 }
592
593 #[tokio::test]
594 async fn request_bitswap_blocks_encodes_only_want_block_entries() {
595 let cid_a = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [1u8; 32]);
596 let cid_b = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [2u8; 32]);
597 let stub = StubSender::new([Ok(BitswapMessage::default().encode_to_vec())]);
598
599 let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_a, cid_b])
600 .await
601 .expect("block-only request must encode");
602
603 assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Missing)));
604 assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Missing)));
605
606 let request = stub.pop_request();
607 let entries = request.wantlist.expect("wantlist should be present").entries;
608 assert_eq!(entries.len(), 2);
609 assert_eq!(entries[0].want_type, ProtoWantType::Block as i32);
610 assert_eq!(entries[1].want_type, ProtoWantType::Block as i32);
611 }
612
613 #[tokio::test]
614 async fn request_bitswap_blocks_have_presence_alone_is_missing() {
615 let cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [3u8; 32]);
616 let response = encode_response(&[], &[(cid, BlockPresenceType::Have as i32)]);
617 let stub = StubSender::new([Ok(response)]);
618
619 let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid])
620 .await
621 .expect("HAVE-only response should classify successfully");
622
623 assert_eq!(result.len(), 1);
624 assert!(matches!(result.get(&cid), Some(FetchOutcome::Missing)));
625 }
626
627 #[tokio::test]
628 async fn request_bitswap_blocks_unverified_accepts_bytes_without_hash_recompute() {
629 let data = b"sha2-digest-but-blake2b-request-prefix".to_vec();
630 let cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::sha2_256(&data));
631 let response = encode_response(&[(BLAKE2B_256_MULTIHASH_CODE, data.clone())], &[]);
632 let stub = StubSender::new([Ok(response)]);
633
634 let result = request_bitswap_blocks_unverified(&stub, PeerId::random(), &[cid])
635 .await
636 .expect("unverified fetch should not recompute hashes");
637
638 assert_eq!(result.len(), 1);
639 assert!(matches!(result.get(&cid), Some(FetchOutcome::Block(d)) if *d == data));
640 }
641
642 #[tokio::test]
643 async fn request_bitswap_blocks_unverified_dont_have_returned_as_missing() {
644 let cid = cid_for_digest(
645 BLAKE2B_256_MULTIHASH_CODE,
646 sp_crypto_hashing::sha2_256(b"pruned-unverified-payload"),
647 );
648 let response = encode_response(&[], &[(cid, BlockPresenceType::DontHave as i32)]);
649 let stub = StubSender::new([Ok(response)]);
650
651 let result = request_bitswap_blocks_unverified(&stub, PeerId::random(), &[cid])
652 .await
653 .expect("unverified DONT_HAVE should classify successfully");
654
655 assert_eq!(result.len(), 1);
656 assert!(matches!(result.get(&cid), Some(FetchOutcome::Missing)));
657 }
658
659 #[tokio::test]
660 async fn request_bitswap_blocks_unverified_empty_wants_errors() {
661 let stub = StubSender::new(std::iter::empty());
662
663 let err = request_bitswap_blocks_unverified(&stub, PeerId::random(), &[])
664 .await
665 .expect_err("empty wantlist must error");
666 assert!(matches!(err, BitswapError::DecodeError(msg) if msg == "empty wantlist"));
667 }
668
669 #[tokio::test]
670 async fn request_bitswap_blocks_duplicate_cids_error() {
671 let cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [9u8; 32]);
672 let stub = StubSender::new(std::iter::empty());
673
674 let err = request_bitswap_blocks(&stub, PeerId::random(), &[cid, cid])
675 .await
676 .expect_err("two wants for the same CID are ambiguous");
677 assert!(matches!(err, BitswapError::DecodeError(msg) if msg.starts_with("duplicate CID")));
678 }
679
680 #[tokio::test]
681 async fn request_bitswap_blocks_unverified_multi_want_all_served_in_request_order() {
682 let data_a = b"first-unverified-payload".to_vec();
683 let data_b = b"second-unverified-payload".to_vec();
684 let data_c = b"third-unverified-payload".to_vec();
685 let cid_a =
686 cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::sha2_256(&data_a));
687 let cid_b =
688 cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::keccak_256(&data_b));
689 let cid_c = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_c);
690
691 let response = encode_response(
692 &[
693 (BLAKE2B_256_MULTIHASH_CODE, data_a.clone()),
694 (BLAKE2B_256_MULTIHASH_CODE, data_b.clone()),
695 (BLAKE2B_256_MULTIHASH_CODE, data_c.clone()),
696 ],
697 &[],
698 );
699 let stub = StubSender::new([Ok(response)]);
700
701 let result =
702 request_bitswap_blocks_unverified(&stub, PeerId::random(), &[cid_a, cid_b, cid_c])
703 .await
704 .expect("multi-want unverified must succeed via positional correlation");
705
706 assert_eq!(result.len(), 3);
707 assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Block(d)) if *d == data_a));
708 assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Block(d)) if *d == data_b));
709 assert!(matches!(result.get(&cid_c), Some(FetchOutcome::Block(d)) if *d == data_c));
710 }
711
712 #[tokio::test]
713 async fn request_bitswap_blocks_unverified_dont_have_skips_position_in_payload_order() {
714 let data = b"second-payload-after-dont-have".to_vec();
715 let dont_have_cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [4u8; 32]);
716 let block_cid =
717 cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::sha2_256(&data));
718 let response = encode_response(
719 &[(BLAKE2B_256_MULTIHASH_CODE, data.clone())],
720 &[(dont_have_cid, BlockPresenceType::DontHave as i32)],
721 );
722 let stub = StubSender::new([Ok(response)]);
723
724 let result =
725 request_bitswap_blocks_unverified(&stub, PeerId::random(), &[dont_have_cid, block_cid])
726 .await
727 .expect("unverified mixed presence/payload should classify successfully");
728
729 assert_eq!(result.len(), 2);
730 assert!(matches!(result.get(&dont_have_cid), Some(FetchOutcome::Missing)));
731 assert!(matches!(result.get(&block_cid), Some(FetchOutcome::Block(d)) if *d == data));
732 }
733
734 #[tokio::test]
735 async fn request_bitswap_blocks_dispatches_per_entry_multihash() {
736 let data_b2 = b"blake2b-payload".to_vec();
737 let data_sha = b"sha2-256-payload".to_vec();
738 let data_kec = b"keccak-256-payload".to_vec();
739 let cid_b2 = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_b2);
740 let cid_sha = cid_for_data(SHA2_256_MULTIHASH_CODE, &data_sha);
741 let cid_kec = cid_for_data(KECCAK_256_MULTIHASH_CODE, &data_kec);
742
743 let response = encode_response(
744 &[
745 (BLAKE2B_256_MULTIHASH_CODE, data_b2.clone()),
746 (SHA2_256_MULTIHASH_CODE, data_sha.clone()),
747 (KECCAK_256_MULTIHASH_CODE, data_kec.clone()),
748 ],
749 &[],
750 );
751 let stub = StubSender::new([Ok(response)]);
752
753 let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_b2, cid_sha, cid_kec])
754 .await
755 .unwrap();
756
757 assert_eq!(result.len(), 3);
758 assert!(matches!(result.get(&cid_b2), Some(FetchOutcome::Block(d)) if *d == data_b2));
759 assert!(matches!(result.get(&cid_sha), Some(FetchOutcome::Block(d)) if *d == data_sha));
760 assert!(matches!(result.get(&cid_kec), Some(FetchOutcome::Block(d)) if *d == data_kec));
761 }
762
763 #[tokio::test]
764 async fn request_bitswap_blocks_over_cap_errors() {
765 let wants: Vec<_> = (0..(MAX_WANTED_BLOCKS + 1) as u8)
766 .map(|i| {
767 let mut h = [0u8; 32];
768 h[0] = i;
769 cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, h)
770 })
771 .collect();
772 let stub = StubSender::new(std::iter::empty());
773
774 let err = request_bitswap_blocks(&stub, PeerId::random(), &wants)
775 .await
776 .expect_err("over-cap wantlist must error");
777 assert!(matches!(err, BitswapError::DecodeError(_)));
778 }
779
780 #[tokio::test]
781 async fn request_bitswap_blocks_at_exactly_max_wanted_blocks_succeeds() {
782 let mut wants = Vec::with_capacity(MAX_WANTED_BLOCKS);
783 let mut blocks = Vec::with_capacity(MAX_WANTED_BLOCKS);
784 for i in 0..MAX_WANTED_BLOCKS {
785 let data = format!("payload-{i}").into_bytes();
786 wants.push(cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data));
787 blocks.push((BLAKE2B_256_MULTIHASH_CODE, data));
788 }
789
790 let response = encode_response(&blocks, &[]);
791 let stub = StubSender::new([Ok(response)]);
792
793 let result = request_bitswap_blocks(&stub, PeerId::random(), &wants)
794 .await
795 .expect("exactly MAX_WANTED_BLOCKS must succeed");
796
797 assert_eq!(result.len(), MAX_WANTED_BLOCKS);
798 for cid in &wants {
799 assert!(matches!(result.get(cid), Some(FetchOutcome::Block(_))));
800 }
801 }
802
803 #[tokio::test]
804 async fn request_bitswap_blocks_block_beats_presence_for_same_cid() {
805 let data = b"both-block-and-presence".to_vec();
806 let cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data);
807
808 let response = encode_response(
809 &[(BLAKE2B_256_MULTIHASH_CODE, data.clone())],
810 &[(cid, BlockPresenceType::DontHave as i32)],
811 );
812 let stub = StubSender::new([Ok(response)]);
813
814 let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid]).await.unwrap();
815
816 assert_eq!(result.len(), 1);
817 assert!(matches!(result.get(&cid), Some(FetchOutcome::Block(d)) if *d == data));
818 }
819
820 #[tokio::test]
821 async fn request_bitswap_blocks_response_decode_failure() {
822 let stub = StubSender::new([Ok(vec![0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff])]);
823 let cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, b"any");
824
825 let err = request_bitswap_blocks(&stub, PeerId::random(), &[cid])
826 .await
827 .expect_err("malformed response bytes must surface as DecodeError");
828 assert!(matches!(err, BitswapError::DecodeError(_)));
829 }
830
831 #[tokio::test]
832 async fn request_bitswap_blocks_request_failure_propagates() {
833 struct FailingSender;
834 #[async_trait::async_trait]
835 impl NetworkRequest for FailingSender {
836 async fn request(
837 &self,
838 _target: PeerId,
839 _protocol: ProtocolName,
840 _request: Vec<u8>,
841 _fallback_request: Option<(Vec<u8>, ProtocolName)>,
842 _connect: IfDisconnected,
843 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
844 Err(RequestFailure::Network(OutboundFailure::ConnectionClosed))
845 }
846
847 fn start_request(
848 &self,
849 _peer: PeerId,
850 _protocol: ProtocolName,
851 _payload: Vec<u8>,
852 _fallback_request: Option<(Vec<u8>, ProtocolName)>,
853 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
854 _connect: IfDisconnected,
855 ) {
856 drop(tx);
857 }
858 }
859
860 let cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, b"any");
861 let err = request_bitswap_blocks(&FailingSender, PeerId::random(), &[cid])
862 .await
863 .expect_err("request failure must surface as RequestFailed");
864 assert!(matches!(err, BitswapError::RequestFailed(_)));
865 }
866
867 #[tokio::test]
868 async fn request_bitswap_blocks_unsupported_multihash_in_block_dropped() {
869 let wanted_data = b"wanted".to_vec();
870 let wanted_cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &wanted_data);
871 const UNSUPPORTED_MH_CODE: u64 = 0x99;
872 let bad_prefix = Prefix {
873 version: CidVersion::V1,
874 codec: RAW_CODEC,
875 mh_type: UNSUPPORTED_MH_CODE,
876 mh_len: 32,
877 }
878 .to_bytes();
879
880 let mut payload_msg = BitswapMessage::default();
881 payload_msg.payload =
882 vec![MessageBlock { prefix: bad_prefix, data: b"some-bytes".to_vec() }];
883 let response = payload_msg.encode_to_vec();
884 let stub = StubSender::new([Ok(response)]);
885
886 let result = request_bitswap_blocks(&stub, PeerId::random(), &[wanted_cid]).await.unwrap();
887
888 assert_eq!(result.len(), 1);
889 assert!(matches!(result.get(&wanted_cid), Some(FetchOutcome::Missing)));
890 }
891
892 #[test]
893 fn cid_from_block_prefix_rejects_cid_v0_as_unsupported() {
894 let prefix = Prefix {
895 version: CidVersion::V0,
896 codec: RAW_CODEC,
897 mh_type: BLAKE2B_256_MULTIHASH_CODE,
898 mh_len: 32,
899 }
900 .to_bytes();
901
902 let err =
903 cid_from_block_prefix(&prefix, b"payload").expect_err("CIDv0 must be unsupported");
904 assert!(matches!(err, BitswapError::UnsupportedCidVersion { version: 0 }));
905 }
906}