1use crate::{
25 request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
26 types::ProtocolName,
27 MAX_RESPONSE_SIZE,
28};
29
30use cid::{self, Version};
31use futures::StreamExt;
32use log::{debug, error, trace};
33use prost::Message;
34use sc_client_api::BlockBackend;
35use sc_network_types::PeerId;
36use schema::bitswap::{
37 message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
38 Message as BitswapMessage,
39};
40use sp_runtime::traits::Block as BlockT;
41use std::{io, sync::Arc, time::Duration};
42use unsigned_varint::encode as varint_encode;
43
44mod schema;
45
46const LOG_TARGET: &str = "bitswap";
47
48const MAX_PACKET_SIZE: u64 = MAX_RESPONSE_SIZE;
53
54const MAX_REQUEST_QUEUE: usize = 20;
56
57const MAX_WANTED_BLOCKS: usize = 16;
59
60const PROTOCOL_NAME: &'static str = "/ipfs/bitswap/1.2.0";
62
63#[derive(PartialEq, Eq, Clone, Debug)]
65struct Prefix {
66 pub version: Version,
68 pub codec: u64,
70 pub mh_type: u64,
72 pub mh_len: u8,
74}
75
76impl Prefix {
77 pub fn to_bytes(&self) -> Vec<u8> {
79 let mut res = Vec::with_capacity(4);
80 let mut buf = varint_encode::u64_buffer();
81 let version = varint_encode::u64(self.version.into(), &mut buf);
82 res.extend_from_slice(version);
83 let mut buf = varint_encode::u64_buffer();
84 let codec = varint_encode::u64(self.codec, &mut buf);
85 res.extend_from_slice(codec);
86 let mut buf = varint_encode::u64_buffer();
87 let mh_type = varint_encode::u64(self.mh_type, &mut buf);
88 res.extend_from_slice(mh_type);
89 let mut buf = varint_encode::u64_buffer();
90 let mh_len = varint_encode::u64(self.mh_len as u64, &mut buf);
91 res.extend_from_slice(mh_len);
92 res
93 }
94}
95
96pub struct BitswapRequestHandler<B> {
98 client: Arc<dyn BlockBackend<B> + Send + Sync>,
99 request_receiver: async_channel::Receiver<IncomingRequest>,
100}
101
102impl<B: BlockT> BitswapRequestHandler<B> {
103 pub fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
105 let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
106
107 let config = ProtocolConfig {
108 name: ProtocolName::from(PROTOCOL_NAME),
109 fallback_names: vec![],
110 max_request_size: MAX_PACKET_SIZE,
111 max_response_size: MAX_PACKET_SIZE,
112 request_timeout: Duration::from_secs(15),
113 inbound_queue: Some(tx),
114 };
115
116 (Self { client, request_receiver }, config)
117 }
118
119 pub async fn run(mut self) {
121 while let Some(request) = self.request_receiver.next().await {
122 let IncomingRequest { peer, payload, pending_response } = request;
123
124 match self.handle_message(&peer, &payload) {
125 Ok(response) => {
126 let response = OutgoingResponse {
127 result: Ok(response),
128 reputation_changes: Vec::new(),
129 sent_feedback: None,
130 };
131
132 match pending_response.send(response) {
133 Ok(()) => {
134 trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
135 },
136 Err(_) => debug!(
137 target: LOG_TARGET,
138 "Failed to handle light client request from {peer}: {}",
139 BitswapError::SendResponse,
140 ),
141 }
142 },
143 Err(err) => {
144 error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}");
145
146 let response = OutgoingResponse {
149 result: Err(()),
150 reputation_changes: vec![],
151 sent_feedback: None,
152 };
153
154 if pending_response.send(response).is_err() {
155 debug!(
156 target: LOG_TARGET,
157 "Failed to handle bitswap request from {peer}: {}",
158 BitswapError::SendResponse,
159 );
160 }
161 },
162 }
163 }
164 }
165
166 fn handle_message(
168 &mut self,
169 peer: &PeerId,
170 payload: &Vec<u8>,
171 ) -> Result<Vec<u8>, BitswapError> {
172 let request = schema::bitswap::Message::decode(&payload[..])?;
173
174 trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
175
176 let mut response = BitswapMessage::default();
177
178 let wantlist = match request.wantlist {
179 Some(wantlist) => wantlist,
180 None => {
181 debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer);
182 return Err(BitswapError::InvalidWantList)
183 },
184 };
185
186 if wantlist.entries.len() > MAX_WANTED_BLOCKS {
187 trace!(target: LOG_TARGET, "Ignored request: too many entries");
188 return Err(BitswapError::TooManyEntries)
189 }
190
191 for entry in wantlist.entries {
192 let cid = match cid::Cid::read_bytes(entry.block.as_slice()) {
193 Ok(cid) => cid,
194 Err(e) => {
195 trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
196 continue
197 },
198 };
199
200 if cid.version() != cid::Version::V1 ||
201 cid.hash().code() != u64::from(cid::multihash::Code::Blake2b256) ||
202 cid.hash().size() != 32
203 {
204 debug!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
205 continue
206 }
207
208 let mut hash = B::Hash::default();
209 hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
210 let transaction = match self.client.indexed_transaction(hash) {
211 Ok(ex) => ex,
212 Err(e) => {
213 error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
214 None
215 },
216 };
217
218 match transaction {
219 Some(transaction) => {
220 trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
221
222 if entry.want_type == WantType::Block as i32 {
223 let prefix = Prefix {
224 version: cid.version(),
225 codec: cid.codec(),
226 mh_type: cid.hash().code(),
227 mh_len: cid.hash().size(),
228 };
229 response
230 .payload
231 .push(MessageBlock { prefix: prefix.to_bytes(), data: transaction });
232 } else {
233 response.block_presences.push(BlockPresence {
234 r#type: BlockPresenceType::Have as i32,
235 cid: cid.to_bytes(),
236 });
237 }
238 },
239 None => {
240 trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
241
242 if entry.send_dont_have {
243 response.block_presences.push(BlockPresence {
244 r#type: BlockPresenceType::DontHave as i32,
245 cid: cid.to_bytes(),
246 });
247 }
248 },
249 }
250 }
251
252 Ok(response.encode_to_vec())
253 }
254}
255
256#[derive(Debug, thiserror::Error)]
258pub enum BitswapError {
259 #[error("Failed to decode request: {0}.")]
261 DecodeProto(#[from] prost::DecodeError),
262
263 #[error("Failed to encode response: {0}.")]
265 EncodeProto(#[from] prost::EncodeError),
266
267 #[error(transparent)]
269 Client(#[from] sp_blockchain::Error),
270
271 #[error(transparent)]
273 BadCid(#[from] cid::Error),
274
275 #[error(transparent)]
277 Read(#[from] io::Error),
278
279 #[error("Failed to send response.")]
281 SendResponse,
282
283 #[error("Invalid WANT list.")]
285 InvalidWantList,
286
287 #[error("Too many block entries in the request.")]
289 TooManyEntries,
290}
291
#[cfg(test)]
mod tests {
	use super::*;
	use futures::channel::oneshot;
	use sc_block_builder::BlockBuilderBuilder;
	use schema::bitswap::{
		message::{wantlist::Entry, Wantlist},
		Message as BitswapMessage,
	};
	use sp_consensus::BlockOrigin;
	use sp_runtime::codec::Encode;
	use substrate_test_runtime::ExtrinsicBuilder;
	use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder};

	/// Pushes `payload` into the handler's request queue on behalf of a random peer and waits
	/// for the handler's response.
	async fn send_request(
		queue: &async_channel::Sender<IncomingRequest>,
		payload: Vec<u8>,
	) -> OutgoingResponse {
		let (pending_response, response) = oneshot::channel();
		queue
			.send(IncomingRequest { peer: PeerId::random(), payload, pending_response })
			.await
			.unwrap();

		match response.await {
			Ok(response) => response,
			Err(_) => panic!("invalid event received"),
		}
	}

	/// Checks that the response carries neither reputation changes nor a feedback channel and
	/// hands back its result for further assertions.
	fn into_plain_result(response: OutgoingResponse) -> Result<Vec<u8>, ()> {
		let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
		assert_eq!(reputation_changes, Vec::new());
		assert!(sent_feedback.is_none());
		result
	}

	/// Encodes a bitswap message whose (non-full) want list consists of `entries`.
	fn wantlist_payload(entries: Vec<Entry>) -> Vec<u8> {
		BitswapMessage { wantlist: Some(Wantlist { entries, full: false }), ..Default::default() }
			.encode_to_vec()
	}

	/// Builds a want-list entry for the v1 CID (codec `0x70`) wrapping `digest` as a
	/// Blake2b-256 multihash; all other entry fields keep their defaults.
	fn entry_for_digest(digest: &[u8]) -> Entry {
		Entry {
			block: cid::Cid::new_v1(
				0x70,
				cid::multihash::Multihash::wrap(
					u64::from(cid::multihash::Code::Blake2b256),
					digest,
				)
				.unwrap(),
			)
			.to_bytes(),
			..Default::default()
		}
	}

	#[tokio::test]
	async fn undecodable_message() {
		let client = substrate_test_runtime_client::new();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(bitswap.run());
		let queue = config.inbound_queue.unwrap();

		let response = send_request(&queue, vec![0x13, 0x37, 0x13, 0x38]).await;

		assert_eq!(into_plain_result(response), Err(()));
	}

	#[tokio::test]
	async fn empty_want_list() {
		let client = substrate_test_runtime_client::new();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(bitswap.run());
		let queue = config.inbound_queue.unwrap();

		// A message without any want list is rejected.
		let missing = BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec();
		let response = send_request(&queue, missing).await;
		assert_eq!(into_plain_result(response), Err(()));

		// A present but empty want list yields an empty response message.
		let empty = BitswapMessage { wantlist: Some(Default::default()), ..Default::default() }
			.encode_to_vec();
		let response = send_request(&queue, empty).await;
		assert_eq!(into_plain_result(response), Ok(BitswapMessage::default().encode_to_vec()));
	}

	#[tokio::test]
	async fn too_long_want_list() {
		let client = substrate_test_runtime_client::new();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(bitswap.run());
		let queue = config.inbound_queue.unwrap();

		let entries: Vec<Entry> =
			std::iter::repeat_with(Entry::default).take(MAX_WANTED_BLOCKS + 1).collect();
		let response = send_request(&queue, wantlist_payload(entries)).await;

		assert_eq!(into_plain_result(response), Err(()));
	}

	#[tokio::test]
	async fn transaction_not_found() {
		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(bitswap.run());
		let queue = config.inbound_queue.unwrap();

		let payload = wantlist_payload(vec![entry_for_digest(&[0u8; 32])]);
		let response = send_request(&queue, payload).await;

		assert_eq!(into_plain_result(response), Ok(vec![]));
	}

	#[tokio::test]
	async fn transaction_found() {
		let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
		let mut block_builder = BlockBuilderBuilder::new(&client)
			.on_parent_block(client.chain_info().genesis_hash)
			.with_parent_block_number(0)
			.build()
			.unwrap();

		let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
		// The request below asks for the hash of the last four encoded bytes of the extrinsic.
		let pattern_index = ext.encoded_size() - 4;

		block_builder.push(ext.clone()).unwrap();
		let block = block_builder.build().unwrap().block;

		client.import(BlockOrigin::File, block).await.unwrap();

		let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
		tokio::spawn(bitswap.run());
		let queue = config.inbound_queue.unwrap();

		let digest = sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]);
		let payload = wantlist_payload(vec![entry_for_digest(&digest)]);
		let result = into_plain_result(send_request(&queue, payload).await);

		let response =
			schema::bitswap::Message::decode(&result.expect("fetch to succeed")[..]).unwrap();
		assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]);
	}
}