use crate::{
error::Error,
protocol::{
libp2p::bitswap::handle::BitswapCommand, Direction, TransportEvent, TransportService,
},
substream::Substream,
types::SubstreamId,
PeerId,
};
use cid::{multihash::Code, Version};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use prost::Message;
use tokio::sync::mpsc::{Receiver, Sender};
use std::collections::HashMap;
pub use cid::Cid;
pub use config::Config;
pub use handle::{BitswapEvent, BitswapHandle, ResponseType};
pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
/// Bitswap configuration; provides the [`Config`] consumed by `Bitswap::new()`.
mod config;
/// Handle-side types: [`BitswapHandle`], [`BitswapEvent`], [`ResponseType`] and `BitswapCommand`.
mod handle;
/// Bitswap wire-format message types.
mod schema {
pub(super) mod bitswap {
// The included file is read from `OUT_DIR`, i.e. it is produced at build time.
// NOTE(review): presumably generated from the Bitswap protobuf schema by the build
// script (the types are decoded via `prost::Message`) — confirm in `build.rs`.
include!(concat!(env!("OUT_DIR"), "/bitswap.rs"));
}
}
/// `tracing` target used by every log statement in this file.
const LOG_TARGET: &str = "litep2p::ipfs::bitswap";
/// CID metadata attached to a block payload in place of the full CID.
///
/// Filled from a CID's version, codec, multihash code and multihash size when a
/// `ResponseType::Block` is turned into a `schema::bitswap::Block`, and serialized
/// with [`Prefix::to_bytes()`].
#[derive(Debug)]
struct Prefix {
/// CID version.
version: Version,
/// CID codec, as returned by `Cid::codec()`.
codec: u64,
/// Multihash code of the CID's hash, as returned by `cid.hash().code()`.
multihash_type: u64,
/// Multihash size of the CID's hash, as returned by `cid.hash().size()`.
multihash_len: u8,
}
impl Prefix {
    /// Serialize the prefix into bytes.
    ///
    /// The fields are written back to back in the order `version`, `codec`,
    /// `multihash_type`, `multihash_len`, each encoded as an unsigned varint.
    pub fn to_bytes(&self) -> Vec<u8> {
        let fields: [u64; 4] = [
            self.version.into(),
            self.codec,
            self.multihash_type,
            self.multihash_len as u64,
        ];

        // Four fields, each taking at most one full `u64` varint buffer (10 bytes).
        let mut bytes = Vec::with_capacity(4 * 10);

        // The scratch buffer can be shared: every encode call writes from the start of
        // the buffer and returns only the bytes it has just written.
        let mut scratch = unsigned_varint::encode::u64_buffer();
        for field in fields {
            bytes.extend_from_slice(unsigned_varint::encode::u64(field, &mut scratch));
        }

        bytes
    }
}
/// Bitswap protocol handler.
///
/// Reads want-lists from inbound substreams and reports them as [`BitswapEvent`]s,
/// and sends the responses received as `BitswapCommand`s over outbound substreams.
pub(crate) struct Bitswap {
/// Transport service: yields `TransportEvent`s and is used to open outbound substreams.
service: TransportService,
/// Sending half of the channel over which `BitswapEvent::Request`s are reported.
event_tx: Sender<BitswapEvent>,
/// Receiving half of the channel over which `BitswapCommand`s arrive.
cmd_rx: Receiver<BitswapCommand>,
/// Responses waiting for their outbound substream to open, keyed by the ID returned
/// when the substream was requested.
pending_outbound: HashMap<SubstreamId, Vec<ResponseType>>,
/// In-flight futures, one per inbound substream, each reading and decoding a single
/// want-list and resolving to the peer and the CIDs it asked for.
pending_inbound:
FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Vec<(Cid, WantType)>)>>>,
}
impl Bitswap {
pub(crate) fn new(service: TransportService, config: Config) -> Self {
Self {
service,
cmd_rx: config.cmd_rx,
event_tx: config.event_tx,
pending_outbound: HashMap::new(),
pending_inbound: FuturesUnordered::new(),
}
}
fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
self.pending_inbound.push(Box::pin(async move {
let message = substream.next().await.ok_or(Error::ConnectionClosed)??;
let message = schema::bitswap::Message::decode(message)?;
let Some(wantlist) = message.wantlist else {
tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`");
return Err(Error::InvalidData);
};
Ok((
peer,
wantlist
.entries
.into_iter()
.filter_map(|entry| {
let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
let want_type = match entry.want_type {
0 => WantType::Block,
1 => WantType::Have,
_ => return None,
};
(cid.version() == cid::Version::V1
&& cid.hash().code() == u64::from(Code::Blake2b256)
&& cid.hash().size() == 32)
.then_some((cid, want_type))
})
.collect::<Vec<_>>(),
))
}));
}
/// Handle an opened outbound substream by sending the responses queued for it.
///
/// The responses stored under `substream_id` are removed from `pending_outbound`,
/// packed into a single Bitswap message and sent as one frame. If nothing is queued
/// for `substream_id`, a warning is logged and nothing is sent. A failed send is
/// logged and otherwise ignored; the responses are not retried.
///
/// NOTE(review): the send is awaited from within the event loop (see `run()`), so
/// other events are not processed until it completes.
async fn on_outbound_substream(
    &mut self,
    peer: PeerId,
    substream_id: SubstreamId,
    mut substream: Substream,
) {
    let Some(entries) = self.pending_outbound.remove(&substream_id) else {
        tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
        return;
    };

    let mut response = schema::bitswap::Message::default();

    for entry in entries {
        match entry {
            ResponseType::Block { cid, block } => {
                // Block payloads carry only the CID prefix next to the data,
                // not the full CID.
                let prefix = Prefix {
                    version: cid.version(),
                    codec: cid.codec(),
                    multihash_type: cid.hash().code(),
                    multihash_len: cid.hash().size(),
                }
                .to_bytes();

                response.payload.push(schema::bitswap::Block {
                    prefix,
                    data: block,
                });
            }
            ResponseType::Presence { cid, presence } => {
                response.block_presences.push(schema::bitswap::BlockPresence {
                    cid: cid.to_bytes(),
                    r#type: presence as i32,
                });
            }
        }
    }

    // Sending stays best-effort, but a failure is reported rather than silently
    // dropped so that lost responses can be diagnosed.
    if let Err(error) = substream.send_framed(response.encode_to_vec().into()).await {
        tracing::debug!(
            target: LOG_TARGET,
            ?peer,
            ?substream_id,
            ?error,
            "failed to send bitswap response",
        );
    }
}
fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
match self.service.open_substream(peer) {
Err(error) => {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream to peer")
}
Ok(substream_id) => {
self.pending_outbound.insert(substream_id, responses);
}
}
}
/// Run the [`Bitswap`] event loop.
///
/// Concurrently waits for:
/// - transport events: opened substreams are dispatched by direction, other events
///   are only traced;
/// - commands from the handle: responses are queued for sending;
/// - completed inbound reads: decoded want-lists are reported as
///   `BitswapEvent::Request`.
///
/// Returns when either the transport service or the command channel is closed.
/// Failed inbound reads and failed event deliveries are logged and do not stop the loop.
pub async fn run(mut self) {
    tracing::debug!(target: LOG_TARGET, "starting bitswap event loop");

    loop {
        tokio::select! {
            event = self.service.next() => match event {
                Some(TransportEvent::SubstreamOpened {
                    peer,
                    substream,
                    direction,
                    ..
                }) => match direction {
                    Direction::Inbound => self.on_inbound_substream(peer, substream),
                    Direction::Outbound(substream_id) =>
                        self.on_outbound_substream(peer, substream_id, substream).await,
                },
                None => return,
                event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"),
            },
            command = self.cmd_rx.recv() => match command {
                Some(BitswapCommand::SendResponse { peer, responses }) => {
                    self.on_bitswap_response(peer, responses);
                }
                None => return,
            },
            // The guard keeps this branch disabled while nothing is pending, so an
            // empty `FuturesUnordered` (which yields `None` immediately) is never polled.
            event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => match event {
                Some(Ok((peer, cids))) => {
                    if self.event_tx.send(BitswapEvent::Request { peer, cids }).await.is_err() {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            "failed to report bitswap request, event channel closed",
                        );
                    }
                }
                // Previously dropped without a trace; logged so that malformed or
                // aborted requests are visible.
                Some(Err(error)) => tracing::debug!(
                    target: LOG_TARGET,
                    ?error,
                    "failed to read bitswap request from inbound substream",
                ),
                None => {}
            },
        }
    }
}
}