sc_network/litep2p/shim/
bitswap.rs1use futures::StreamExt;
22use litep2p::protocol::libp2p::bitswap::{
23 BitswapEvent, BitswapHandle, BlockPresenceType, Config, ResponseType, WantType,
24};
25
26use sc_client_api::BlockBackend;
27use sp_runtime::traits::Block as BlockT;
28
29use std::{future::Future, pin::Pin, sync::Arc};
30
31const LOG_TARGET: &str = "sub-libp2p::bitswap";
33
34pub struct BitswapServer<Block: BlockT> {
35 handle: BitswapHandle,
37
38 client: Arc<dyn BlockBackend<Block> + Send + Sync>,
40}
41
42impl<Block: BlockT> BitswapServer<Block> {
43 pub fn new(
45 client: Arc<dyn BlockBackend<Block> + Send + Sync>,
46 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Config) {
47 let (config, handle) = Config::new();
48 let bitswap = Self { client, handle };
49
50 (Box::pin(async move { bitswap.run().await }), config)
51 }
52
53 async fn run(mut self) {
54 log::debug!(target: LOG_TARGET, "starting bitswap server");
55
56 while let Some(event) = self.handle.next().await {
57 match event {
58 BitswapEvent::Request { peer, cids } => {
59 log::debug!(target: LOG_TARGET, "handle bitswap request from {peer:?} for {cids:?}");
60
61 let response: Vec<ResponseType> = cids
62 .into_iter()
63 .map(|(cid, want_type)| {
64 let mut hash = Block::Hash::default();
65 hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
66 let transaction = match self.client.indexed_transaction(hash) {
67 Ok(ex) => ex,
68 Err(error) => {
69 log::error!(target: LOG_TARGET, "error retrieving transaction {hash}: {error}");
70 None
71 },
72 };
73
74 match transaction {
75 Some(transaction) => {
76 log::trace!(target: LOG_TARGET, "found cid {cid:?}, hash {hash:?}");
77
78 match want_type {
79 WantType::Block =>
80 ResponseType::Block { cid, block: transaction },
81 _ => ResponseType::Presence {
82 cid,
83 presence: BlockPresenceType::Have,
84 },
85 }
86 },
87 None => {
88 log::trace!(target: LOG_TARGET, "missing cid {cid:?}, hash {hash:?}");
89
90 ResponseType::Presence {
91 cid,
92 presence: BlockPresenceType::DontHave,
93 }
94 },
95 }
96 })
97 .collect();
98
99 self.handle.send_response(peer, response).await;
100 },
101 }
102 }
103 }
104}