litep2p/protocol/libp2p/bitswap/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`/ipfs/bitswap/1.2.0`](https://github.com/ipfs/specs/blob/main/BITSWAP.md) implementation.
22
23use crate::{
24    error::Error,
25    protocol::{
26        libp2p::bitswap::handle::BitswapCommand, Direction, TransportEvent, TransportService,
27    },
28    substream::Substream,
29    types::SubstreamId,
30    PeerId,
31};
32
33use cid::{multihash::Code, Version};
34use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
35use prost::Message;
36use tokio::sync::mpsc::{Receiver, Sender};
37
38use std::collections::HashMap;
39
40pub use cid::Cid;
41pub use config::Config;
42pub use handle::{BitswapEvent, BitswapHandle, ResponseType};
43pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
44
45mod config;
46mod handle;
47
/// Wire-format message types for bitswap.
///
/// The definitions are not written by hand: they are included from `bitswap.rs` in the
/// build output directory (`OUT_DIR`), presumably generated from the protobuf schema by
/// the build script (TODO confirm against `build.rs`).
mod schema {
    pub(super) mod bitswap {
        include!(concat!(env!("OUT_DIR"), "/bitswap.rs"));
    }
}
53
/// Log target passed to every `tracing` event emitted from this file.
const LOG_TARGET: &str = "litep2p::ipfs::bitswap";
56
/// CID prefix attached to a block payload in a bitswap response.
///
/// Carries the metadata of a CID (version, codec and multihash type/length) but not the
/// digest itself. Encoded with [`Prefix::to_bytes()`].
#[derive(Debug)]
struct Prefix {
    /// CID version.
    version: Version,

    /// CID codec.
    codec: u64,

    /// CID multihash type.
    multihash_type: u64,

    /// CID multihash length.
    multihash_len: u8,
}
72
73impl Prefix {
74    /// Convert the prefix to encoded bytes.
75    pub fn to_bytes(&self) -> Vec<u8> {
76        let mut res = Vec::with_capacity(4 * 10);
77
78        let mut buf = unsigned_varint::encode::u64_buffer();
79        let version = unsigned_varint::encode::u64(self.version.into(), &mut buf);
80        res.extend_from_slice(version);
81
82        let mut buf = unsigned_varint::encode::u64_buffer();
83        let codec = unsigned_varint::encode::u64(self.codec, &mut buf);
84        res.extend_from_slice(codec);
85
86        let mut buf = unsigned_varint::encode::u64_buffer();
87        let multihash_type = unsigned_varint::encode::u64(self.multihash_type, &mut buf);
88        res.extend_from_slice(multihash_type);
89
90        let mut buf = unsigned_varint::encode::u64_buffer();
91        let multihash_len = unsigned_varint::encode::u64(self.multihash_len as u64, &mut buf);
92        res.extend_from_slice(multihash_len);
93        res
94    }
95}
96
/// Bitswap protocol.
///
/// Holds the state of the protocol event loop, see [`Bitswap::run()`].
pub(crate) struct Bitswap {
    /// Connection service.
    service: TransportService,

    /// TX channel for sending events to the user protocol.
    event_tx: Sender<BitswapEvent>,

    /// RX channel for receiving commands from `BitswapHandle`.
    cmd_rx: Receiver<BitswapCommand>,

    /// Pending outbound substreams.
    ///
    /// Maps the ID of a substream that is being opened to the responses that are sent
    /// over it once it has been opened.
    pending_outbound: HashMap<SubstreamId, Vec<ResponseType>>,

    /// Pending inbound substreams.
    ///
    /// Each future reads one request from an inbound substream and resolves to the remote
    /// peer and the `(Cid, WantType)` pairs it asked for.
    pending_inbound:
        FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Vec<(Cid, WantType)>)>>>,
}
115
116impl Bitswap {
117    /// Create new [`Bitswap`] protocol.
118    pub(crate) fn new(service: TransportService, config: Config) -> Self {
119        Self {
120            service,
121            cmd_rx: config.cmd_rx,
122            event_tx: config.event_tx,
123            pending_outbound: HashMap::new(),
124            pending_inbound: FuturesUnordered::new(),
125        }
126    }
127
128    /// Substream opened to remote peer.
129    fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
130        tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
131
132        self.pending_inbound.push(Box::pin(async move {
133            let message = substream.next().await.ok_or(Error::ConnectionClosed)??;
134            let message = schema::bitswap::Message::decode(message)?;
135
136            let Some(wantlist) = message.wantlist else {
137                tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`");
138                return Err(Error::InvalidData);
139            };
140
141            Ok((
142                peer,
143                wantlist
144                    .entries
145                    .into_iter()
146                    .filter_map(|entry| {
147                        let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
148
149                        let want_type = match entry.want_type {
150                            0 => WantType::Block,
151                            1 => WantType::Have,
152                            _ => return None,
153                        };
154
155                        (cid.version() == cid::Version::V1
156                            && cid.hash().code() == u64::from(Code::Blake2b256)
157                            && cid.hash().size() == 32)
158                            .then_some((cid, want_type))
159                    })
160                    .collect::<Vec<_>>(),
161            ))
162        }));
163    }
164
165    /// Send response to bitswap request.
166    async fn on_outbound_substream(
167        &mut self,
168        peer: PeerId,
169        substream_id: SubstreamId,
170        mut substream: Substream,
171    ) {
172        let Some(entries) = self.pending_outbound.remove(&substream_id) else {
173            tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
174            return;
175        };
176
177        let mut response = schema::bitswap::Message::default();
178
179        for entry in entries {
180            match entry {
181                ResponseType::Block { cid, block } => {
182                    let prefix = Prefix {
183                        version: cid.version(),
184                        codec: cid.codec(),
185                        multihash_type: cid.hash().code(),
186                        multihash_len: cid.hash().size(),
187                    }
188                    .to_bytes();
189
190                    response.payload.push(schema::bitswap::Block {
191                        prefix,
192                        data: block,
193                    });
194                }
195                ResponseType::Presence { cid, presence } => {
196                    response.block_presences.push(schema::bitswap::BlockPresence {
197                        cid: cid.to_bytes(),
198                        r#type: presence as i32,
199                    });
200                }
201            }
202        }
203
204        let _ = substream.send_framed(response.encode_to_vec().into()).await;
205    }
206
207    /// Handle bitswap response.
208    fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
209        match self.service.open_substream(peer) {
210            Err(error) => {
211                tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream to peer")
212            }
213            Ok(substream_id) => {
214                self.pending_outbound.insert(substream_id, responses);
215            }
216        }
217    }
218
219    /// Start [`Bitswap`] event loop.
220    pub async fn run(mut self) {
221        tracing::debug!(target: LOG_TARGET, "starting bitswap event loop");
222
223        loop {
224            tokio::select! {
225                event = self.service.next() => match event {
226                    Some(TransportEvent::SubstreamOpened {
227                        peer,
228                        substream,
229                        direction,
230                        ..
231                    }) => match direction {
232                        Direction::Inbound => self.on_inbound_substream(peer, substream),
233                        Direction::Outbound(substream_id) =>
234                            self.on_outbound_substream(peer, substream_id, substream).await,
235                    },
236                    None => return,
237                    event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"),
238                },
239                command = self.cmd_rx.recv() => match command {
240                    Some(BitswapCommand::SendResponse { peer, responses }) => {
241                        self.on_bitswap_response(peer, responses);
242                    }
243                    None => return,
244                },
245                event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {
246                    if let Some(Ok((peer, cids))) = event {
247                        let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
248                    }
249                }
250            }
251        }
252    }
253}