litep2p/protocol/libp2p/bitswap/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! [`/ipfs/bitswap/1.2.0`](https://github.com/ipfs/specs/blob/main/BITSWAP.md) implementation.
22
23use crate::{
24    error::Error,
25    protocol::{Direction, TransportEvent, TransportService},
26    substream::Substream,
27    types::SubstreamId,
28    PeerId,
29};
30
31use cid::{Cid, Version};
32use prost::Message;
33use tokio::sync::mpsc::{Receiver, Sender};
34use tokio_stream::{StreamExt, StreamMap};
35
36pub use config::Config;
37pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
38pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
39use std::{collections::HashMap, time::Duration};
40
41mod config;
42mod handle;
43
/// Wire-format message types used by the bitswap protocol.
mod schema {
    pub(super) mod bitswap {
        // The included file is looked up in `OUT_DIR`, i.e. it is produced during the build
        // (presumably generated from the bitswap protobuf schema by the build script).
        include!(concat!(env!("OUT_DIR"), "/bitswap.rs"));
    }
}
49
/// Log target used by every `tracing` call in this file.
const LOG_TARGET: &str = "litep2p::ipfs::bitswap";

/// Write timeout for outbound messages.
///
/// Upper bound on how long sending a single framed bitswap response may take.
const WRITE_TIMEOUT: Duration = Duration::from_secs(15);
55
56/// Bitswap metadata.
57#[derive(Debug)]
58struct Prefix {
59    /// CID version.
60    version: Version,
61
62    /// CID codec.
63    codec: u64,
64
65    /// CID multihash type.
66    multihash_type: u64,
67
68    /// CID multihash length.
69    multihash_len: u8,
70}
71
72impl Prefix {
73    /// Convert the prefix to encoded bytes.
74    pub fn to_bytes(&self) -> Vec<u8> {
75        let mut res = Vec::with_capacity(4 * 10);
76
77        let mut buf = unsigned_varint::encode::u64_buffer();
78        let version = unsigned_varint::encode::u64(self.version.into(), &mut buf);
79        res.extend_from_slice(version);
80
81        let mut buf = unsigned_varint::encode::u64_buffer();
82        let codec = unsigned_varint::encode::u64(self.codec, &mut buf);
83        res.extend_from_slice(codec);
84
85        let mut buf = unsigned_varint::encode::u64_buffer();
86        let multihash_type = unsigned_varint::encode::u64(self.multihash_type, &mut buf);
87        res.extend_from_slice(multihash_type);
88
89        let mut buf = unsigned_varint::encode::u64_buffer();
90        let multihash_len = unsigned_varint::encode::u64(self.multihash_len as u64, &mut buf);
91        res.extend_from_slice(multihash_len);
92        res
93    }
94}
95
96/// Bitswap protocol.
97pub(crate) struct Bitswap {
98    // Connection service.
99    service: TransportService,
100
101    /// TX channel for sending events to the user protocol.
102    event_tx: Sender<BitswapEvent>,
103
104    /// RX channel for receiving commands from `BitswapHandle`.
105    cmd_rx: Receiver<BitswapCommand>,
106
107    /// Pending outbound substreams.
108    pending_outbound: HashMap<SubstreamId, Vec<ResponseType>>,
109
110    /// Inbound substreams.
111    inbound: StreamMap<PeerId, Substream>,
112}
113
114impl Bitswap {
115    /// Create new [`Bitswap`] protocol.
116    pub(crate) fn new(service: TransportService, config: Config) -> Self {
117        Self {
118            service,
119            cmd_rx: config.cmd_rx,
120            event_tx: config.event_tx,
121            pending_outbound: HashMap::new(),
122            inbound: StreamMap::new(),
123        }
124    }
125
126    /// Substream opened to remote peer.
127    fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
128        tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
129
130        if self.inbound.insert(peer, substream).is_some() {
131            // Only one inbound substream per peer is allowed in order to constrain resources.
132            tracing::debug!(
133                target: LOG_TARGET,
134                ?peer,
135                "dropping inbound substream as remote opened a new one",
136            );
137        }
138    }
139
140    /// Message received from remote peer.
141    async fn on_message_received(
142        &mut self,
143        peer: PeerId,
144        message: bytes::BytesMut,
145    ) -> Result<(), Error> {
146        tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound message");
147
148        let message = schema::bitswap::Message::decode(message)?;
149
150        let Some(wantlist) = message.wantlist else {
151            tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`");
152            return Err(Error::InvalidData);
153        };
154
155        let cids = wantlist
156            .entries
157            .into_iter()
158            .filter_map(|entry| {
159                let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
160
161                let want_type = match entry.want_type {
162                    0 => WantType::Block,
163                    1 => WantType::Have,
164                    _ => return None,
165                };
166
167                Some((cid, want_type))
168            })
169            .collect::<Vec<_>>();
170
171        let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
172
173        Ok(())
174    }
175
176    /// Send response to bitswap request.
177    async fn on_outbound_substream(
178        &mut self,
179        peer: PeerId,
180        substream_id: SubstreamId,
181        mut substream: Substream,
182    ) {
183        let Some(entries) = self.pending_outbound.remove(&substream_id) else {
184            tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
185            return;
186        };
187
188        let mut response = schema::bitswap::Message {
189            // `wantlist` field must always be present. This is what the official Kubo IPFS
190            // implementation does.
191            wantlist: Some(Default::default()),
192            ..Default::default()
193        };
194
195        for entry in entries {
196            match entry {
197                ResponseType::Block { cid, block } => {
198                    let prefix = Prefix {
199                        version: cid.version(),
200                        codec: cid.codec(),
201                        multihash_type: cid.hash().code(),
202                        multihash_len: cid.hash().size(),
203                    }
204                    .to_bytes();
205
206                    response.payload.push(schema::bitswap::Block {
207                        prefix,
208                        data: block,
209                    });
210                }
211                ResponseType::Presence { cid, presence } => {
212                    response.block_presences.push(schema::bitswap::BlockPresence {
213                        cid: cid.to_bytes(),
214                        r#type: presence as i32,
215                    });
216                }
217            }
218        }
219
220        let message = response.encode_to_vec().into();
221        let _ = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await;
222
223        substream.close().await;
224    }
225
226    /// Handle bitswap response.
227    fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
228        match self.service.open_substream(peer) {
229            Err(error) => {
230                tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream to peer")
231            }
232            Ok(substream_id) => {
233                self.pending_outbound.insert(substream_id, responses);
234            }
235        }
236    }
237
238    /// Start [`Bitswap`] event loop.
239    pub async fn run(mut self) {
240        tracing::debug!(target: LOG_TARGET, "starting bitswap event loop");
241
242        loop {
243            tokio::select! {
244                event = self.service.next() => match event {
245                    Some(TransportEvent::SubstreamOpened {
246                        peer,
247                        substream,
248                        direction,
249                        ..
250                    }) => match direction {
251                        Direction::Inbound => self.on_inbound_substream(peer, substream),
252                        Direction::Outbound(substream_id) =>
253                            self.on_outbound_substream(peer, substream_id, substream).await,
254                    },
255                    None => return,
256                    event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"),
257                },
258                command = self.cmd_rx.recv() => match command {
259                    Some(BitswapCommand::SendResponse { peer, responses }) => {
260                        self.on_bitswap_response(peer, responses);
261                    }
262                    None => return,
263                },
264                Some((peer, message)) = self.inbound.next(), if !self.inbound.is_empty() => {
265                    match message {
266                        Ok(message) => if let Err(e) = self.on_message_received(peer, message).await {
267                            tracing::trace!(
268                                target: LOG_TARGET,
269                                ?peer,
270                                ?e,
271                                "error handling inbound message, dropping substream",
272                            );
273                            self.inbound.remove(&peer);
274                        },
275                        Err(e) => {
276                            tracing::trace!(
277                                target: LOG_TARGET,
278                                ?peer,
279                                ?e,
280                                "inbound substream closed",
281                            );
282                            self.inbound.remove(&peer);
283                        },
284                    }
285                }
286            }
287        }
288    }
289}