referrerpolicy=no-referrer-when-downgrade

sc_network/bitswap/
client.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Substrate is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Substrate is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Substrate. If not, see <https://www.gnu.org/licenses/>.
17
18use crate::{IfDisconnected, NetworkRequest, ProtocolName};
19
20use cid::{multihash::Multihash as CidMultihash, Cid, Version as CidVersion};
21use log::{debug, trace, warn};
22use prost::Message;
23use sc_network_types::PeerId;
24use std::collections::{HashMap, HashSet};
25
26use super::{
27	is_cid_supported,
28	schema::bitswap::{
29		message::{
30			wantlist::{Entry, WantType as ProtoWantType},
31			BlockPresence, BlockPresenceType, Wantlist,
32		},
33		Message as BitswapMessage,
34	},
35	Prefix, LOG_TARGET, MAX_WANTED_BLOCKS, PROTOCOL_NAME,
36};
37
38/// Const from <https://github.com/multiformats/multicodec/blame/master/table.csv>
39/// Multihash code for BLAKE2b-256.
40pub const BLAKE2B_256_MULTIHASH_CODE: u64 = 0xb220;
41/// Multihash code for SHA2-256.
42pub const SHA2_256_MULTIHASH_CODE: u64 = 0x12;
43/// Multihash code for Keccak-256.
44pub const KECCAK_256_MULTIHASH_CODE: u64 = 0x1b;
45
46/// Per-CID outcome from a Bitswap block request.
47///
48/// The public contract is intentionally narrow: either the peer delivered the bytes for the CID
49/// or it did not. A peer signalling `DONT_HAVE` and a peer staying silent for a CID are both
50/// surfaced as [`FetchOutcome::Missing`]; callers needing a different policy must implement it
51/// over [`FetchOutcome`].
52#[derive(Debug)]
53pub enum FetchOutcome {
54	/// Peer returned bytes for the requested CID.
55	Block(Vec<u8>),
56	/// Peer did not deliver bytes for this CID.
57	///
58	/// Covers the peer explicitly answering `DONT_HAVE`, the peer answering `HAVE` without bytes,
59	/// and the peer not acknowledging the CID at all. From the caller's perspective these are
60	/// equivalent: no block was delivered.
61	Missing,
62}
63
64/// Multihash type with a 64-byte digest capacity.
65type Multihash = CidMultihash<64>;
66
67/// Validate the wantlist length is within bounds.
68fn validate_wantlist_size(len: usize) -> Result<(), BitswapError> {
69	if len == 0 {
70		return Err(BitswapError::DecodeError("empty wantlist".into()));
71	}
72	if len > MAX_WANTED_BLOCKS {
73		return Err(BitswapError::DecodeError(format!(
74			"wantlist too large: {len} > {MAX_WANTED_BLOCKS}",
75		)));
76	}
77	Ok(())
78}
79
80/// Validate CIDs: enforce length, CID support, and CID uniqueness.
81fn validate_cids(cids: &[Cid]) -> Result<(), BitswapError> {
82	validate_wantlist_size(cids.len())?;
83
84	let mut seen: HashSet<Cid> = HashSet::with_capacity(cids.len());
85	for cid in cids {
86		if !is_cid_supported(cid) {
87			return Err(BitswapError::UnsupportedHashing { multihash_code: cid.hash().code() });
88		}
89		if !seen.insert(*cid) {
90			return Err(BitswapError::DecodeError(format!("duplicate CID in wantlist: {cid}")));
91		}
92	}
93
94	Ok(())
95}
96
97/// Send one `WANT-BLOCK` request for `cids` to `peer` and classify the response.
98///
99/// Returned blocks are verified by recomputing the CID from the response prefix and bytes.
100/// Blocks whose recomputed CID was not requested are ignored.
101///
102/// Errors if `cids` is empty, larger than [`MAX_WANTED_BLOCKS`], contains an unsupported CID,
103/// or contains a duplicate CID.
104///
105/// Note: This is a temporary API that shall be superseeded by a better abstraction such as
106///  <https://github.com/paritytech/polkadot-sdk/issues/12052>
107pub async fn request_bitswap_blocks<N>(
108	network: &N,
109	peer: PeerId,
110	cids: &[Cid],
111) -> Result<HashMap<Cid, FetchOutcome>, BitswapError>
112where
113	N: NetworkRequest + ?Sized,
114{
115	validate_cids(cids)?;
116
117	let wanted: HashSet<Cid> = cids.iter().copied().collect();
118	let response = send_request(network, peer, cids).await?;
119	Ok(classify_response(response, &wanted, peer))
120}
121
122/// Like [`request_bitswap_blocks`], but does not recompute or verify the hash of received bytes.
123///
124/// Use this when the requester must fetch by CID-shaped identifiers before it can verify the
125/// returned bytes through an external authority. The response is matched by request order and
126/// CID prefix only; integrity verification is delegated to the caller.
127///
128/// Note: This is a temporary API that shall be superseeded by a better abstraction such as
129///  <https://github.com/paritytech/polkadot-sdk/issues/12052>
130pub async fn request_bitswap_blocks_unverified<N>(
131	network: &N,
132	peer: PeerId,
133	cids: &[Cid],
134) -> Result<HashMap<Cid, FetchOutcome>, BitswapError>
135where
136	N: NetworkRequest + ?Sized,
137{
138	validate_cids(cids)?;
139
140	let response = send_request(network, peer, cids).await?;
141	Ok(classify_response_unverified(response, cids, peer))
142}
143
144/// Dispatch a bitswap WANT request to `peer` and decode the response.
145async fn send_request<N>(
146	network: &N,
147	peer: PeerId,
148	cids: &[Cid],
149) -> Result<BitswapMessage, BitswapError>
150where
151	N: NetworkRequest + ?Sized,
152{
153	let entries: Vec<Entry> = cids
154		.iter()
155		.copied()
156		.map(|cid| Entry {
157			block: cid.to_bytes(),
158			want_type: ProtoWantType::Block as i32,
159			send_dont_have: true,
160			..Default::default()
161		})
162		.collect();
163	let request =
164		BitswapMessage { wantlist: Some(Wantlist { entries, full: false }), ..Default::default() };
165
166	trace!(
167		target: LOG_TARGET,
168		"client: sending Bitswap wantlist for {} CIDs to {peer}, protocol {PROTOCOL_NAME}",
169		cids.len(),
170	);
171
172	let payload = match network
173		.request(
174			peer,
175			ProtocolName::from(PROTOCOL_NAME),
176			request.encode_to_vec(),
177			None,
178			IfDisconnected::TryConnect,
179		)
180		.await
181	{
182		Ok((payload, _)) => payload,
183		Err(err) => {
184			debug!(target: LOG_TARGET, "client: batch request to {peer} rejected by network: {err:?}");
185			return Err(BitswapError::RequestFailed(err.to_string()));
186		},
187	};
188
189	BitswapMessage::decode(&payload[..]).map_err(|err| {
190		debug!(target: LOG_TARGET, "client: failed to decode batch response from {peer}: {err}");
191		BitswapError::DecodeError(err.to_string())
192	})
193}
194
195/// Classify the response by verifying each block's CID against the wanted set.
196///
197/// Every wanted CID is recorded exactly once: as [`FetchOutcome::Block`] if the peer delivered
198/// bytes whose recomputed CID is in `wanted`, otherwise as [`FetchOutcome::Missing`]. Presence
199/// frames (`HAVE` / `DONT_HAVE`) are logged for diagnostics but do not change the outcome.
200fn classify_response(
201	response: BitswapMessage,
202	wanted: &HashSet<Cid>,
203	peer: PeerId,
204) -> HashMap<Cid, FetchOutcome> {
205	let mut result: HashMap<Cid, FetchOutcome> = HashMap::with_capacity(wanted.len());
206
207	for block in response.payload {
208		let Ok(cid) = cid_from_block_prefix(&block.prefix, &block.data).inspect_err(|err| {
209			debug!(target: LOG_TARGET, "client: malformed block prefix from {peer}: {err:?}");
210		}) else {
211			continue;
212		};
213		if !wanted.contains(&cid) {
214			debug!(target: LOG_TARGET, "client: {peer} returned unsolicited block for CID {cid}");
215			continue;
216		}
217		debug!(target: LOG_TARGET, "client: {peer} returned {} bytes for CID {cid}", block.data.len());
218		result.insert(cid, FetchOutcome::Block(block.data));
219	}
220
221	log_presences(response.block_presences, wanted, peer);
222
223	for cid in wanted {
224		result.entry(*cid).or_insert(FetchOutcome::Missing);
225	}
226
227	result
228}
229
230/// Classify an unverified response via order-based correlation.
231///
232/// Every wanted CID is recorded exactly once: as [`FetchOutcome::Block`] if the peer delivered
233/// bytes whose declared prefix matches a requested CID at the corresponding position in the
234/// wantlist, otherwise as [`FetchOutcome::Missing`].
235fn classify_response_unverified(
236	response: BitswapMessage,
237	cids: &[Cid],
238	peer: PeerId,
239) -> HashMap<Cid, FetchOutcome> {
240	let mut result: HashMap<Cid, FetchOutcome> = HashMap::with_capacity(cids.len());
241	let wanted_set: HashSet<Cid> = cids.iter().copied().collect();
242	let mut dont_have_cids: HashSet<Cid> = HashSet::with_capacity(cids.len());
243
244	for presence in response.block_presences {
245		let Ok(cid) = Cid::read_bytes(presence.cid.as_slice()).inspect_err(|err| {
246			debug!(target: LOG_TARGET, "client: malformed presence CID from {peer}: {err}");
247		}) else {
248			continue;
249		};
250		if !wanted_set.contains(&cid) {
251			debug!(target: LOG_TARGET, "client: {peer} returned unsolicited presence for CID {cid}");
252			continue;
253		}
254		if presence.r#type == BlockPresenceType::DontHave as i32 {
255			debug!(target: LOG_TARGET, "client: {peer} DONT_HAVE for CID {cid}");
256			dont_have_cids.insert(cid);
257		} else if presence.r#type == BlockPresenceType::Have as i32 {
258			debug!(target: LOG_TARGET, "client: {peer} HAVE for CID {cid}");
259		} else {
260			warn!(
261				target: LOG_TARGET,
262				"client: {peer} unexpected presence type {} for CID {cid}",
263				presence.r#type,
264			);
265		}
266	}
267
268	// Unverified payloads cannot be matched by recomputing their CID from bytes, so attribute
269	// each block to the next requested CID (skipping any the peer already said it doesn't have)
270	// whose CID metadata matches the payload prefix.
271	let mut expected_payload_order =
272		cids.iter().copied().filter(|cid| !dont_have_cids.contains(cid));
273
274	for block in response.payload {
275		let Some(expected_cid) = expected_payload_order.next() else {
276			debug!(target: LOG_TARGET, "client: {peer} returned more payload blocks than expected; dropping extras");
277			break;
278		};
279		let Ok(prefix) = decode_prefix(&block.prefix).inspect_err(|err| {
280			debug!(target: LOG_TARGET, "client: malformed block prefix from {peer}: {err:?}");
281		}) else {
282			break;
283		};
284		if !prefix_matches_cid(&prefix, &expected_cid) {
285			debug!(
286				target: LOG_TARGET,
287				"client: {peer} returned block with prefix {:?} but expected CID {expected_cid}; \
288				 stopping payload attribution",
289				prefix,
290			);
291			break;
292		}
293		debug!(
294			target: LOG_TARGET,
295			"client: {peer} returned {} unverified bytes for CID {expected_cid}",
296			block.data.len(),
297		);
298		result.insert(expected_cid, FetchOutcome::Block(block.data.clone()));
299	}
300
301	for cid in cids {
302		result.entry(*cid).or_insert(FetchOutcome::Missing);
303	}
304
305	result
306}
307
308/// Log per-CID presence frames for diagnostics. Presence does not influence the public outcome.
309fn log_presences(presences: Vec<BlockPresence>, wanted: &HashSet<Cid>, peer: PeerId) {
310	for presence in presences {
311		let Ok(cid) = Cid::read_bytes(presence.cid.as_slice()).inspect_err(|err| {
312			debug!(target: LOG_TARGET, "client: malformed presence CID from {peer}: {err}");
313		}) else {
314			continue;
315		};
316		if !wanted.contains(&cid) {
317			debug!(target: LOG_TARGET, "client: {peer} returned unsolicited presence for CID {cid}");
318			continue;
319		}
320		if presence.r#type == BlockPresenceType::DontHave as i32 {
321			debug!(target: LOG_TARGET, "client: {peer} DONT_HAVE for CID {cid}");
322		} else if presence.r#type == BlockPresenceType::Have as i32 {
323			debug!(target: LOG_TARGET, "client: {peer} HAVE for CID {cid}");
324		} else {
325			debug!(
326				target: LOG_TARGET,
327				"client: {peer} unexpected presence type {} for CID {cid}",
328				presence.r#type,
329			);
330		}
331	}
332}
333
334/// Check that a decoded prefix matches a CID's version, codec, and multihash metadata.
335fn prefix_matches_cid(prefix: &Prefix, cid: &Cid) -> bool {
336	prefix.version == cid.version() &&
337		prefix.codec == cid.codec() &&
338		prefix.mh_type == cid.hash().code() &&
339		prefix.mh_len == cid.hash().size()
340}
341
342/// Reconstruct a CID from a block's prefix bytes and payload data.
343fn cid_from_block_prefix(prefix: &[u8], data: &[u8]) -> Result<Cid, BitswapError> {
344	let prefix = decode_prefix(prefix)?;
345	if prefix.version != CidVersion::V1 {
346		return Err(BitswapError::UnsupportedCidVersion { version: prefix.version.into() });
347	}
348
349	let hash = hash_for_multihash_code(prefix.mh_type, data)
350		.ok_or(BitswapError::UnsupportedHashing { multihash_code: prefix.mh_type })?;
351	let multihash = Multihash::wrap(prefix.mh_type, &hash)
352		.map_err(|err| BitswapError::DecodeError(err.to_string()))?;
353	Ok(Cid::new_v1(prefix.codec, multihash))
354}
355
356/// Compute a 32-byte hash for the given multihash code.
357fn hash_for_multihash_code(multihash_code: u64, data: &[u8]) -> Option<[u8; 32]> {
358	match multihash_code {
359		BLAKE2B_256_MULTIHASH_CODE => Some(sp_crypto_hashing::blake2_256(data)),
360		SHA2_256_MULTIHASH_CODE => Some(sp_crypto_hashing::sha2_256(data)),
361		KECCAK_256_MULTIHASH_CODE => Some(sp_crypto_hashing::keccak_256(data)),
362		_ => None,
363	}
364}
365
366/// Decode varint-encoded CID prefix bytes.
367fn decode_prefix(mut bytes: &[u8]) -> Result<Prefix, BitswapError> {
368	let mut read_varint = || -> Result<u64, BitswapError> {
369		let (v, rest) = unsigned_varint::decode::u64(bytes)
370			.map_err(|err| BitswapError::DecodeError(err.to_string()))?;
371		bytes = rest;
372		Ok(v)
373	};
374
375	let version = read_varint()?;
376	let codec = read_varint()?;
377	let mh_type = read_varint()?;
378	let mh_len = read_varint()?;
379
380	if !bytes.is_empty() {
381		return Err(BitswapError::DecodeError("bitswap block prefix had trailing bytes".into()));
382	}
383
384	let version = CidVersion::try_from(version)
385		.map_err(|_| BitswapError::UnsupportedCidVersion { version })?;
386	let mh_len = u8::try_from(mh_len).map_err(|_| {
387		BitswapError::DecodeError(format!("multihash length {mh_len} does not fit into u8"))
388	})?;
389
390	Ok(Prefix { version, codec, mh_type, mh_len })
391}
392
393/// Bitswap client errors.
394#[derive(Debug)]
395pub enum BitswapError {
396	/// Failed to decode or validate a bitswap payload.
397	DecodeError(String),
398	/// Request/response exchange failed.
399	RequestFailed(String),
400	/// Block prefix declared an unsupported multihash code.
401	UnsupportedHashing {
402		/// The unrecognised IPFS multihash code.
403		multihash_code: u64,
404	},
405	/// CID version is unsupported for this bitswap client.
406	UnsupportedCidVersion {
407		/// The unsupported CID version number.
408		version: u64,
409	},
410}
411
412#[cfg(test)]
413mod tests {
414	use super::*;
415	use crate::{OutboundFailure, RequestFailure};
416	use futures::channel::oneshot;
417	use sc_network_types::PeerId;
418	use std::{collections::VecDeque, sync::Mutex};
419
420	use super::super::{
421		is_supported_multihash_code,
422		schema::bitswap::message::{Block as MessageBlock, BlockPresence, BlockPresenceType},
423		RAW_CODEC,
424	};
425
426	/// Build a raw-codec CID from a 32-byte digest and supported multihash code.
427	fn raw_cid_from_digest(multihash_code: u64, digest: [u8; 32]) -> Result<Cid, BitswapError> {
428		if !is_supported_multihash_code(multihash_code) {
429			return Err(BitswapError::UnsupportedHashing { multihash_code });
430		}
431		let multihash = CidMultihash::wrap(multihash_code, &digest)
432			.map_err(|e| BitswapError::DecodeError(e.to_string()))?;
433		Ok(Cid::new_v1(RAW_CODEC, multihash))
434	}
435
436	struct StubSender {
437		responses: Mutex<VecDeque<Result<Vec<u8>, RequestFailure>>>,
438		requests: Mutex<Vec<Vec<u8>>>,
439	}
440
441	impl StubSender {
442		fn new(responses: impl IntoIterator<Item = Result<Vec<u8>, RequestFailure>>) -> Self {
443			Self {
444				responses: Mutex::new(responses.into_iter().collect()),
445				requests: Mutex::new(Vec::new()),
446			}
447		}
448
449		fn pop_request(&self) -> BitswapMessage {
450			let bytes = self.requests.lock().unwrap().pop().expect("request should be recorded");
451			BitswapMessage::decode(bytes.as_slice()).expect("request should decode")
452		}
453	}
454
455	#[async_trait::async_trait]
456	impl NetworkRequest for StubSender {
457		async fn request(
458			&self,
459			_target: PeerId,
460			_protocol: ProtocolName,
461			request: Vec<u8>,
462			_fallback_request: Option<(Vec<u8>, ProtocolName)>,
463			_connect: IfDisconnected,
464		) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
465			self.requests.lock().unwrap().push(request);
466			self.responses
467				.lock()
468				.unwrap()
469				.pop_front()
470				.expect("StubSender: no canned response queued")
471				.map(|bytes| (bytes, ProtocolName::from(PROTOCOL_NAME)))
472		}
473
474		fn start_request(
475			&self,
476			_peer: PeerId,
477			_protocol: ProtocolName,
478			payload: Vec<u8>,
479			_fallback_request: Option<(Vec<u8>, ProtocolName)>,
480			tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
481			_connect: IfDisconnected,
482		) {
483			self.requests.lock().unwrap().push(payload);
484			let resp = self
485				.responses
486				.lock()
487				.unwrap()
488				.pop_front()
489				.expect("StubSender: no canned response queued");
490			let _ = tx.send(resp.map(|bytes| (bytes, ProtocolName::from(PROTOCOL_NAME))));
491		}
492	}
493
494	fn prefix_for(multihash_code: u64) -> Vec<u8> {
495		Prefix { version: CidVersion::V1, codec: RAW_CODEC, mh_type: multihash_code, mh_len: 32 }
496			.to_bytes()
497	}
498
499	fn cid_for_data(multihash_code: u64, data: &[u8]) -> Cid {
500		raw_cid_from_digest(multihash_code, hash_for_multihash_code(multihash_code, data).unwrap())
501			.unwrap()
502	}
503
504	fn cid_for_digest(multihash_code: u64, digest: [u8; 32]) -> Cid {
505		raw_cid_from_digest(multihash_code, digest).unwrap()
506	}
507
508	fn encode_response(blocks: &[(u64, Vec<u8>)], presences: &[(Cid, i32)]) -> Vec<u8> {
509		let payload = blocks
510			.iter()
511			.map(|(multihash_code, data)| MessageBlock {
512				prefix: prefix_for(*multihash_code),
513				data: data.clone(),
514			})
515			.collect();
516		let block_presences = presences
517			.iter()
518			.map(|(cid, ptype)| BlockPresence { cid: cid.to_bytes(), r#type: *ptype })
519			.collect();
520		BitswapMessage { payload, block_presences, ..Default::default() }.encode_to_vec()
521	}
522
523	#[tokio::test]
524	async fn request_bitswap_blocks_returns_blocks_for_all_wanted() {
525		let data_a = b"hash-a-payload".to_vec();
526		let data_b = b"hash-b-payload".to_vec();
527		let data_c = b"hash-c-payload".to_vec();
528		let cid_a = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_a);
529		let cid_b = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_b);
530		let cid_c = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_c);
531
532		let response = encode_response(
533			&[
534				(BLAKE2B_256_MULTIHASH_CODE, data_a.clone()),
535				(BLAKE2B_256_MULTIHASH_CODE, data_b.clone()),
536				(BLAKE2B_256_MULTIHASH_CODE, data_c.clone()),
537			],
538			&[],
539		);
540		let stub = StubSender::new([Ok(response)]);
541
542		let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_a, cid_b, cid_c])
543			.await
544			.expect("request_bitswap_blocks should succeed");
545
546		assert_eq!(result.len(), 3);
547		assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Block(d)) if *d == data_a));
548		assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Block(d)) if *d == data_b));
549		assert!(matches!(result.get(&cid_c), Some(FetchOutcome::Block(d)) if *d == data_c));
550	}
551
552	#[tokio::test]
553	async fn request_bitswap_blocks_dont_have_is_surfaced_as_missing() {
554		let data_a = b"a".to_vec();
555		let data_b = b"b".to_vec();
556		let cid_a = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_a);
557		let cid_b = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_b);
558		let cid_c = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, b"c-not-served");
559
560		let response = encode_response(
561			&[
562				(BLAKE2B_256_MULTIHASH_CODE, data_a.clone()),
563				(BLAKE2B_256_MULTIHASH_CODE, data_b.clone()),
564			],
565			&[(cid_c, BlockPresenceType::DontHave as i32)],
566		);
567		let stub = StubSender::new([Ok(response)]);
568
569		let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_a, cid_b, cid_c])
570			.await
571			.unwrap();
572
573		assert_eq!(result.len(), 3);
574		assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Block(_))));
575		assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Block(_))));
576		assert!(matches!(result.get(&cid_c), Some(FetchOutcome::Missing)));
577	}
578
579	#[tokio::test]
580	async fn request_bitswap_blocks_corrupted_data_dropped_as_unsolicited() {
581		let real_data = b"real-payload".to_vec();
582		let wanted_cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &real_data);
583		let corrupted_data = b"i-am-not-the-real-payload".to_vec();
584		let response = encode_response(&[(BLAKE2B_256_MULTIHASH_CODE, corrupted_data)], &[]);
585		let stub = StubSender::new([Ok(response)]);
586
587		let result = request_bitswap_blocks(&stub, PeerId::random(), &[wanted_cid]).await.unwrap();
588
589		assert_eq!(result.len(), 1);
590		assert!(matches!(result.get(&wanted_cid), Some(FetchOutcome::Missing)));
591	}
592
593	#[tokio::test]
594	async fn request_bitswap_blocks_encodes_only_want_block_entries() {
595		let cid_a = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [1u8; 32]);
596		let cid_b = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [2u8; 32]);
597		let stub = StubSender::new([Ok(BitswapMessage::default().encode_to_vec())]);
598
599		let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_a, cid_b])
600			.await
601			.expect("block-only request must encode");
602
603		assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Missing)));
604		assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Missing)));
605
606		let request = stub.pop_request();
607		let entries = request.wantlist.expect("wantlist should be present").entries;
608		assert_eq!(entries.len(), 2);
609		assert_eq!(entries[0].want_type, ProtoWantType::Block as i32);
610		assert_eq!(entries[1].want_type, ProtoWantType::Block as i32);
611	}
612
613	#[tokio::test]
614	async fn request_bitswap_blocks_have_presence_alone_is_missing() {
615		let cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [3u8; 32]);
616		let response = encode_response(&[], &[(cid, BlockPresenceType::Have as i32)]);
617		let stub = StubSender::new([Ok(response)]);
618
619		let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid])
620			.await
621			.expect("HAVE-only response should classify successfully");
622
623		assert_eq!(result.len(), 1);
624		assert!(matches!(result.get(&cid), Some(FetchOutcome::Missing)));
625	}
626
627	#[tokio::test]
628	async fn request_bitswap_blocks_unverified_accepts_bytes_without_hash_recompute() {
629		let data = b"sha2-digest-but-blake2b-request-prefix".to_vec();
630		let cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::sha2_256(&data));
631		let response = encode_response(&[(BLAKE2B_256_MULTIHASH_CODE, data.clone())], &[]);
632		let stub = StubSender::new([Ok(response)]);
633
634		let result = request_bitswap_blocks_unverified(&stub, PeerId::random(), &[cid])
635			.await
636			.expect("unverified fetch should not recompute hashes");
637
638		assert_eq!(result.len(), 1);
639		assert!(matches!(result.get(&cid), Some(FetchOutcome::Block(d)) if *d == data));
640	}
641
642	#[tokio::test]
643	async fn request_bitswap_blocks_unverified_dont_have_returned_as_missing() {
644		let cid = cid_for_digest(
645			BLAKE2B_256_MULTIHASH_CODE,
646			sp_crypto_hashing::sha2_256(b"pruned-unverified-payload"),
647		);
648		let response = encode_response(&[], &[(cid, BlockPresenceType::DontHave as i32)]);
649		let stub = StubSender::new([Ok(response)]);
650
651		let result = request_bitswap_blocks_unverified(&stub, PeerId::random(), &[cid])
652			.await
653			.expect("unverified DONT_HAVE should classify successfully");
654
655		assert_eq!(result.len(), 1);
656		assert!(matches!(result.get(&cid), Some(FetchOutcome::Missing)));
657	}
658
659	#[tokio::test]
660	async fn request_bitswap_blocks_unverified_empty_wants_errors() {
661		let stub = StubSender::new(std::iter::empty());
662
663		let err = request_bitswap_blocks_unverified(&stub, PeerId::random(), &[])
664			.await
665			.expect_err("empty wantlist must error");
666		assert!(matches!(err, BitswapError::DecodeError(msg) if msg == "empty wantlist"));
667	}
668
669	#[tokio::test]
670	async fn request_bitswap_blocks_duplicate_cids_error() {
671		let cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [9u8; 32]);
672		let stub = StubSender::new(std::iter::empty());
673
674		let err = request_bitswap_blocks(&stub, PeerId::random(), &[cid, cid])
675			.await
676			.expect_err("two wants for the same CID are ambiguous");
677		assert!(matches!(err, BitswapError::DecodeError(msg) if msg.starts_with("duplicate CID")));
678	}
679
680	#[tokio::test]
681	async fn request_bitswap_blocks_unverified_multi_want_all_served_in_request_order() {
682		let data_a = b"first-unverified-payload".to_vec();
683		let data_b = b"second-unverified-payload".to_vec();
684		let data_c = b"third-unverified-payload".to_vec();
685		let cid_a =
686			cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::sha2_256(&data_a));
687		let cid_b =
688			cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::keccak_256(&data_b));
689		let cid_c = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_c);
690
691		let response = encode_response(
692			&[
693				(BLAKE2B_256_MULTIHASH_CODE, data_a.clone()),
694				(BLAKE2B_256_MULTIHASH_CODE, data_b.clone()),
695				(BLAKE2B_256_MULTIHASH_CODE, data_c.clone()),
696			],
697			&[],
698		);
699		let stub = StubSender::new([Ok(response)]);
700
701		let result =
702			request_bitswap_blocks_unverified(&stub, PeerId::random(), &[cid_a, cid_b, cid_c])
703				.await
704				.expect("multi-want unverified must succeed via positional correlation");
705
706		assert_eq!(result.len(), 3);
707		assert!(matches!(result.get(&cid_a), Some(FetchOutcome::Block(d)) if *d == data_a));
708		assert!(matches!(result.get(&cid_b), Some(FetchOutcome::Block(d)) if *d == data_b));
709		assert!(matches!(result.get(&cid_c), Some(FetchOutcome::Block(d)) if *d == data_c));
710	}
711
712	#[tokio::test]
713	async fn request_bitswap_blocks_unverified_dont_have_skips_position_in_payload_order() {
714		let data = b"second-payload-after-dont-have".to_vec();
715		let dont_have_cid = cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, [4u8; 32]);
716		let block_cid =
717			cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, sp_crypto_hashing::sha2_256(&data));
718		let response = encode_response(
719			&[(BLAKE2B_256_MULTIHASH_CODE, data.clone())],
720			&[(dont_have_cid, BlockPresenceType::DontHave as i32)],
721		);
722		let stub = StubSender::new([Ok(response)]);
723
724		let result =
725			request_bitswap_blocks_unverified(&stub, PeerId::random(), &[dont_have_cid, block_cid])
726				.await
727				.expect("unverified mixed presence/payload should classify successfully");
728
729		assert_eq!(result.len(), 2);
730		assert!(matches!(result.get(&dont_have_cid), Some(FetchOutcome::Missing)));
731		assert!(matches!(result.get(&block_cid), Some(FetchOutcome::Block(d)) if *d == data));
732	}
733
734	#[tokio::test]
735	async fn request_bitswap_blocks_dispatches_per_entry_multihash() {
736		let data_b2 = b"blake2b-payload".to_vec();
737		let data_sha = b"sha2-256-payload".to_vec();
738		let data_kec = b"keccak-256-payload".to_vec();
739		let cid_b2 = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data_b2);
740		let cid_sha = cid_for_data(SHA2_256_MULTIHASH_CODE, &data_sha);
741		let cid_kec = cid_for_data(KECCAK_256_MULTIHASH_CODE, &data_kec);
742
743		let response = encode_response(
744			&[
745				(BLAKE2B_256_MULTIHASH_CODE, data_b2.clone()),
746				(SHA2_256_MULTIHASH_CODE, data_sha.clone()),
747				(KECCAK_256_MULTIHASH_CODE, data_kec.clone()),
748			],
749			&[],
750		);
751		let stub = StubSender::new([Ok(response)]);
752
753		let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid_b2, cid_sha, cid_kec])
754			.await
755			.unwrap();
756
757		assert_eq!(result.len(), 3);
758		assert!(matches!(result.get(&cid_b2), Some(FetchOutcome::Block(d)) if *d == data_b2));
759		assert!(matches!(result.get(&cid_sha), Some(FetchOutcome::Block(d)) if *d == data_sha));
760		assert!(matches!(result.get(&cid_kec), Some(FetchOutcome::Block(d)) if *d == data_kec));
761	}
762
763	#[tokio::test]
764	async fn request_bitswap_blocks_over_cap_errors() {
765		let wants: Vec<_> = (0..(MAX_WANTED_BLOCKS + 1) as u8)
766			.map(|i| {
767				let mut h = [0u8; 32];
768				h[0] = i;
769				cid_for_digest(BLAKE2B_256_MULTIHASH_CODE, h)
770			})
771			.collect();
772		let stub = StubSender::new(std::iter::empty());
773
774		let err = request_bitswap_blocks(&stub, PeerId::random(), &wants)
775			.await
776			.expect_err("over-cap wantlist must error");
777		assert!(matches!(err, BitswapError::DecodeError(_)));
778	}
779
780	#[tokio::test]
781	async fn request_bitswap_blocks_at_exactly_max_wanted_blocks_succeeds() {
782		let mut wants = Vec::with_capacity(MAX_WANTED_BLOCKS);
783		let mut blocks = Vec::with_capacity(MAX_WANTED_BLOCKS);
784		for i in 0..MAX_WANTED_BLOCKS {
785			let data = format!("payload-{i}").into_bytes();
786			wants.push(cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data));
787			blocks.push((BLAKE2B_256_MULTIHASH_CODE, data));
788		}
789
790		let response = encode_response(&blocks, &[]);
791		let stub = StubSender::new([Ok(response)]);
792
793		let result = request_bitswap_blocks(&stub, PeerId::random(), &wants)
794			.await
795			.expect("exactly MAX_WANTED_BLOCKS must succeed");
796
797		assert_eq!(result.len(), MAX_WANTED_BLOCKS);
798		for cid in &wants {
799			assert!(matches!(result.get(cid), Some(FetchOutcome::Block(_))));
800		}
801	}
802
803	#[tokio::test]
804	async fn request_bitswap_blocks_block_beats_presence_for_same_cid() {
805		let data = b"both-block-and-presence".to_vec();
806		let cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &data);
807
808		let response = encode_response(
809			&[(BLAKE2B_256_MULTIHASH_CODE, data.clone())],
810			&[(cid, BlockPresenceType::DontHave as i32)],
811		);
812		let stub = StubSender::new([Ok(response)]);
813
814		let result = request_bitswap_blocks(&stub, PeerId::random(), &[cid]).await.unwrap();
815
816		assert_eq!(result.len(), 1);
817		assert!(matches!(result.get(&cid), Some(FetchOutcome::Block(d)) if *d == data));
818	}
819
820	#[tokio::test]
821	async fn request_bitswap_blocks_response_decode_failure() {
822		let stub = StubSender::new([Ok(vec![0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff])]);
823		let cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, b"any");
824
825		let err = request_bitswap_blocks(&stub, PeerId::random(), &[cid])
826			.await
827			.expect_err("malformed response bytes must surface as DecodeError");
828		assert!(matches!(err, BitswapError::DecodeError(_)));
829	}
830
831	#[tokio::test]
832	async fn request_bitswap_blocks_request_failure_propagates() {
833		struct FailingSender;
834		#[async_trait::async_trait]
835		impl NetworkRequest for FailingSender {
836			async fn request(
837				&self,
838				_target: PeerId,
839				_protocol: ProtocolName,
840				_request: Vec<u8>,
841				_fallback_request: Option<(Vec<u8>, ProtocolName)>,
842				_connect: IfDisconnected,
843			) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
844				Err(RequestFailure::Network(OutboundFailure::ConnectionClosed))
845			}
846
847			fn start_request(
848				&self,
849				_peer: PeerId,
850				_protocol: ProtocolName,
851				_payload: Vec<u8>,
852				_fallback_request: Option<(Vec<u8>, ProtocolName)>,
853				tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
854				_connect: IfDisconnected,
855			) {
856				drop(tx);
857			}
858		}
859
860		let cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, b"any");
861		let err = request_bitswap_blocks(&FailingSender, PeerId::random(), &[cid])
862			.await
863			.expect_err("request failure must surface as RequestFailed");
864		assert!(matches!(err, BitswapError::RequestFailed(_)));
865	}
866
867	#[tokio::test]
868	async fn request_bitswap_blocks_unsupported_multihash_in_block_dropped() {
869		let wanted_data = b"wanted".to_vec();
870		let wanted_cid = cid_for_data(BLAKE2B_256_MULTIHASH_CODE, &wanted_data);
871		const UNSUPPORTED_MH_CODE: u64 = 0x99;
872		let bad_prefix = Prefix {
873			version: CidVersion::V1,
874			codec: RAW_CODEC,
875			mh_type: UNSUPPORTED_MH_CODE,
876			mh_len: 32,
877		}
878		.to_bytes();
879
880		let mut payload_msg = BitswapMessage::default();
881		payload_msg.payload =
882			vec![MessageBlock { prefix: bad_prefix, data: b"some-bytes".to_vec() }];
883		let response = payload_msg.encode_to_vec();
884		let stub = StubSender::new([Ok(response)]);
885
886		let result = request_bitswap_blocks(&stub, PeerId::random(), &[wanted_cid]).await.unwrap();
887
888		assert_eq!(result.len(), 1);
889		assert!(matches!(result.get(&wanted_cid), Some(FetchOutcome::Missing)));
890	}
891
892	#[test]
893	fn cid_from_block_prefix_rejects_cid_v0_as_unsupported() {
894		let prefix = Prefix {
895			version: CidVersion::V0,
896			codec: RAW_CODEC,
897			mh_type: BLAKE2B_256_MULTIHASH_CODE,
898			mh_len: 32,
899		}
900		.to_bytes();
901
902		let err =
903			cid_from_block_prefix(&prefix, b"payload").expect_err("CIDv0 must be unsupported");
904		assert!(matches!(err, BitswapError::UnsupportedCidVersion { version: 0 }));
905	}
906}