litep2p/protocol/libp2p/bitswap/
mod.rs1use crate::{
24 error::Error,
25 protocol::{
26 libp2p::bitswap::handle::BitswapCommand, Direction, TransportEvent, TransportService,
27 },
28 substream::Substream,
29 types::SubstreamId,
30 PeerId,
31};
32
33use cid::{multihash::Code, Version};
34use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
35use prost::Message;
36use tokio::sync::mpsc::{Receiver, Sender};
37
38use std::collections::HashMap;
39
40pub use cid::Cid;
41pub use config::Config;
42pub use handle::{BitswapEvent, BitswapHandle, ResponseType};
43pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
44
45mod config;
46mod handle;
47
/// Bitswap wire-format types.
///
/// The contents are pulled in from `bitswap.rs` in the build's `OUT_DIR`; presumably
/// that file is generated from the bitswap protobuf schema by the build script
/// (the types are decoded/encoded through `prost::Message` in this module).
mod schema {
    pub(super) mod bitswap {
        include!(concat!(env!("OUT_DIR"), "/bitswap.rs"));
    }
}
53
/// Logging target used by every `tracing` call in this file.
const LOG_TARGET: &str = "litep2p::ipfs::bitswap";
56
/// CID metadata attached to a block in a bitswap response.
///
/// Built from the CID of the block being returned and serialized with
/// `Prefix::to_bytes()` into the `prefix` field of the outgoing block.
#[derive(Debug)]
struct Prefix {
    /// CID version.
    version: Version,

    /// CID codec.
    codec: u64,

    /// Code of the multihash used by the CID.
    multihash_type: u64,

    /// Size of the multihash digest.
    multihash_len: u8,
}
72
73impl Prefix {
74 pub fn to_bytes(&self) -> Vec<u8> {
76 let mut res = Vec::with_capacity(4 * 10);
77
78 let mut buf = unsigned_varint::encode::u64_buffer();
79 let version = unsigned_varint::encode::u64(self.version.into(), &mut buf);
80 res.extend_from_slice(version);
81
82 let mut buf = unsigned_varint::encode::u64_buffer();
83 let codec = unsigned_varint::encode::u64(self.codec, &mut buf);
84 res.extend_from_slice(codec);
85
86 let mut buf = unsigned_varint::encode::u64_buffer();
87 let multihash_type = unsigned_varint::encode::u64(self.multihash_type, &mut buf);
88 res.extend_from_slice(multihash_type);
89
90 let mut buf = unsigned_varint::encode::u64_buffer();
91 let multihash_len = unsigned_varint::encode::u64(self.multihash_len as u64, &mut buf);
92 res.extend_from_slice(multihash_len);
93 res
94 }
95}
96
/// Bitswap protocol.
///
/// Reads want-lists from inbound substreams and forwards them to the user as
/// `BitswapEvent::Request`, and sends the user's responses over outbound substreams.
pub(crate) struct Bitswap {
    /// Transport service, used to open substreams and to receive transport events.
    service: TransportService,

    /// TX channel for sending events to the user.
    event_tx: Sender<BitswapEvent>,

    /// RX channel for receiving commands from the user-facing handle.
    cmd_rx: Receiver<BitswapCommand>,

    /// Responses waiting for their outbound substream to open, keyed by the ID
    /// of the substream that was requested for them.
    pending_outbound: HashMap<SubstreamId, Vec<ResponseType>>,

    /// Inbound substreams whose request is still being read; each future
    /// resolves to the requesting peer and the CIDs it asked for.
    pending_inbound:
        FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Vec<(Cid, WantType)>)>>>,
}
115
116impl Bitswap {
117 pub(crate) fn new(service: TransportService, config: Config) -> Self {
119 Self {
120 service,
121 cmd_rx: config.cmd_rx,
122 event_tx: config.event_tx,
123 pending_outbound: HashMap::new(),
124 pending_inbound: FuturesUnordered::new(),
125 }
126 }
127
128 fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
130 tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
131
132 self.pending_inbound.push(Box::pin(async move {
133 let message = substream.next().await.ok_or(Error::ConnectionClosed)??;
134 let message = schema::bitswap::Message::decode(message)?;
135
136 let Some(wantlist) = message.wantlist else {
137 tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`");
138 return Err(Error::InvalidData);
139 };
140
141 Ok((
142 peer,
143 wantlist
144 .entries
145 .into_iter()
146 .filter_map(|entry| {
147 let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
148
149 let want_type = match entry.want_type {
150 0 => WantType::Block,
151 1 => WantType::Have,
152 _ => return None,
153 };
154
155 (cid.version() == cid::Version::V1
156 && cid.hash().code() == u64::from(Code::Blake2b256)
157 && cid.hash().size() == 32)
158 .then_some((cid, want_type))
159 })
160 .collect::<Vec<_>>(),
161 ))
162 }));
163 }
164
165 async fn on_outbound_substream(
167 &mut self,
168 peer: PeerId,
169 substream_id: SubstreamId,
170 mut substream: Substream,
171 ) {
172 let Some(entries) = self.pending_outbound.remove(&substream_id) else {
173 tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
174 return;
175 };
176
177 let mut response = schema::bitswap::Message::default();
178
179 for entry in entries {
180 match entry {
181 ResponseType::Block { cid, block } => {
182 let prefix = Prefix {
183 version: cid.version(),
184 codec: cid.codec(),
185 multihash_type: cid.hash().code(),
186 multihash_len: cid.hash().size(),
187 }
188 .to_bytes();
189
190 response.payload.push(schema::bitswap::Block {
191 prefix,
192 data: block,
193 });
194 }
195 ResponseType::Presence { cid, presence } => {
196 response.block_presences.push(schema::bitswap::BlockPresence {
197 cid: cid.to_bytes(),
198 r#type: presence as i32,
199 });
200 }
201 }
202 }
203
204 let _ = substream.send_framed(response.encode_to_vec().into()).await;
205 }
206
207 fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
209 match self.service.open_substream(peer) {
210 Err(error) => {
211 tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream to peer")
212 }
213 Ok(substream_id) => {
214 self.pending_outbound.insert(substream_id, responses);
215 }
216 }
217 }
218
219 pub async fn run(mut self) {
221 tracing::debug!(target: LOG_TARGET, "starting bitswap event loop");
222
223 loop {
224 tokio::select! {
225 event = self.service.next() => match event {
226 Some(TransportEvent::SubstreamOpened {
227 peer,
228 substream,
229 direction,
230 ..
231 }) => match direction {
232 Direction::Inbound => self.on_inbound_substream(peer, substream),
233 Direction::Outbound(substream_id) =>
234 self.on_outbound_substream(peer, substream_id, substream).await,
235 },
236 None => return,
237 event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"),
238 },
239 command = self.cmd_rx.recv() => match command {
240 Some(BitswapCommand::SendResponse { peer, responses }) => {
241 self.on_bitswap_response(peer, responses);
242 }
243 None => return,
244 },
245 event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {
246 if let Some(Ok((peer, cids))) = event {
247 let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
248 }
249 }
250 }
251 }
252 }
253}