use libp2p::PeerId;
use log::trace;
use sc_network_common::sync::message;
use sp_runtime::traits::{Block as BlockT, NumberFor, One};
use std::{
cmp,
collections::{BTreeMap, HashMap},
ops::Range,
};
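
/// A block received from the network, together with the peer it came from, if known.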
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
	/// The block message received from the network.
	pub block: message::BlockData<B>,
	/// The peer the block was received from, if known.
	pub origin: Option<PeerId>,
}
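
/// State of a range of blocks tracked by the collection.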
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
	/// The range has been requested and is being downloaded by `downloading` peers.
	Downloading { len: NumberFor<B>, downloading: u32 },
	/// Every block in the range has been received.
	Complete(Vec<BlockData<B>>),
	/// The blocks were drained by `ready_blocks` and are queued for import.
	Queued { len: NumberFor<B> },
}
impl<B: BlockT> BlockRangeState<B> {
pub fn len(&self) -> NumberFor<B> {
match *self {
Self::Downloading { len, .. } => len,
Self::Complete(ref blocks) => (blocks.len() as u32).into(),
Self::Queued { len } => len,
}
}
}
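
/// Tracks ranges of blocks that are being downloaded from peers or are ready for import.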
#[derive(Default)]
pub struct BlockCollection<B: BlockT> {
	/// Tracked block ranges, keyed by the first block number of each range.
	blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
	/// The start of the block range most recently requested from each peer.
	peer_requests: HashMap<PeerId, NumberFor<B>>,
	/// Ranges handed out by `ready_blocks`, keyed by the hash of the first block and
	/// mapping to the half-open `(start, end)` block number range.
	queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
}
impl<B: BlockT> BlockCollection<B> {
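	/// Create a new, empty collection.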
pub fn new() -> Self {
Self {
blocks: BTreeMap::new(),
peer_requests: HashMap::new(),
queued_blocks: HashMap::new(),
}
}
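
	/// Clear the tracked block ranges and pending peer requests.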
pub fn clear(&mut self) {
self.blocks.clear();
self.peer_requests.clear();
}
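
	/// Insert a set of blocks received from `who`, starting at block number `start`.
	/// Ignored if an equal or longer completed range is already stored at `start`.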
pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
if blocks.is_empty() {
return
}
match self.blocks.get(&start) {
Some(&BlockRangeState::Downloading { .. }) => {
trace!(target: "sync", "Inserting block data still marked as being downloaded: {}", start);
},
Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
return
},
_ => (),
}
self.blocks.insert(
start,
BlockRangeState::Complete(
blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
),
);
}
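
	/// Return the next range of blocks to request from `who` and mark it as being downloaded.
	/// Returns `None` if the peer has nothing we need, the range starts above the peer's best
	/// block, or the range is more than `max_ahead` blocks past the earliest tracked range.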
pub fn needed_blocks(
&mut self,
who: PeerId,
count: u32,
peer_best: NumberFor<B>,
common: NumberFor<B>,
max_parallel: u32,
max_ahead: u32,
) -> Option<Range<NumberFor<B>>> {
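		// Bail out early: the peer has nothing beyond the common block.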
if peer_best <= common {
return None
}
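		// The first block number that may need downloading (the one right after the common block).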
let first_different = common + <NumberFor<B>>::one();
		let count: NumberFor<B> = count.into();
let (mut range, downloading) = {
let mut downloading_iter = self.blocks.iter().peekable();
let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
loop {
let next = downloading_iter.next();
				break match (prev, next) {
					// The range is being downloaded by fewer than `max_parallel` peers: add this peer too.
					(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
						if downloading < max_parallel =>
						(*start..*start + *len, downloading),
					// There is a gap between two tracked ranges: download the gap.
					(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
						(*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0),
					// Nothing is tracked after the last range: download `count` blocks past it.
					(Some((start, r)), None) => (*start + r.len()..*start + r.len() + count, 0),
					// Nothing is tracked at all: start right after the common block.
					(None, None) => (first_different..first_different + count, 0),
					// The first tracked range starts above the first needed block: fill the gap at the start.
					(None, Some((start, _))) if *start > first_different =>
						(first_different..cmp::min(first_different + count, *start), 0),
					// Otherwise, move on to the next range.
					_ => {
						prev = next;
						continue
					},
				}
}
};
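		// The peer cannot provide blocks above its best number.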
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
return None
}
range.end = cmp::min(peer_best + One::one(), range.end);
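		// Reject ranges that start more than `max_ahead` blocks past the earliest tracked range.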
if self
.blocks
.iter()
.next()
.map_or(false, |(n, _)| range.start > *n + max_ahead.into())
{
trace!(target: "sync", "Too far ahead for peer {} ({})", who, range.start);
return None
}
self.peer_requests.insert(who, range.start);
self.blocks.insert(
range.start,
BlockRangeState::Downloading {
len: range.end - range.start,
downloading: downloading + 1,
},
);
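		// Sanity check: handing out an empty range would indicate a bookkeeping bug.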
if range.end <= range.start {
panic!(
"Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
range, count, peer_best, common, self.blocks
);
}
Some(range)
}
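
	/// Drain the completed ranges that form a contiguous run starting at or below `from`,
	/// mark them as queued, and return their blocks in ascending order.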
pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut ready = Vec::new();
let mut prev = from;
for (&start, range_data) in &mut self.blocks {
if start > prev {
break
}
let len = match range_data {
BlockRangeState::Complete(blocks) => {
let len = (blocks.len() as u32).into();
prev = start + len;
if let Some(BlockData { block, .. }) = blocks.first() {
self.queued_blocks
.insert(block.hash, (start, start + (blocks.len() as u32).into()));
}
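					// `append` drains `blocks`, moving every element into `ready`.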
ready.append(blocks);
len
},
BlockRangeState::Queued { .. } => continue,
_ => break,
};
*range_data = BlockRangeState::Queued { len };
}
trace!(target: "sync", "{} blocks ready for import", ready.len());
ready
}
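
	/// Remove a previously queued range, identified by the hash of its first block.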
pub fn clear_queued(&mut self, hash: &B::Hash) {
if let Some((from, to)) = self.queued_blocks.remove(hash) {
let mut block_num = from;
while block_num < to {
self.blocks.remove(&block_num);
block_num += One::one();
}
trace!(target: "sync", "Cleared blocks from {:?} to {:?}", from, to);
}
}
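
	/// Forget the range requested from `who`: decrement its download counter, or remove the
	/// range entirely if this was the only peer downloading it.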
pub fn clear_peer_download(&mut self, who: &PeerId) {
if let Some(start) = self.peer_requests.remove(who) {
let remove = match self.blocks.get_mut(&start) {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
if *downloading > 1 =>
{
*downloading -= 1;
false
},
Some(&mut BlockRangeState::Downloading { .. }) => true,
_ => false,
};
if remove {
self.blocks.remove(&start);
}
}
}
}
#[cfg(test)]
mod test {
use super::{BlockCollection, BlockData, BlockRangeState};
use libp2p::PeerId;
use sc_network_common::sync::message;
use sp_core::H256;
use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper};
type Block = RawBlock<ExtrinsicWrapper<u64>>;
fn is_empty(bc: &BlockCollection<Block>) -> bool {
bc.blocks.is_empty() && bc.peer_requests.is_empty()
}
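
	// Generate `n` block data entries with random hashes and all other fields set to `None`.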
fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
(0..n)
.map(|_| message::generic::BlockData {
hash: H256::random(),
header: None,
body: None,
indexed_body: None,
message_queue: None,
receipt: None,
justification: None,
justifications: None,
})
.collect()
}
#[test]
fn create_clear() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
bc.insert(1, generate_blocks(100), PeerId::random());
assert!(!is_empty(&bc));
bc.clear();
assert!(is_empty(&bc));
}
#[test]
fn insert_blocks() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let blocks = generate_blocks(150);
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));
bc.clear_peer_download(&peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1);
assert_eq!(bc.ready_blocks(1), vec![]);
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
bc.clear_peer_download(&peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0);
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
assert_eq!(
bc.ready_blocks(1),
blocks[1..11]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
.collect::<Vec<_>>()
);
bc.clear_peer_download(&peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0);
let ready = bc.ready_blocks(12);
assert_eq!(
ready[..30],
blocks[11..41]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
ready[30..],
blocks[41..81]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
.collect::<Vec<_>>()[..]
);
bc.clear_peer_download(&peer2);
assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
bc.clear_peer_download(&peer2);
bc.insert(81, blocks[81..121].to_vec(), peer2);
bc.clear_peer_download(&peer1);
bc.insert(121, blocks[121..150].to_vec(), peer1);
assert_eq!(bc.ready_blocks(80), vec![]);
let ready = bc.ready_blocks(81);
assert_eq!(
ready[..40],
blocks[81..121]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer2) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
ready[40..],
blocks[121..150]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
.collect::<Vec<_>>()[..]
);
}
#[test]
fn large_gap() {
let mut bc: BlockCollection<Block> = BlockCollection::new();
bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
let blocks = generate_blocks(10)
.into_iter()
.map(|b| BlockData { block: b, origin: None })
.collect();
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
let peer0 = PeerId::random();
		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
		// Too far ahead of the earliest tracked range (block 1) for `max_ahead = 200`.
		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 600, 1, 200), None);
		assert_eq!(
			bc.needed_blocks(peer0, 128, 10000, 600, 1, 200000),
			Some(100 + 128..100 + 128 + 128)
		);
}
#[test]
fn no_duplicate_requests_on_fork() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer = PeerId::random();
let blocks = generate_blocks(10);
assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
bc.clear_peer_download(&peer);
bc.insert(40, blocks[..5].to_vec(), peer);
let ready = bc.ready_blocks(48);
assert_eq!(
ready,
blocks[..5]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
.collect::<Vec<_>>()
);
assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
}
#[test]
fn clear_queued_subsequent_ranges() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer = PeerId::random();
let blocks = generate_blocks(10);
assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
bc.clear_peer_download(&peer);
bc.insert(40, blocks.to_vec(), peer);
let ready = bc.ready_blocks(1000);
assert_eq!(
ready,
blocks
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
.collect::<Vec<_>>()
);
bc.clear_queued(&blocks[0].hash);
assert!(bc.blocks.is_empty());
assert!(bc.queued_blocks.is_empty());
}
}