1use crate::{
24 error::{Error, ImmediateDialError},
25 protocol::{Direction, TransportEvent, TransportService},
26 substream::Substream,
27 types::{
28 multihash::{Code, MultihashDigest},
29 SubstreamId,
30 },
31 PeerId,
32};
33
34use bytes::Bytes;
35use cid::{Cid, Version};
36use prost::Message;
37use tokio::sync::mpsc::{Receiver, Sender};
38use tokio_stream::{StreamExt, StreamMap};
39
40pub use config::Config;
41pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
42pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
43use std::{
44 collections::{hash_map::Entry, vec_deque::Drain, HashMap, HashSet, VecDeque},
45 time::Duration,
46};
47
48mod config;
49mod handle;
50
51mod schema {
52 pub(super) mod bitswap {
53 include!(concat!(env!("OUT_DIR"), "/bitswap.rs"));
54 }
55}
56
/// Logging target passed to every `tracing` call in this file.
const LOG_TARGET: &str = "litep2p::ipfs::bitswap";

/// Maximum time a single framed write to a substream may take before the send is abandoned and
/// reported as a timeout.
const WRITE_TIMEOUT: Duration = Duration::from_secs(15);
62
63#[derive(Debug)]
65struct Prefix {
66 version: Version,
68
69 codec: u64,
71
72 multihash_type: u64,
74
75 multihash_len: u8,
77}
78
79impl Prefix {
80 pub fn to_bytes(&self) -> Vec<u8> {
82 let mut res = Vec::with_capacity(4 * 10);
83
84 let mut buf = unsigned_varint::encode::u64_buffer();
85 let version = unsigned_varint::encode::u64(self.version.into(), &mut buf);
86 res.extend_from_slice(version);
87
88 let mut buf = unsigned_varint::encode::u64_buffer();
89 let codec = unsigned_varint::encode::u64(self.codec, &mut buf);
90 res.extend_from_slice(codec);
91
92 let mut buf = unsigned_varint::encode::u64_buffer();
93 let multihash_type = unsigned_varint::encode::u64(self.multihash_type, &mut buf);
94 res.extend_from_slice(multihash_type);
95
96 let mut buf = unsigned_varint::encode::u64_buffer();
97 let multihash_len = unsigned_varint::encode::u64(self.multihash_len as u64, &mut buf);
98 res.extend_from_slice(multihash_len);
99 res
100 }
101
102 pub fn from_bytes(prefix_bytes: &[u8]) -> Option<Prefix> {
104 let (version, rest) = unsigned_varint::decode::u64(prefix_bytes).ok()?;
105 let (codec, rest) = unsigned_varint::decode::u64(rest).ok()?;
106 let (multihash_type, rest) = unsigned_varint::decode::u64(rest).ok()?;
107 let (multihash_len, rest) = unsigned_varint::decode::u64(rest).ok()?;
108 if !rest.is_empty() {
109 return None;
110 }
111
112 let version = Version::try_from(version).ok()?;
113 let multihash_len = u8::try_from(multihash_len).ok()?;
114
115 Some(Prefix {
116 version,
117 codec,
118 multihash_type,
119 multihash_len,
120 })
121 }
122}
123
/// Work queued for a peer until an outbound substream to that peer becomes available.
#[derive(Debug)]
enum SubstreamAction {
    /// Send a want-list request for the given CIDs.
    SendRequest(Vec<(Cid, WantType)>),

    /// Send the given block and/or presence responses.
    SendResponse(Vec<ResponseType>),
}
132
/// Bitswap protocol state, driven by the event loop in `run`.
pub(crate) struct Bitswap {
    /// Transport service used to dial peers, open substreams and receive transport events.
    service: TransportService,

    /// Sender half of the channel on which `BitswapEvent`s are emitted.
    event_tx: Sender<BitswapEvent>,

    /// Receiver half of the channel on which `BitswapCommand`s arrive.
    cmd_rx: Receiver<BitswapCommand>,

    /// Actions waiting, per peer, for an outbound substream to be opened.
    pending_outbound: HashMap<PeerId, Vec<SubstreamAction>>,

    /// Inbound substreams, polled for incoming messages. At most one per peer is kept.
    inbound: StreamMap<PeerId, Substream>,

    /// Outbound substreams kept open so later requests/responses can reuse them.
    outbound: HashMap<PeerId, Substream>,

    /// Peers that were dialed by this protocol and are not connected yet; a substream is opened
    /// for them once the connection is established.
    pending_dials: HashSet<PeerId>,
}
156
157impl Bitswap {
158 pub(crate) fn new(service: TransportService, config: Config) -> Self {
160 Self {
161 service,
162 cmd_rx: config.cmd_rx,
163 event_tx: config.event_tx,
164 pending_outbound: HashMap::new(),
165 inbound: StreamMap::new(),
166 outbound: HashMap::new(),
167 pending_dials: HashSet::new(),
168 }
169 }
170
171 fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
173 tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
174
175 if self.inbound.insert(peer, substream).is_some() {
176 tracing::debug!(
178 target: LOG_TARGET,
179 ?peer,
180 "dropping inbound substream as remote opened a new one",
181 );
182 }
183 }
184
185 async fn on_message_received(
187 &mut self,
188 peer: PeerId,
189 message: bytes::BytesMut,
190 ) -> Result<(), Error> {
191 tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound message");
192
193 let message = schema::bitswap::Message::decode(message)?;
194
195 if let Some(wantlist) = &message.wantlist {
197 if !wantlist.entries.is_empty() {
198 let cids = wantlist
199 .entries
200 .iter()
201 .filter_map(|entry| {
202 let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
203
204 let want_type = match entry.want_type {
205 0 => WantType::Block,
206 1 => WantType::Have,
207 _ => return None,
208 };
209
210 Some((cid, want_type))
211 })
212 .collect::<Vec<_>>();
213
214 if !cids.is_empty() {
215 let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
216 }
217 }
218 }
219
220 if !message.payload.is_empty() || !message.block_presences.is_empty() {
222 let mut responses = Vec::new();
223
224 for block in message.payload {
226 let Some(Prefix {
227 version,
228 codec,
229 multihash_type,
230 multihash_len: _,
231 }) = Prefix::from_bytes(&block.prefix)
232 else {
233 tracing::trace!(target: LOG_TARGET, ?peer, "invalid CID prefix received");
234 continue;
235 };
236
237 let Ok(code) = Code::try_from(multihash_type) else {
239 tracing::trace!(
240 target: LOG_TARGET,
241 ?peer,
242 multihash_type,
243 "usupported multihash type",
244 );
245 continue;
246 };
247
248 let multihash = code.digest(&block.data);
249
250 let Ok(multihash) =
252 cid::multihash::Multihash::wrap(multihash.code(), multihash.digest())
253 else {
254 tracing::trace!(
255 target: LOG_TARGET,
256 ?peer,
257 multihash_type,
258 "multihash size > 64 unsupported",
259 );
260 continue;
261 };
262
263 match Cid::new(version, codec, multihash) {
264 Ok(cid) => responses.push(ResponseType::Block {
265 cid,
266 block: block.data,
267 }),
268 Err(error) => tracing::trace!(
269 target: LOG_TARGET,
270 ?peer,
271 ?error,
272 "invalid CID received",
273 ),
274 }
275 }
276
277 for presence in message.block_presences {
279 if let Ok(cid) = Cid::read_bytes(&presence.cid[..]) {
280 let presence_type = match presence.r#type {
281 0 => BlockPresenceType::Have,
282 1 => BlockPresenceType::DontHave,
283 _ => continue,
284 };
285
286 responses.push(ResponseType::Presence {
287 cid,
288 presence: presence_type,
289 });
290 }
291 }
292
293 if !responses.is_empty() {
294 let _ = self.event_tx.send(BitswapEvent::Response { peer, responses }).await;
295 }
296 }
297
298 Ok(())
299 }
300
301 async fn on_outbound_substream(
303 &mut self,
304 peer: PeerId,
305 substream_id: SubstreamId,
306 mut substream: Substream,
307 ) {
308 let Some(actions) = self.pending_outbound.remove(&peer) else {
309 tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
310 return;
311 };
312
313 tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
314
315 for action in actions {
316 match action {
317 SubstreamAction::SendRequest(cids) => {
318 if let Err(error) = send_request(&mut substream, cids).await {
319 tracing::debug!(target: LOG_TARGET, ?peer, ?error, "bitswap request failed");
321 return;
322 }
323 }
324 SubstreamAction::SendResponse(entries) => {
325 if let Err(error) = send_response(&mut substream, entries).await {
326 tracing::debug!(target: LOG_TARGET, ?peer, ?error, "bitswap response failed");
328 return;
329 }
330 }
331 }
332 }
333
334 self.outbound.insert(peer, substream);
335 }
336
337 fn on_connection_established(&mut self, peer: PeerId) {
339 if self.pending_dials.remove(&peer) {
341 tracing::trace!(
342 target: LOG_TARGET,
343 ?peer,
344 "open substream after connection established",
345 );
346
347 if let Err(error) = self.service.open_substream(peer) {
348 tracing::debug!(
349 target: LOG_TARGET,
350 ?peer,
351 ?error,
352 "failed to open substream after connection established",
353 );
354 self.pending_outbound.remove(&peer);
357 }
358 }
359 }
360
361 fn open_substream_or_dial(&mut self, peer: PeerId) {
363 tracing::trace!(target: LOG_TARGET, ?peer, "open substream");
364
365 if let Err(error) = self.service.open_substream(peer) {
366 tracing::trace!(
367 target: LOG_TARGET,
368 ?peer,
369 ?error,
370 "failed to open substream, dialing peer",
371 );
372
373 match self.service.dial(&peer) {
375 Ok(()) => {
376 self.pending_dials.insert(peer);
378 }
379 Err(ImmediateDialError::AlreadyConnected) => {
380 if let Err(error) = self.service.open_substream(peer) {
382 tracing::trace!(
383 target: LOG_TARGET,
384 ?peer,
385 ?error,
386 "failed to open substream for a second time",
387 );
388 }
389 }
390 Err(error) => {
391 tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer");
392 }
393 }
394 }
395 }
396
397 async fn on_bitswap_request(&mut self, peer: PeerId, cids: Vec<(Cid, WantType)>) {
399 if let Entry::Occupied(mut entry) = self.outbound.entry(peer) {
401 if send_request(entry.get_mut(), cids.clone()).await.is_ok() {
402 return;
403 } else {
404 tracing::debug!(
405 target: LOG_TARGET,
406 ?peer,
407 "failed to send request over existing substream",
408 );
409 entry.remove();
410 }
411 }
412
413 let pending_actions = self.pending_outbound.entry(peer).or_default();
415 let no_substream_pending = pending_actions.is_empty();
419
420 pending_actions.push(SubstreamAction::SendRequest(cids));
421
422 if no_substream_pending {
423 self.open_substream_or_dial(peer);
424 }
425 }
426
427 async fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
429 if let Entry::Occupied(mut entry) = self.outbound.entry(peer) {
431 if send_response(entry.get_mut(), responses.clone()).await.is_ok() {
432 return;
433 } else {
434 tracing::debug!(
435 target: LOG_TARGET,
436 ?peer,
437 "failed to send response over existing substream",
438 );
439 entry.remove();
440 }
441 }
442
443 let pending_actions = self.pending_outbound.entry(peer).or_default();
445 let no_pending_substream = pending_actions.is_empty();
446 pending_actions.push(SubstreamAction::SendResponse(responses));
447
448 if no_pending_substream {
449 self.open_substream_or_dial(peer);
450 }
451 }
452
/// Run the Bitswap event loop.
///
/// The loop multiplexes three sources: transport events from `service`, commands from
/// `cmd_rx`, and messages arriving on inbound substreams. It returns when either the transport
/// event stream or the command channel yields `None`.
///
/// NOTE(review): no arm here reacts to a dial that fails after `dial()` returned `Ok(())`. If
/// the transport reports such failures as an event, the peer would stay in `pending_dials` and
/// its queued actions in `pending_outbound` — confirm and handle if so.
pub async fn run(mut self) {
    tracing::debug!(target: LOG_TARGET, "starting bitswap event loop");

    loop {
        tokio::select! {
            // Transport events: connections and newly opened substreams.
            event = self.service.next() => match event {
                Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
                    self.on_connection_established(peer);
                }
                Some(TransportEvent::SubstreamOpened {
                    peer,
                    substream,
                    direction,
                    ..
                }) => match direction {
                    Direction::Inbound => self.on_inbound_substream(peer, substream),
                    Direction::Outbound(substream_id) =>
                        self.on_outbound_substream(peer, substream_id, substream).await,
                },
                None => return,
                // All other transport events are only logged.
                event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"),
            },
            // Commands to send requests or responses.
            command = self.cmd_rx.recv() => match command {
                Some(BitswapCommand::SendRequest { peer, cids }) => {
                    self.on_bitswap_request(peer, cids).await;
                }
                Some(BitswapCommand::SendResponse { peer, responses }) => {
                    self.on_bitswap_response(peer, responses).await;
                }
                None => return,
            },
            // Inbound messages; this branch is polled only while at least one inbound
            // substream is registered.
            Some((peer, message)) = self.inbound.next(), if !self.inbound.is_empty() => {
                match message {
                    // A message that fails to be handled costs the peer its inbound substream.
                    Ok(message) => if let Err(e) = self.on_message_received(peer, message).await {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?e,
                            "error handling inbound message, dropping substream",
                        );
                        self.inbound.remove(&peer);
                    },
                    // A read error is treated as the substream being closed.
                    Err(e) => {
                        tracing::trace!(
                            target: LOG_TARGET,
                            ?peer,
                            ?e,
                            "inbound substream closed",
                        );
                        self.inbound.remove(&peer);
                    },
                }
            }
        }
    }
}
510}
511
512async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) -> Result<(), Error> {
513 let request = schema::bitswap::Message {
514 wantlist: Some(schema::bitswap::Wantlist {
515 entries: cids
516 .into_iter()
517 .map(|(cid, want_type)| schema::bitswap::wantlist::Entry {
518 block: cid.to_bytes(),
519 priority: 1,
520 cancel: false,
521 want_type: want_type as i32,
522 send_dont_have: false,
523 })
524 .collect(),
525 full: false,
526 }),
527 ..Default::default()
528 };
529
530 let message = request.encode_to_vec().into();
531 match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
532 Err(_) => Err(Error::Timeout),
533 Ok(Err(e)) => Err(Error::SubstreamError(e)),
534 Ok(Ok(())) => Ok(()),
535 }
536}
537
538async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> Result<(), Error> {
539 if let Some((message, cid_count)) =
541 presences_message(entries.iter().filter_map(|entry| match entry {
542 ResponseType::Presence { cid, presence } => Some((*cid, *presence)),
543 ResponseType::Block { .. } => None,
544 }))
545 {
546 if message.len() <= config::MAX_MESSAGE_SIZE {
547 tracing::trace!(
548 target: LOG_TARGET,
549 cid_count,
550 "sending Bitswap presence message",
551 );
552 match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
553 Err(_) => return Err(Error::Timeout),
554 Ok(Err(e)) => return Err(Error::SubstreamError(e)),
555 Ok(Ok(())) => {}
556 }
557 } else {
558 tracing::warn!(
561 target: LOG_TARGET,
562 size = message.len(),
563 max_size = config::MAX_MESSAGE_SIZE,
564 "outgoing Bitswap presence message exceeded max size",
565 );
566 }
567 }
568
569 let mut blocks = entries
571 .into_iter()
572 .filter_map(|entry| match entry {
573 ResponseType::Block { cid, block } => Some((cid, block)),
574 ResponseType::Presence { .. } => None,
575 })
576 .collect::<VecDeque<_>>();
577
578 while let Some(batch) = extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) {
579 if let Some((message, block_count)) = blocks_message(batch) {
580 if message.len() <= config::MAX_MESSAGE_SIZE {
581 tracing::trace!(
582 target: LOG_TARGET,
583 block_count,
584 "sending Bitswap blocks message",
585 );
586 match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
587 Err(_) => return Err(Error::Timeout),
588 Ok(Err(e)) => return Err(Error::SubstreamError(e)),
589 Ok(Ok(())) => {}
590 }
591 } else {
592 tracing::warn!(
595 target: LOG_TARGET,
596 size = message.len(),
597 max_size = config::MAX_MESSAGE_SIZE,
598 "outgoing Bitswap blocks message exceeded max size",
599 );
600 }
601 }
602 }
603
604 Ok(())
605}
606
607fn presences_message(
608 presences: impl IntoIterator<Item = (Cid, BlockPresenceType)>,
609) -> Option<(Bytes, usize)> {
610 let message = schema::bitswap::Message {
611 wantlist: Some(Default::default()),
613 block_presences: presences
614 .into_iter()
615 .map(|(cid, presence)| schema::bitswap::BlockPresence {
616 cid: cid.to_bytes(),
617 r#type: presence as i32,
618 })
619 .collect(),
620 ..Default::default()
621 };
622
623 let count = message.block_presences.len();
624
625 (count > 0).then(|| (message.encode_to_vec().into(), count))
626}
627
628fn blocks_message(blocks: impl IntoIterator<Item = (Cid, Vec<u8>)>) -> Option<(Bytes, usize)> {
629 let message = schema::bitswap::Message {
630 wantlist: Some(Default::default()),
632 payload: blocks
633 .into_iter()
634 .map(|(cid, block)| {
635 let prefix = Prefix {
636 version: cid.version(),
637 codec: cid.codec(),
638 multihash_type: cid.hash().code(),
639 multihash_len: cid.hash().size(),
640 }
641 .to_bytes();
642
643 schema::bitswap::Block {
644 prefix,
645 data: block,
646 }
647 })
648 .collect(),
649 ..Default::default()
650 };
651
652 let count = message.payload.len();
653
654 (count > 0).then(|| (message.encode_to_vec().into(), count))
655}
656
657fn extract_next_batch<'a>(
660 blocks: &'a mut VecDeque<(Cid, Vec<u8>)>,
661 max_batch_size: usize,
662) -> Option<Drain<'a, (Cid, Vec<u8>)>> {
663 loop {
665 let block = blocks.front()?;
666 if block.1.len() > max_batch_size {
667 tracing::warn!(
668 target: LOG_TARGET,
669 cid = block.0.to_string(),
670 size = block.1.len(),
671 max_batch_size,
672 "outgoing Bitswap block exceeded max batch size",
673 );
674 blocks.pop_front();
675 } else {
676 break;
677 }
678 }
679
680 let mut total_size = 0;
683 let mut block_count = 0;
684
685 for b in blocks.iter() {
686 let next_block_size = b.1.len();
687 if total_size + next_block_size > max_batch_size {
688 break;
689 }
690 total_size += next_block_size;
691 block_count += 1;
692 }
693
694 Some(blocks.drain(..block_count))
695}
696
#[cfg(test)]
mod tests {
    use cid::multihash::Multihash;

    use super::*;

    /// Build a CIDv1 with codec `0x55` and a SHA2-256 digest of `block`.
    fn cid(block: &[u8]) -> Cid {
        let codec = 0x55;
        let multihash = Code::Sha2_256.digest(block);
        let multihash =
            Multihash::wrap(multihash.code(), multihash.digest()).expect("to be valid multihash");

        Cid::new_v1(codec, multihash)
    }

    /// Create one block per entry of `sizes`; the first block is filled with `0x01`, the second
    /// with `0x02`, and so on.
    fn make_blocks(sizes: &[usize]) -> Vec<(Cid, Vec<u8>)> {
        sizes
            .iter()
            .zip(1u8..)
            .map(|(&size, fill)| {
                let data = vec![fill; size];
                (cid(&data), data)
            })
            .collect()
    }

    /// Drain `blocks` with `extract_next_batch` and check that exactly the `expected` batches
    /// come out, in order, followed by `None`.
    fn assert_batches(
        blocks: &[(Cid, Vec<u8>)],
        max_size: usize,
        expected: &[&[(Cid, Vec<u8>)]],
    ) {
        let mut blocks_deque = blocks.iter().cloned().collect::<VecDeque<_>>();

        for expected_batch in expected {
            let batch = extract_next_batch(&mut blocks_deque, max_size).unwrap();
            assert_eq!(batch.collect::<Vec<_>>(), *expected_batch);
        }

        assert!(extract_next_batch(&mut blocks_deque, max_size).is_none());
    }

    #[test]
    fn extract_next_batch_fits_max_size() {
        let blocks = make_blocks(&[10, 10, 10]);

        assert_batches(&blocks, 100, &[&blocks[..]]);
    }

    #[test]
    fn extract_next_batch_chunking_exact() {
        let blocks = make_blocks(&[10, 10, 10]);

        assert_batches(&blocks, 20, &[&blocks[..2], &blocks[2..]]);
    }

    #[test]
    fn extract_next_batch_chunking_less_than() {
        let blocks = make_blocks(&[10, 9, 10]);

        assert_batches(&blocks, 20, &[&blocks[..2], &blocks[2..]]);
    }

    #[test]
    fn extract_next_batch_oversized_blocks_discarded() {
        let blocks = make_blocks(&[10, 101, 10]);

        // The oversized middle block ends the first batch and is then discarded.
        assert_batches(&blocks, 20, &[&blocks[..1], &blocks[2..]]);
    }
}