litep2p/protocol/libp2p/bitswap/
mod.rs1use crate::{
24 error::Error,
25 protocol::{Direction, TransportEvent, TransportService},
26 substream::Substream,
27 types::SubstreamId,
28 PeerId,
29};
30
31use cid::{Cid, Version};
32use prost::Message;
33use tokio::sync::mpsc::{Receiver, Sender};
34use tokio_stream::{StreamExt, StreamMap};
35
36pub use config::Config;
37pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
38pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
39use std::{collections::HashMap, time::Duration};
40
41mod config;
42mod handle;
43
44mod schema {
45 pub(super) mod bitswap {
46 include!(concat!(env!("OUT_DIR"), "/bitswap.rs"));
47 }
48}
49
50const LOG_TARGET: &str = "litep2p::ipfs::bitswap";
52
53const WRITE_TIMEOUT: Duration = Duration::from_secs(15);
55
56#[derive(Debug)]
58struct Prefix {
59 version: Version,
61
62 codec: u64,
64
65 multihash_type: u64,
67
68 multihash_len: u8,
70}
71
72impl Prefix {
73 pub fn to_bytes(&self) -> Vec<u8> {
75 let mut res = Vec::with_capacity(4 * 10);
76
77 let mut buf = unsigned_varint::encode::u64_buffer();
78 let version = unsigned_varint::encode::u64(self.version.into(), &mut buf);
79 res.extend_from_slice(version);
80
81 let mut buf = unsigned_varint::encode::u64_buffer();
82 let codec = unsigned_varint::encode::u64(self.codec, &mut buf);
83 res.extend_from_slice(codec);
84
85 let mut buf = unsigned_varint::encode::u64_buffer();
86 let multihash_type = unsigned_varint::encode::u64(self.multihash_type, &mut buf);
87 res.extend_from_slice(multihash_type);
88
89 let mut buf = unsigned_varint::encode::u64_buffer();
90 let multihash_len = unsigned_varint::encode::u64(self.multihash_len as u64, &mut buf);
91 res.extend_from_slice(multihash_len);
92 res
93 }
94}
95
96pub(crate) struct Bitswap {
98 service: TransportService,
100
101 event_tx: Sender<BitswapEvent>,
103
104 cmd_rx: Receiver<BitswapCommand>,
106
107 pending_outbound: HashMap<SubstreamId, Vec<ResponseType>>,
109
110 inbound: StreamMap<PeerId, Substream>,
112}
113
114impl Bitswap {
115 pub(crate) fn new(service: TransportService, config: Config) -> Self {
117 Self {
118 service,
119 cmd_rx: config.cmd_rx,
120 event_tx: config.event_tx,
121 pending_outbound: HashMap::new(),
122 inbound: StreamMap::new(),
123 }
124 }
125
126 fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
128 tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
129
130 if self.inbound.insert(peer, substream).is_some() {
131 tracing::debug!(
133 target: LOG_TARGET,
134 ?peer,
135 "dropping inbound substream as remote opened a new one",
136 );
137 }
138 }
139
140 async fn on_message_received(
142 &mut self,
143 peer: PeerId,
144 message: bytes::BytesMut,
145 ) -> Result<(), Error> {
146 tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound message");
147
148 let message = schema::bitswap::Message::decode(message)?;
149
150 let Some(wantlist) = message.wantlist else {
151 tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`");
152 return Err(Error::InvalidData);
153 };
154
155 let cids = wantlist
156 .entries
157 .into_iter()
158 .filter_map(|entry| {
159 let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
160
161 let want_type = match entry.want_type {
162 0 => WantType::Block,
163 1 => WantType::Have,
164 _ => return None,
165 };
166
167 Some((cid, want_type))
168 })
169 .collect::<Vec<_>>();
170
171 let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
172
173 Ok(())
174 }
175
176 async fn on_outbound_substream(
178 &mut self,
179 peer: PeerId,
180 substream_id: SubstreamId,
181 mut substream: Substream,
182 ) {
183 let Some(entries) = self.pending_outbound.remove(&substream_id) else {
184 tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
185 return;
186 };
187
188 let mut response = schema::bitswap::Message {
189 wantlist: Some(Default::default()),
192 ..Default::default()
193 };
194
195 for entry in entries {
196 match entry {
197 ResponseType::Block { cid, block } => {
198 let prefix = Prefix {
199 version: cid.version(),
200 codec: cid.codec(),
201 multihash_type: cid.hash().code(),
202 multihash_len: cid.hash().size(),
203 }
204 .to_bytes();
205
206 response.payload.push(schema::bitswap::Block {
207 prefix,
208 data: block,
209 });
210 }
211 ResponseType::Presence { cid, presence } => {
212 response.block_presences.push(schema::bitswap::BlockPresence {
213 cid: cid.to_bytes(),
214 r#type: presence as i32,
215 });
216 }
217 }
218 }
219
220 let message = response.encode_to_vec().into();
221 let _ = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await;
222
223 substream.close().await;
224 }
225
226 fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
228 match self.service.open_substream(peer) {
229 Err(error) => {
230 tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream to peer")
231 }
232 Ok(substream_id) => {
233 self.pending_outbound.insert(substream_id, responses);
234 }
235 }
236 }
237
238 pub async fn run(mut self) {
240 tracing::debug!(target: LOG_TARGET, "starting bitswap event loop");
241
242 loop {
243 tokio::select! {
244 event = self.service.next() => match event {
245 Some(TransportEvent::SubstreamOpened {
246 peer,
247 substream,
248 direction,
249 ..
250 }) => match direction {
251 Direction::Inbound => self.on_inbound_substream(peer, substream),
252 Direction::Outbound(substream_id) =>
253 self.on_outbound_substream(peer, substream_id, substream).await,
254 },
255 None => return,
256 event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"),
257 },
258 command = self.cmd_rx.recv() => match command {
259 Some(BitswapCommand::SendResponse { peer, responses }) => {
260 self.on_bitswap_response(peer, responses);
261 }
262 None => return,
263 },
264 Some((peer, message)) = self.inbound.next(), if !self.inbound.is_empty() => {
265 match message {
266 Ok(message) => if let Err(e) = self.on_message_received(peer, message).await {
267 tracing::trace!(
268 target: LOG_TARGET,
269 ?peer,
270 ?e,
271 "error handling inbound message, dropping substream",
272 );
273 self.inbound.remove(&peer);
274 },
275 Err(e) => {
276 tracing::trace!(
277 target: LOG_TARGET,
278 ?peer,
279 ?e,
280 "inbound substream closed",
281 );
282 self.inbound.remove(&peer);
283 },
284 }
285 }
286 }
287 }
288 }
289}