1use crate::{
24 request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
25 types::ProtocolName,
26 MAX_RESPONSE_SIZE,
27};
28
29use cid::{self, Version};
30use futures::StreamExt;
31use log::{debug, error, trace};
32use prost::Message;
33use sc_client_api::BlockBackend;
34use sc_network_types::PeerId;
35use schema::bitswap::{
36 message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
37 Message as BitswapMessage,
38};
39use sp_runtime::traits::Block as BlockT;
40use std::{io, sync::Arc, time::Duration};
41use unsigned_varint::encode as varint_encode;
42
43mod schema;
44
45const LOG_TARGET: &str = "bitswap";
46
47const MAX_PACKET_SIZE: u64 = MAX_RESPONSE_SIZE;
52
53const MAX_REQUEST_QUEUE: usize = 20;
55
56const MAX_WANTED_BLOCKS: usize = 16;
58
59const PROTOCOL_NAME: &'static str = "/ipfs/bitswap/1.2.0";
61
62#[derive(PartialEq, Eq, Clone, Debug)]
64struct Prefix {
65 pub version: Version,
67 pub codec: u64,
69 pub mh_type: u64,
71 pub mh_len: u8,
73}
74
75impl Prefix {
76 pub fn to_bytes(&self) -> Vec<u8> {
78 let mut res = Vec::with_capacity(4);
79 let mut buf = varint_encode::u64_buffer();
80 let version = varint_encode::u64(self.version.into(), &mut buf);
81 res.extend_from_slice(version);
82 let mut buf = varint_encode::u64_buffer();
83 let codec = varint_encode::u64(self.codec, &mut buf);
84 res.extend_from_slice(codec);
85 let mut buf = varint_encode::u64_buffer();
86 let mh_type = varint_encode::u64(self.mh_type, &mut buf);
87 res.extend_from_slice(mh_type);
88 let mut buf = varint_encode::u64_buffer();
89 let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf);
90 res.extend_from_slice(mh_len);
91 res
92 }
93}
94
95pub struct BitswapRequestHandler<B> {
97 client: Arc<dyn BlockBackend<B> + Send + Sync>,
98 request_receiver: async_channel::Receiver<IncomingRequest>,
99}
100
101impl<B: BlockT> BitswapRequestHandler<B> {
102 pub fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
104 let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
105
106 let config = ProtocolConfig {
107 name: ProtocolName::from(PROTOCOL_NAME),
108 fallback_names: vec![],
109 max_request_size: MAX_PACKET_SIZE,
110 max_response_size: MAX_PACKET_SIZE,
111 request_timeout: Duration::from_secs(15),
112 inbound_queue: Some(tx),
113 };
114
115 (Self { client, request_receiver }, config)
116 }
117
118 pub async fn run(mut self) {
120 while let Some(request) = self.request_receiver.next().await {
121 let IncomingRequest { peer, payload, pending_response } = request;
122
123 match self.handle_message(&peer, &payload) {
124 Ok(response) => {
125 let response = OutgoingResponse {
126 result: Ok(response),
127 reputation_changes: Vec::new(),
128 sent_feedback: None,
129 };
130
131 match pending_response.send(response) {
132 Ok(()) => {
133 trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
134 },
135 Err(_) => debug!(
136 target: LOG_TARGET,
137 "Failed to handle light client request from {peer}: {}",
138 BitswapError::SendResponse,
139 ),
140 }
141 },
142 Err(err) => {
143 error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}");
144
145 let response = OutgoingResponse {
148 result: Err(()),
149 reputation_changes: vec![],
150 sent_feedback: None,
151 };
152
153 if pending_response.send(response).is_err() {
154 debug!(
155 target: LOG_TARGET,
156 "Failed to handle bitswap request from {peer}: {}",
157 BitswapError::SendResponse,
158 );
159 }
160 },
161 }
162 }
163 }
164
165 fn handle_message(
167 &mut self,
168 peer: &PeerId,
169 payload: &Vec<u8>,
170 ) -> Result<Vec<u8>, BitswapError> {
171 let request = schema::bitswap::Message::decode(&payload[..])?;
172
173 trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
174
175 let mut response = BitswapMessage::default();
176
177 let wantlist = match request.wantlist {
178 Some(wantlist) => wantlist,
179 None => {
180 debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer);
181 return Err(BitswapError::InvalidWantList)
182 },
183 };
184
185 if wantlist.entries.len() > MAX_WANTED_BLOCKS {
186 trace!(target: LOG_TARGET, "Ignored request: too many entries");
187 return Err(BitswapError::TooManyEntries)
188 }
189
190 for entry in wantlist.entries {
191 let cid = match cid::Cid::read_bytes(entry.block.as_slice()) {
192 Ok(cid) => cid,
193 Err(e) => {
194 trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
195 continue
196 },
197 };
198
199 if cid.version() != cid::Version::V1 ||
200 cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) ||
201 cid.hash().size() != 32
202 {
203 debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
204 continue
205 }
206
207 let mut hash = B::Hash::default();
208 hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
209 let transaction = match self.client.indexed_transaction(hash) {
210 Ok(ex) => ex,
211 Err(e) => {
212 error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
213 None
214 },
215 };
216
217 match transaction {
218 Some(transaction) => {
219 trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
220
221 if entry.want_type == WantType::Block as i32 {
222 let prefix = Prefix {
223 version: cid.version(),
224 codec: cid.codec(),
225 mh_type: cid.hash().code(),
226 mh_len: cid.hash().size(),
227 };
228 response
229 .payload
230 .push(MessageBlock { prefix: prefix.to_bytes(), data: transaction });
231 } else {
232 response.block_presences.push(BlockPresence {
233 r#type: BlockPresenceType::Have as i32,
234 cid: cid.to_bytes(),
235 });
236 }
237 },
238 None => {
239 trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
240
241 if entry.send_dont_have {
242 response.block_presences.push(BlockPresence {
243 r#type: BlockPresenceType::DontHave as i32,
244 cid: cid.to_bytes(),
245 });
246 }
247 },
248 }
249 }
250
251 Ok(response.encode_to_vec())
252 }
253}
254
255#[derive(Debug, thiserror::Error)]
257pub enum BitswapError {
258 #[error("Failed to decode request: {0}.")]
260 DecodeProto(#[from] prost::DecodeError),
261
262 #[error("Failed to encode response: {0}.")]
264 EncodeProto(#[from] prost::EncodeError),
265
266 #[error(transparent)]
268 Client(#[from] sp_blockchain::Error),
269
270 #[error(transparent)]
272 BadCid(#[from] cid::Error),
273
274 #[error(transparent)]
276 Read(#[from] io::Error),
277
278 #[error("Failed to send response.")]
280 SendResponse,
281
282 #[error("Invalid WANT list.")]
284 InvalidWantList,
285
286 #[error("Too many block entries in the request.")]
288 TooManyEntries,
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    use futures::channel::oneshot;
    use sc_block_builder::BlockBuilderBuilder;
    use schema::bitswap::{
        message::{wantlist::Entry, Wantlist},
        Message as BitswapMessage,
    };
    use sp_consensus::BlockOrigin;
    use sp_runtime::codec::Encode;
    use substrate_test_runtime::ExtrinsicBuilder;
    use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder};

    /// Sends `payload` from a random peer through `queue` and waits for the response
    /// produced by the handler.
    async fn send_request(
        queue: &async_channel::Sender<IncomingRequest>,
        payload: Vec<u8>,
    ) -> OutgoingResponse {
        let (tx, rx) = oneshot::channel();
        queue
            .send(IncomingRequest { peer: PeerId::random(), payload, pending_response: tx })
            .await
            .unwrap();

        rx.await.expect("invalid event received")
    }

    /// Asserts that `response` reports a failure without reputation changes or feedback.
    fn assert_rejected(response: OutgoingResponse) {
        let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
        assert_eq!(result, Err(()));
        assert_eq!(reputation_changes, Vec::new());
        assert!(sent_feedback.is_none());
    }

    /// Encodes a request whose wantlist has a single default entry for the CIDv1
    /// (codec `0x70`, Blake2b-256) wrapping `digest`.
    fn single_entry_request(digest: &[u8]) -> Vec<u8> {
        let block = cid::Cid::new_v1(
            0x70,
            cid::multihash::Multihash::wrap(u64::from(cid::multihash::Code::Blake2b256), digest)
                .unwrap(),
        )
        .to_bytes();

        BitswapMessage {
            wantlist: Some(Wantlist {
                entries: vec![Entry { block, ..Default::default() }],
                full: false,
            }),
            ..Default::default()
        }
        .encode_to_vec()
    }

    #[tokio::test]
    async fn undecodable_message() {
        let client = substrate_test_runtime_client::new();
        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));

        tokio::spawn(async move { bitswap.run().await });

        let queue = config.inbound_queue.unwrap();
        let response = send_request(&queue, vec![0x13, 0x37, 0x13, 0x38]).await;

        assert_rejected(response);
    }

    #[tokio::test]
    async fn empty_want_list() {
        let client = substrate_test_runtime_client::new();
        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));

        tokio::spawn(async move { bitswap.run().await });

        let queue = config.inbound_queue.unwrap();

        // A message without any wantlist is rejected.
        let missing = BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec();
        assert_rejected(send_request(&queue, missing).await);

        // A message with an empty wantlist is answered with an empty response.
        let empty = BitswapMessage { wantlist: Some(Default::default()), ..Default::default() }
            .encode_to_vec();
        let OutgoingResponse { result, reputation_changes, sent_feedback } =
            send_request(&queue, empty).await;
        assert_eq!(result, Ok(BitswapMessage::default().encode_to_vec()));
        assert_eq!(reputation_changes, Vec::new());
        assert!(sent_feedback.is_none());
    }

    #[tokio::test]
    async fn too_long_want_list() {
        let client = substrate_test_runtime_client::new();
        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));

        tokio::spawn(async move { bitswap.run().await });

        let queue = config.inbound_queue.unwrap();
        let payload = BitswapMessage {
            wantlist: Some(Wantlist {
                entries: (0..MAX_WANTED_BLOCKS + 1).map(|_| Entry::default()).collect::<Vec<_>>(),
                full: false,
            }),
            ..Default::default()
        }
        .encode_to_vec();

        assert_rejected(send_request(&queue, payload).await);
    }

    #[tokio::test]
    async fn transaction_not_found() {
        let client = TestClientBuilder::with_tx_storage(u32::MAX).build();

        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
        tokio::spawn(async move { bitswap.run().await });

        let queue = config.inbound_queue.unwrap();
        let OutgoingResponse { result, reputation_changes, sent_feedback } =
            send_request(&queue, single_entry_request(&[0u8; 32])).await;

        assert_eq!(result, Ok(vec![]));
        assert_eq!(reputation_changes, Vec::new());
        assert!(sent_feedback.is_none());
    }

    #[tokio::test]
    async fn transaction_found() {
        let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
        let mut block_builder = BlockBuilderBuilder::new(&client)
            .on_parent_block(client.chain_info().genesis_hash)
            .with_parent_block_number(0)
            .build()
            .unwrap();

        let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
        // The request below hashes the last four bytes of the encoded extrinsic, which
        // the final assertion expects to be the indexed data.
        let pattern_index = ext.encoded_size() - 4;

        block_builder.push(ext.clone()).unwrap();
        let block = block_builder.build().unwrap().block;

        client.import(BlockOrigin::File, block).await.unwrap();

        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));

        tokio::spawn(async move { bitswap.run().await });

        let queue = config.inbound_queue.unwrap();
        let digest = sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]);
        let OutgoingResponse { result, reputation_changes, sent_feedback } =
            send_request(&queue, single_entry_request(&digest)).await;

        assert_eq!(reputation_changes, Vec::new());
        assert!(sent_feedback.is_none());

        let response =
            schema::bitswap::Message::decode(&result.expect("fetch to succeed")[..]).unwrap();
        assert_eq!(response.payload.len(), 1);
        assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]);
    }
}