// litep2p/protocol/libp2p/bitswap/mod.rs

// Copyright 2023 litep2p developers
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! [`/ipfs/bitswap/1.2.0`](https://github.com/ipfs/specs/blob/main/BITSWAP.md) implementation.
22
23use crate::{
24    error::{Error, ImmediateDialError},
25    protocol::{Direction, TransportEvent, TransportService},
26    substream::Substream,
27    types::{
28        multihash::{Code, MultihashDigest},
29        SubstreamId,
30    },
31    PeerId,
32};
33
34use bytes::Bytes;
35use cid::{Cid, Version};
36use prost::Message;
37use tokio::sync::mpsc::{Receiver, Sender};
38use tokio_stream::{StreamExt, StreamMap};
39
40pub use config::Config;
41pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType};
42pub use schema::bitswap::{wantlist::WantType, BlockPresenceType};
43use std::{
44    collections::{hash_map::Entry, vec_deque::Drain, HashMap, HashSet, VecDeque},
45    time::Duration,
46};
47
// Protocol configuration (re-exported above as `Config`).
mod config;
// User-facing handle, commands and events (re-exported above).
mod handle;

/// Protobuf message types for the Bitswap wire format.
mod schema {
    pub(super) mod bitswap {
        // Generated at build time: `bitswap.rs` is written into `OUT_DIR` by the build script.
        include!(concat!(env!("OUT_DIR"), "/bitswap.rs"));
    }
}
56
/// Target used for every `tracing` event emitted by this module.
const LOG_TARGET: &str = "litep2p::ipfs::bitswap";

/// Upper bound on how long a single framed write to a substream may take before the send is
/// abandoned as timed out (15 seconds).
const WRITE_TIMEOUT: Duration = Duration::from_millis(15_000);
62
/// Bitswap metadata.
///
/// CID prefix sent alongside every block in a Bitswap message. It carries everything needed to
/// rebuild the block's CID except the digest itself, which the receiver recomputes from the block
/// data. On the wire it is four consecutive unsigned varints in field order.
#[derive(Debug)]
struct Prefix {
    /// CID version.
    version: Version,

    /// CID codec.
    codec: u64,

    /// CID multihash type (the multihash code, e.g. `0x12` for SHA2-256).
    multihash_type: u64,

    /// CID multihash length (digest size in bytes).
    multihash_len: u8,
}
78
79impl Prefix {
80    /// Convert the prefix to encoded bytes.
81    pub fn to_bytes(&self) -> Vec<u8> {
82        let mut res = Vec::with_capacity(4 * 10);
83
84        let mut buf = unsigned_varint::encode::u64_buffer();
85        let version = unsigned_varint::encode::u64(self.version.into(), &mut buf);
86        res.extend_from_slice(version);
87
88        let mut buf = unsigned_varint::encode::u64_buffer();
89        let codec = unsigned_varint::encode::u64(self.codec, &mut buf);
90        res.extend_from_slice(codec);
91
92        let mut buf = unsigned_varint::encode::u64_buffer();
93        let multihash_type = unsigned_varint::encode::u64(self.multihash_type, &mut buf);
94        res.extend_from_slice(multihash_type);
95
96        let mut buf = unsigned_varint::encode::u64_buffer();
97        let multihash_len = unsigned_varint::encode::u64(self.multihash_len as u64, &mut buf);
98        res.extend_from_slice(multihash_len);
99        res
100    }
101
102    /// Parse byte representation of prefix.
103    pub fn from_bytes(prefix_bytes: &[u8]) -> Option<Prefix> {
104        let (version, rest) = unsigned_varint::decode::u64(prefix_bytes).ok()?;
105        let (codec, rest) = unsigned_varint::decode::u64(rest).ok()?;
106        let (multihash_type, rest) = unsigned_varint::decode::u64(rest).ok()?;
107        let (multihash_len, rest) = unsigned_varint::decode::u64(rest).ok()?;
108        if !rest.is_empty() {
109            return None;
110        }
111
112        let version = Version::try_from(version).ok()?;
113        let multihash_len = u8::try_from(multihash_len).ok()?;
114
115        Some(Prefix {
116            version,
117            codec,
118            multihash_type,
119            multihash_len,
120        })
121    }
122}
123
/// Action to perform when substream is opened.
///
/// Actions are queued per peer in `Bitswap::pending_outbound` while no usable outbound substream
/// exists, and are executed in order once one is opened.
#[derive(Debug)]
enum SubstreamAction {
    /// Send a request (a wantlist with the given CIDs and want types).
    SendRequest(Vec<(Cid, WantType)>),
    /// Send a response (blocks and/or block presences).
    SendResponse(Vec<ResponseType>),
}
132
/// Bitswap protocol.
///
/// Drives the protocol's event loop: forwards decoded inbound messages to the user as
/// [`BitswapEvent`]s and sends user commands to remote peers over outbound substreams.
pub(crate) struct Bitswap {
    /// Connection service.
    service: TransportService,

    /// TX channel for sending events to the user protocol.
    event_tx: Sender<BitswapEvent>,

    /// RX channel for receiving commands from `BitswapHandle`.
    cmd_rx: Receiver<BitswapCommand>,

    /// Pending outbound actions.
    ///
    /// A non-empty entry for a peer means an outbound substream (or a dial) has already been
    /// requested for it; the actions run once that substream opens.
    pending_outbound: HashMap<PeerId, Vec<SubstreamAction>>,

    /// Inbound substreams (at most one per peer).
    inbound: StreamMap<PeerId, Substream>,

    /// Outbound substreams, kept open and reused for later requests/responses.
    outbound: HashMap<PeerId, Substream>,

    /// Peers waiting for dial.
    ///
    /// A substream is opened to these peers once their connection is established.
    pending_dials: HashSet<PeerId>,
}
156
157impl Bitswap {
158    /// Create new [`Bitswap`] protocol.
159    pub(crate) fn new(service: TransportService, config: Config) -> Self {
160        Self {
161            service,
162            cmd_rx: config.cmd_rx,
163            event_tx: config.event_tx,
164            pending_outbound: HashMap::new(),
165            inbound: StreamMap::new(),
166            outbound: HashMap::new(),
167            pending_dials: HashSet::new(),
168        }
169    }
170
171    /// Substream opened to remote peer.
172    fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
173        tracing::debug!(target: LOG_TARGET, ?peer, "handle inbound substream");
174
175        if self.inbound.insert(peer, substream).is_some() {
176            // Only one inbound substream per peer is allowed in order to constrain resources.
177            tracing::debug!(
178                target: LOG_TARGET,
179                ?peer,
180                "dropping inbound substream as remote opened a new one",
181            );
182        }
183    }
184
185    /// Message received from remote peer.
186    async fn on_message_received(
187        &mut self,
188        peer: PeerId,
189        message: bytes::BytesMut,
190    ) -> Result<(), Error> {
191        tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound message");
192
193        let message = schema::bitswap::Message::decode(message)?;
194
195        // Check if this is a request (has wantlist with entries).
196        if let Some(wantlist) = &message.wantlist {
197            if !wantlist.entries.is_empty() {
198                let cids = wantlist
199                    .entries
200                    .iter()
201                    .filter_map(|entry| {
202                        let cid = Cid::read_bytes(entry.block.as_slice()).ok()?;
203
204                        let want_type = match entry.want_type {
205                            0 => WantType::Block,
206                            1 => WantType::Have,
207                            _ => return None,
208                        };
209
210                        Some((cid, want_type))
211                    })
212                    .collect::<Vec<_>>();
213
214                if !cids.is_empty() {
215                    let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await;
216                }
217            }
218        }
219
220        // Check if this is a response (has payload or block presences).
221        if !message.payload.is_empty() || !message.block_presences.is_empty() {
222            let mut responses = Vec::new();
223
224            // Process payload (blocks).
225            for block in message.payload {
226                let Some(Prefix {
227                    version,
228                    codec,
229                    multihash_type,
230                    multihash_len: _,
231                }) = Prefix::from_bytes(&block.prefix)
232                else {
233                    tracing::trace!(target: LOG_TARGET, ?peer, "invalid CID prefix received");
234                    continue;
235                };
236
237                // Create multihash from the block data.
238                let Ok(code) = Code::try_from(multihash_type) else {
239                    tracing::trace!(
240                        target: LOG_TARGET,
241                        ?peer,
242                        multihash_type,
243                        "usupported multihash type",
244                    );
245                    continue;
246                };
247
248                let multihash = code.digest(&block.data);
249
250                // We need to convert multihash to version supported by `cid` crate.
251                let Ok(multihash) =
252                    cid::multihash::Multihash::wrap(multihash.code(), multihash.digest())
253                else {
254                    tracing::trace!(
255                        target: LOG_TARGET,
256                        ?peer,
257                        multihash_type,
258                        "multihash size > 64 unsupported",
259                    );
260                    continue;
261                };
262
263                match Cid::new(version, codec, multihash) {
264                    Ok(cid) => responses.push(ResponseType::Block {
265                        cid,
266                        block: block.data,
267                    }),
268                    Err(error) => tracing::trace!(
269                        target: LOG_TARGET,
270                        ?peer,
271                        ?error,
272                        "invalid CID received",
273                    ),
274                }
275            }
276
277            // Process block presences.
278            for presence in message.block_presences {
279                if let Ok(cid) = Cid::read_bytes(&presence.cid[..]) {
280                    let presence_type = match presence.r#type {
281                        0 => BlockPresenceType::Have,
282                        1 => BlockPresenceType::DontHave,
283                        _ => continue,
284                    };
285
286                    responses.push(ResponseType::Presence {
287                        cid,
288                        presence: presence_type,
289                    });
290                }
291            }
292
293            if !responses.is_empty() {
294                let _ = self.event_tx.send(BitswapEvent::Response { peer, responses }).await;
295            }
296        }
297
298        Ok(())
299    }
300
301    /// Handle opened outbound substream.
302    async fn on_outbound_substream(
303        &mut self,
304        peer: PeerId,
305        substream_id: SubstreamId,
306        mut substream: Substream,
307    ) {
308        let Some(actions) = self.pending_outbound.remove(&peer) else {
309            tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist");
310            return;
311        };
312
313        tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");
314
315        for action in actions {
316            match action {
317                SubstreamAction::SendRequest(cids) => {
318                    if let Err(error) = send_request(&mut substream, cids).await {
319                        // Drop the substream and all actions in case of sending error.
320                        tracing::debug!(target: LOG_TARGET, ?peer, ?error, "bitswap request failed");
321                        return;
322                    }
323                }
324                SubstreamAction::SendResponse(entries) => {
325                    if let Err(error) = send_response(&mut substream, entries).await {
326                        // Drop the substream and all actions in case of sending error.
327                        tracing::debug!(target: LOG_TARGET, ?peer, ?error, "bitswap response failed");
328                        return;
329                    }
330                }
331            }
332        }
333
334        self.outbound.insert(peer, substream);
335    }
336
337    /// Handle connection established event.
338    fn on_connection_established(&mut self, peer: PeerId) {
339        // If we have pending actions for this peer, open a substream.
340        if self.pending_dials.remove(&peer) {
341            tracing::trace!(
342                target: LOG_TARGET,
343                ?peer,
344                "open substream after connection established",
345            );
346
347            if let Err(error) = self.service.open_substream(peer) {
348                tracing::debug!(
349                    target: LOG_TARGET,
350                    ?peer,
351                    ?error,
352                    "failed to open substream after connection established",
353                );
354                // Drop all pending actions; they are not going to be handled anyway, and we need
355                // the entry to be empty to properly open subsequent substreams.
356                self.pending_outbound.remove(&peer);
357            }
358        }
359    }
360
361    /// Open substream or dial a peer.
362    fn open_substream_or_dial(&mut self, peer: PeerId) {
363        tracing::trace!(target: LOG_TARGET, ?peer, "open substream");
364
365        if let Err(error) = self.service.open_substream(peer) {
366            tracing::trace!(
367                target: LOG_TARGET,
368                ?peer,
369                ?error,
370                "failed to open substream, dialing peer",
371            );
372
373            // Failed to open substream, try to dial the peer.
374            match self.service.dial(&peer) {
375                Ok(()) => {
376                    // Store the peer to open a substream once it is connected.
377                    self.pending_dials.insert(peer);
378                }
379                Err(ImmediateDialError::AlreadyConnected) => {
380                    // By the time we tried to dial peer, it got connected.
381                    if let Err(error) = self.service.open_substream(peer) {
382                        tracing::trace!(
383                            target: LOG_TARGET,
384                            ?peer,
385                            ?error,
386                            "failed to open substream for a second time",
387                        );
388                    }
389                }
390                Err(error) => {
391                    tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer");
392                }
393            }
394        }
395    }
396
397    /// Handle bitswap request.
398    async fn on_bitswap_request(&mut self, peer: PeerId, cids: Vec<(Cid, WantType)>) {
399        // Try to send request over existing substream first.
400        if let Entry::Occupied(mut entry) = self.outbound.entry(peer) {
401            if send_request(entry.get_mut(), cids.clone()).await.is_ok() {
402                return;
403            } else {
404                tracing::debug!(
405                    target: LOG_TARGET,
406                    ?peer,
407                    "failed to send request over existing substream",
408                );
409                entry.remove();
410            }
411        }
412
413        // Store pending actions for once the substream is opened.
414        let pending_actions = self.pending_outbound.entry(peer).or_default();
415        // If we inserted the default empty entry above, this means no pending substream
416        // was requested by previous calls to `on_bitswap_request`. We will request a substream
417        // in this case below.
418        let no_substream_pending = pending_actions.is_empty();
419
420        pending_actions.push(SubstreamAction::SendRequest(cids));
421
422        if no_substream_pending {
423            self.open_substream_or_dial(peer);
424        }
425    }
426
427    /// Handle bitswap response.
428    async fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec<ResponseType>) {
429        // Try to send response over existing substream first.
430        if let Entry::Occupied(mut entry) = self.outbound.entry(peer) {
431            if send_response(entry.get_mut(), responses.clone()).await.is_ok() {
432                return;
433            } else {
434                tracing::debug!(
435                    target: LOG_TARGET,
436                    ?peer,
437                    "failed to send response over existing substream",
438                );
439                entry.remove();
440            }
441        }
442
443        // Store pending actions for later and open substream if not requested already.
444        let pending_actions = self.pending_outbound.entry(peer).or_default();
445        let no_pending_substream = pending_actions.is_empty();
446        pending_actions.push(SubstreamAction::SendResponse(responses));
447
448        if no_pending_substream {
449            self.open_substream_or_dial(peer);
450        }
451    }
452
453    /// Start [`Bitswap`] event loop.
454    pub async fn run(mut self) {
455        tracing::debug!(target: LOG_TARGET, "starting bitswap event loop");
456
457        loop {
458            tokio::select! {
459                event = self.service.next() => match event {
460                    Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
461                        self.on_connection_established(peer);
462                    }
463                    Some(TransportEvent::SubstreamOpened {
464                        peer,
465                        substream,
466                        direction,
467                        ..
468                    }) => match direction {
469                        Direction::Inbound => self.on_inbound_substream(peer, substream),
470                        Direction::Outbound(substream_id) =>
471                            self.on_outbound_substream(peer, substream_id, substream).await,
472                    },
473                    None => return,
474                    event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"),
475                },
476                command = self.cmd_rx.recv() => match command {
477                    Some(BitswapCommand::SendRequest { peer, cids }) => {
478                        self.on_bitswap_request(peer, cids).await;
479                    }
480                    Some(BitswapCommand::SendResponse { peer, responses }) => {
481                        self.on_bitswap_response(peer, responses).await;
482                    }
483                    None => return,
484                },
485                Some((peer, message)) = self.inbound.next(), if !self.inbound.is_empty() => {
486                    match message {
487                        Ok(message) => if let Err(e) = self.on_message_received(peer, message).await {
488                            tracing::trace!(
489                                target: LOG_TARGET,
490                                ?peer,
491                                ?e,
492                                "error handling inbound message, dropping substream",
493                            );
494                            self.inbound.remove(&peer);
495                        },
496                        Err(e) => {
497                            tracing::trace!(
498                                target: LOG_TARGET,
499                                ?peer,
500                                ?e,
501                                "inbound substream closed",
502                            );
503                            self.inbound.remove(&peer);
504                        },
505                    }
506                }
507            }
508        }
509    }
510}
511
512async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) -> Result<(), Error> {
513    let request = schema::bitswap::Message {
514        wantlist: Some(schema::bitswap::Wantlist {
515            entries: cids
516                .into_iter()
517                .map(|(cid, want_type)| schema::bitswap::wantlist::Entry {
518                    block: cid.to_bytes(),
519                    priority: 1,
520                    cancel: false,
521                    want_type: want_type as i32,
522                    send_dont_have: false,
523                })
524                .collect(),
525            full: false,
526        }),
527        ..Default::default()
528    };
529
530    let message = request.encode_to_vec().into();
531    match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
532        Err(_) => Err(Error::Timeout),
533        Ok(Err(e)) => Err(Error::SubstreamError(e)),
534        Ok(Ok(())) => Ok(()),
535    }
536}
537
538async fn send_response(substream: &mut Substream, entries: Vec<ResponseType>) -> Result<(), Error> {
539    // Send presences in a separate message to not deal with it when batching blocks below.
540    if let Some((message, cid_count)) =
541        presences_message(entries.iter().filter_map(|entry| match entry {
542            ResponseType::Presence { cid, presence } => Some((*cid, *presence)),
543            ResponseType::Block { .. } => None,
544        }))
545    {
546        if message.len() <= config::MAX_MESSAGE_SIZE {
547            tracing::trace!(
548                target: LOG_TARGET,
549                cid_count,
550                "sending Bitswap presence message",
551            );
552            match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
553                Err(_) => return Err(Error::Timeout),
554                Ok(Err(e)) => return Err(Error::SubstreamError(e)),
555                Ok(Ok(())) => {}
556            }
557        } else {
558            // This should never happen in practice, but log a warning if the presence message
559            // exceeded [`config::MAX_MESSAGE_SIZE`].
560            tracing::warn!(
561                target: LOG_TARGET,
562                size = message.len(),
563                max_size = config::MAX_MESSAGE_SIZE,
564                "outgoing Bitswap presence message exceeded max size",
565            );
566        }
567    }
568
569    // Send blocks in batches of up to [`config::MAX_BATCH_SIZE`] bytes.
570    let mut blocks = entries
571        .into_iter()
572        .filter_map(|entry| match entry {
573            ResponseType::Block { cid, block } => Some((cid, block)),
574            ResponseType::Presence { .. } => None,
575        })
576        .collect::<VecDeque<_>>();
577
578    while let Some(batch) = extract_next_batch(&mut blocks, config::MAX_BATCH_SIZE) {
579        if let Some((message, block_count)) = blocks_message(batch) {
580            if message.len() <= config::MAX_MESSAGE_SIZE {
581                tracing::trace!(
582                    target: LOG_TARGET,
583                    block_count,
584                    "sending Bitswap blocks message",
585                );
586                match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
587                    Err(_) => return Err(Error::Timeout),
588                    Ok(Err(e)) => return Err(Error::SubstreamError(e)),
589                    Ok(Ok(())) => {}
590                }
591            } else {
592                // This should never happen in practice, but log a warning if the blocks message
593                // exceeded [`config::MAX_MESSAGE_SIZE`].
594                tracing::warn!(
595                    target: LOG_TARGET,
596                    size = message.len(),
597                    max_size = config::MAX_MESSAGE_SIZE,
598                    "outgoing Bitswap blocks message exceeded max size",
599                );
600            }
601        }
602    }
603
604    Ok(())
605}
606
607fn presences_message(
608    presences: impl IntoIterator<Item = (Cid, BlockPresenceType)>,
609) -> Option<(Bytes, usize)> {
610    let message = schema::bitswap::Message {
611        // Set wantlist to not cause null pointer dereference in older versions of Kubo.
612        wantlist: Some(Default::default()),
613        block_presences: presences
614            .into_iter()
615            .map(|(cid, presence)| schema::bitswap::BlockPresence {
616                cid: cid.to_bytes(),
617                r#type: presence as i32,
618            })
619            .collect(),
620        ..Default::default()
621    };
622
623    let count = message.block_presences.len();
624
625    (count > 0).then(|| (message.encode_to_vec().into(), count))
626}
627
628fn blocks_message(blocks: impl IntoIterator<Item = (Cid, Vec<u8>)>) -> Option<(Bytes, usize)> {
629    let message = schema::bitswap::Message {
630        // Set wantlist to not cause null pointer dereference in older versions of Kubo.
631        wantlist: Some(Default::default()),
632        payload: blocks
633            .into_iter()
634            .map(|(cid, block)| {
635                let prefix = Prefix {
636                    version: cid.version(),
637                    codec: cid.codec(),
638                    multihash_type: cid.hash().code(),
639                    multihash_len: cid.hash().size(),
640                }
641                .to_bytes();
642
643                schema::bitswap::Block {
644                    prefix,
645                    data: block,
646                }
647            })
648            .collect(),
649        ..Default::default()
650    };
651
652    let count = message.payload.len();
653
654    (count > 0).then(|| (message.encode_to_vec().into(), count))
655}
656
657/// Extract a batch of blocks of no more than `max_size` from `blocks`.
658/// Returns `None` if no more blocks are left.
659fn extract_next_batch<'a>(
660    blocks: &'a mut VecDeque<(Cid, Vec<u8>)>,
661    max_batch_size: usize,
662) -> Option<Drain<'a, (Cid, Vec<u8>)>> {
663    // Get rid of oversized blocks to not stall the processing by not being able to queue them.
664    loop {
665        let block = blocks.front()?;
666        if block.1.len() > max_batch_size {
667            tracing::warn!(
668                target: LOG_TARGET,
669                cid = block.0.to_string(),
670                size = block.1.len(),
671                max_batch_size,
672                "outgoing Bitswap block exceeded max batch size",
673            );
674            blocks.pop_front();
675        } else {
676            break;
677        }
678    }
679
680    // Determine how many blocks we can batch. Note that we can always batch at least one
681    // block due to check above.
682    let mut total_size = 0;
683    let mut block_count = 0;
684
685    for b in blocks.iter() {
686        let next_block_size = b.1.len();
687        if total_size + next_block_size > max_batch_size {
688            break;
689        }
690        total_size += next_block_size;
691        block_count += 1;
692    }
693
694    Some(blocks.drain(..block_count))
695}
696
#[cfg(test)]
mod tests {
    use cid::multihash::Multihash;

    use super::*;

    /// Build a CIDv1 with the raw codec (`0x55`) and a SHA2-256 multihash of `block`.
    fn cid(block: &[u8]) -> Cid {
        let codec = 0x55;
        let multihash = Code::Sha2_256.digest(block);
        let multihash =
            Multihash::wrap(multihash.code(), multihash.digest()).expect("to be valid multihash");

        Cid::new_v1(codec, multihash)
    }

    /// Pair every block with its CID, preserving order.
    fn with_cids(blocks: &[Vec<u8>]) -> Vec<(Cid, Vec<u8>)> {
        blocks.iter().map(|block| (cid(block), block.clone())).collect()
    }

    /// Call [`extract_next_batch`] until it returns `None`, collecting every batch, and check
    /// that the queue has been fully consumed.
    fn all_batches(blocks: &[(Cid, Vec<u8>)], max_size: usize) -> Vec<Vec<(Cid, Vec<u8>)>> {
        let mut queue = blocks.iter().cloned().collect::<VecDeque<_>>();
        let mut batches = Vec::new();

        while let Some(batch) = extract_next_batch(&mut queue, max_size) {
            batches.push(batch.collect::<Vec<_>>());
        }

        assert!(queue.is_empty());
        batches
    }

    #[test]
    fn extract_next_batch_fits_max_size() {
        let blocks = with_cids(&[vec![0x01; 10], vec![0x02; 10], vec![0x03; 10]]);

        assert_eq!(all_batches(&blocks, 100), vec![blocks.clone()]);
    }

    #[test]
    fn extract_next_batch_chunking_exact() {
        let blocks = with_cids(&[vec![0x01; 10], vec![0x02; 10], vec![0x03; 10]]);

        assert_eq!(
            all_batches(&blocks, 20),
            vec![blocks[..2].to_vec(), blocks[2..].to_vec()],
        );
    }

    #[test]
    fn extract_next_batch_chunking_less_than() {
        let blocks = with_cids(&[vec![0x01; 10], vec![0x02; 9], vec![0x03; 10]]);

        assert_eq!(
            all_batches(&blocks, 20),
            vec![blocks[..2].to_vec(), blocks[2..].to_vec()],
        );
    }

    #[test]
    fn extract_next_batch_oversized_blocks_discarded() {
        let blocks = with_cids(&[vec![0x01; 10], vec![0x02; 101], vec![0x03; 10]]);

        assert_eq!(
            all_batches(&blocks, 20),
            vec![vec![blocks[0].clone()], vec![blocks[2].clone()]],
        );
    }

    #[test]
    fn extract_next_batch_empty_queue() {
        let mut queue = VecDeque::new();

        assert!(extract_next_batch(&mut queue, 20).is_none());
    }

    #[test]
    fn extract_next_batch_only_oversized_blocks() {
        let blocks = with_cids(&[vec![0x01; 21], vec![0x02; 100]]);
        let mut queue = blocks.into_iter().collect::<VecDeque<_>>();

        assert!(extract_next_batch(&mut queue, 20).is_none());
        assert!(queue.is_empty());
    }

    #[test]
    fn extract_next_batch_single_block_of_max_size() {
        let blocks = with_cids(&[vec![0x01; 20]]);

        assert_eq!(all_batches(&blocks, 20), vec![blocks.clone()]);
    }

    #[test]
    fn prefix_roundtrip() {
        let block_cid = cid(b"hello");
        let prefix = Prefix {
            version: block_cid.version(),
            codec: block_cid.codec(),
            multihash_type: block_cid.hash().code(),
            multihash_len: block_cid.hash().size(),
        };

        // CIDv1, raw codec (0x55), SHA2-256 (0x12), 32-byte digest.
        let bytes = prefix.to_bytes();
        assert_eq!(bytes, vec![0x01, 0x55, 0x12, 0x20]);

        let decoded = Prefix::from_bytes(&bytes).expect("valid prefix");
        assert_eq!(decoded.version, Version::V1);
        assert_eq!(decoded.codec, 0x55);
        assert_eq!(decoded.multihash_type, 0x12);
        assert_eq!(decoded.multihash_len, 32);
    }

    #[test]
    fn prefix_rejects_malformed_input() {
        // Trailing bytes after the four varints.
        assert!(Prefix::from_bytes(&[0x01, 0x55, 0x12, 0x20, 0x00]).is_none());
        // Truncated prefix.
        assert!(Prefix::from_bytes(&[0x01, 0x55, 0x12]).is_none());
        // Unknown CID version.
        assert!(Prefix::from_bytes(&[0x02, 0x55, 0x12, 0x20]).is_none());
        // Digest length (256) that does not fit in a `u8`.
        assert!(Prefix::from_bytes(&[0x01, 0x55, 0x12, 0x80, 0x02]).is_none());
        // Empty input.
        assert!(Prefix::from_bytes(&[]).is_none());
    }

    #[test]
    fn empty_messages_are_not_built() {
        assert!(blocks_message(Vec::<(Cid, Vec<u8>)>::new()).is_none());
        assert!(presences_message(Vec::<(Cid, BlockPresenceType)>::new()).is_none());
    }

    #[test]
    fn blocks_message_roundtrip() {
        let block = vec![0x01; 10];
        let block_cid = cid(&block);

        let (message, count) =
            blocks_message(vec![(block_cid, block.clone())]).expect("non-empty message");
        assert_eq!(count, 1);

        let decoded = schema::bitswap::Message::decode(message).expect("valid message");
        assert_eq!(decoded.payload.len(), 1);
        assert_eq!(decoded.payload[0].data, block);

        let prefix = Prefix::from_bytes(&decoded.payload[0].prefix).expect("valid prefix");
        assert_eq!(prefix.version, block_cid.version());
        assert_eq!(prefix.codec, block_cid.codec());
        assert_eq!(prefix.multihash_type, block_cid.hash().code());
        assert_eq!(prefix.multihash_len, block_cid.hash().size());
    }
}