1use crate::LOG_TARGET;
20use log::trace;
21use sc_network_common::sync::message;
22use sc_network_types::PeerId;
23use sp_arithmetic::traits::Saturating;
24use sp_runtime::traits::{Block as BlockT, NumberFor, One};
25use std::{
26 cmp,
27 collections::{BTreeMap, HashMap},
28 ops::Range,
29};
30
31#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct BlockData<B: BlockT> {
34 pub block: message::BlockData<B>,
36 pub origin: Option<PeerId>,
38}
39
40#[derive(Debug)]
41enum BlockRangeState<B: BlockT> {
42 Downloading { len: NumberFor<B>, downloading: u32 },
43 Complete(Vec<BlockData<B>>),
44 Queued { len: NumberFor<B> },
45}
46
47impl<B: BlockT> BlockRangeState<B> {
48 pub fn len(&self) -> NumberFor<B> {
49 match *self {
50 Self::Downloading { len, .. } => len,
51 Self::Complete(ref blocks) => (blocks.len() as u32).into(),
52 Self::Queued { len } => len,
53 }
54 }
55}
56
57#[derive(Default)]
59pub struct BlockCollection<B: BlockT> {
60 blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
62 peer_requests: HashMap<PeerId, NumberFor<B>>,
63 queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
66}
67
68impl<B: BlockT> BlockCollection<B> {
69 pub fn new() -> Self {
71 Self {
72 blocks: BTreeMap::new(),
73 peer_requests: HashMap::new(),
74 queued_blocks: HashMap::new(),
75 }
76 }
77
78 pub fn clear(&mut self) {
80 self.blocks.clear();
81 self.peer_requests.clear();
82 }
83
84 pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
86 if blocks.is_empty() {
87 return;
88 }
89
90 match self.blocks.get(&start) {
91 Some(&BlockRangeState::Downloading { .. }) => {
92 trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
93 },
94 Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
95 trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
96 return;
97 },
98 _ => (),
99 }
100
101 self.blocks.insert(
102 start,
103 BlockRangeState::Complete(
104 blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
105 ),
106 );
107 }
108
109 pub fn needed_blocks(
112 &mut self,
113 who: PeerId,
114 count: u32,
115 peer_best: NumberFor<B>,
116 common: NumberFor<B>,
117 max_parallel: u32,
118 max_ahead: u32,
119 ) -> Option<Range<NumberFor<B>>> {
120 if peer_best <= common {
121 return None;
123 }
124 let first_different = common + <NumberFor<B>>::one();
126 let count = (count as u32).into();
127 let (mut range, downloading) = {
128 let mut downloading_iter = self.blocks.iter().peekable();
130 let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
131 loop {
132 let next = downloading_iter.next();
133 break match (prev, next) {
134 (Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
139 if downloading < max_parallel && *start >= first_different =>
140 {
141 (*start..*start + *len, downloading)
142 },
143 (Some((start, r)), Some((next_start, _)))
146 if *start + r.len() < *next_start &&
147 *start + r.len() >= first_different =>
148 {
149 (*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0)
150 },
151 (Some((start, r)), None) if *start + r.len() >= first_different => {
154 (*start + r.len()..*start + r.len() + count, 0)
155 },
156 (None, None) => (first_different..first_different + count, 0),
159 (None, Some((start, _))) if *start > first_different => {
161 (first_different..cmp::min(first_different + count, *start), 0)
162 },
163 _ => {
165 prev = next;
166 continue;
167 },
168 };
169 }
170 };
171 if range.start > peer_best {
173 trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
174 return None;
175 }
176
177 range.end = cmp::min(peer_best.saturating_add(One::one()), range.end);
178
179 if self
180 .blocks
181 .iter()
182 .next()
183 .map_or(false, |(n, _)| range.start > *n + max_ahead.into())
184 {
185 trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
186 return None;
187 }
188
189 if range.end <= range.start {
190 debug_assert!(
191 false,
192 "Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
193 range, count, peer_best, common, self.blocks
194 );
195 trace!(
196 target: LOG_TARGET,
197 "Empty range for peer {who}: {range:?}, count={count}, peer_best={peer_best}, common={common}",
198 );
199 return None;
200 }
201
202 self.peer_requests.insert(who, range.start);
203 self.blocks.insert(
204 range.start,
205 BlockRangeState::Downloading {
206 len: range.end - range.start,
207 downloading: downloading + 1,
208 },
209 );
210
211 Some(range)
212 }
213
214 pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
220 let mut ready = Vec::new();
221
222 let mut prev = from;
223 for (&start, range_data) in &mut self.blocks {
224 if start > prev {
225 break;
226 }
227 let len = match range_data {
228 BlockRangeState::Complete(blocks) => {
229 let len = (blocks.len() as u32).into();
230 prev = start + len;
231 if let Some(BlockData { block, .. }) = blocks.first() {
232 self.queued_blocks
233 .insert(block.hash, (start, start + (blocks.len() as u32).into()));
234 }
235 ready.append(blocks);
237 len
238 },
239 BlockRangeState::Queued { .. } => continue,
240 _ => break,
241 };
242 *range_data = BlockRangeState::Queued { len };
243 }
244 trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
245 ready
246 }
247
248 pub fn clear_queued(&mut self, hash: &B::Hash) {
249 if let Some((from, to)) = self.queued_blocks.remove(hash) {
250 let mut block_num = from;
251 while block_num < to {
252 self.blocks.remove(&block_num);
253 block_num += One::one();
254 }
255 trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
256 }
257 }
258
259 pub fn clear_peer_download(&mut self, who: &PeerId) {
260 if let Some(start) = self.peer_requests.remove(who) {
261 let remove = match self.blocks.get_mut(&start) {
262 Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
263 if *downloading > 1 =>
264 {
265 *downloading -= 1;
266 false
267 },
268 Some(&mut BlockRangeState::Downloading { .. }) => true,
269 _ => false,
270 };
271 if remove {
272 self.blocks.remove(&start);
273 }
274 }
275 }
276}
277
#[cfg(test)]
mod test {
    use super::{BlockCollection, BlockData, BlockRangeState};
    use sc_network_common::sync::message;
    use sc_network_types::PeerId;
    use sp_core::H256;
    use sp_runtime::testing::{Block as RawBlock, MockCallU64, TestXt};

    type Block = RawBlock<TestXt<MockCallU64, ()>>;

    /// True when the collection tracks neither ranges nor peer requests.
    fn is_empty(bc: &BlockCollection<Block>) -> bool {
        bc.peer_requests.is_empty() && bc.blocks.is_empty()
    }

    /// Builds `n` otherwise empty block messages, each with a random hash.
    fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
        std::iter::repeat_with(|| message::generic::BlockData {
            hash: H256::random(),
            header: None,
            body: None,
            indexed_body: None,
            message_queue: None,
            receipt: None,
            justification: None,
            justifications: None,
        })
        .take(n)
        .collect()
    }

    /// Wraps every message in `blocks` as a `BlockData` attributed to `origin`.
    fn from_peer(blocks: &[message::BlockData<Block>], origin: PeerId) -> Vec<BlockData<Block>> {
        blocks
            .iter()
            .cloned()
            .map(|block| BlockData { block, origin: Some(origin) })
            .collect()
    }

    #[test]
    fn create_clear() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        bc.insert(1, generate_blocks(100), PeerId::random());
        assert!(!is_empty(&bc));

        bc.clear();
        assert!(is_empty(&bc));
    }

    #[test]
    fn insert_blocks() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        let peer0 = PeerId::random();
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        let blocks = generate_blocks(150);
        for (peer, expected) in [(peer0, 1..41), (peer1, 41..81), (peer2, 81..121)] {
            assert_eq!(bc.needed_blocks(peer, 40, 150, 0, 1, 200), Some(expected));
        }

        // The middle range arrives first: nothing is ready while 1..41 is outstanding.
        bc.clear_peer_download(&peer1);
        bc.insert(41, blocks[41..81].to_vec(), peer1);
        assert!(bc.ready_blocks(1).is_empty());
        assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));

        // Only the first ten blocks of the first range are delivered.
        bc.clear_peer_download(&peer0);
        bc.insert(1, blocks[1..11].to_vec(), peer0);

        assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
        assert_eq!(bc.ready_blocks(1), from_peer(&blocks[1..11], peer0));

        bc.clear_peer_download(&peer0);
        bc.insert(11, blocks[11..41].to_vec(), peer0);

        let ready = bc.ready_blocks(12);
        let (head, tail) = ready.split_at(30);
        assert_eq!(head, from_peer(&blocks[11..41], peer0).as_slice());
        assert_eq!(tail, from_peer(&blocks[41..81], peer1).as_slice());

        bc.clear_peer_download(&peer2);
        assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
        bc.clear_peer_download(&peer2);
        bc.insert(81, blocks[81..121].to_vec(), peer2);
        bc.clear_peer_download(&peer1);
        bc.insert(121, blocks[121..150].to_vec(), peer1);

        assert!(bc.ready_blocks(80).is_empty());
        let ready = bc.ready_blocks(81);
        let (head, tail) = ready.split_at(40);
        assert_eq!(head, from_peer(&blocks[81..121], peer2).as_slice());
        assert_eq!(tail, from_peer(&blocks[121..150], peer1).as_slice());
    }

    #[test]
    fn large_gap() {
        let mut bc: BlockCollection<Block> = BlockCollection::new();
        bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
        let far_away = generate_blocks(10)
            .into_iter()
            .map(|block| BlockData { block, origin: None })
            .collect();
        bc.blocks.insert(114305, BlockRangeState::Complete(far_away));

        let peer0 = PeerId::random();
        assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
        // The next candidate starts more than `max_ahead` past the lowest range.
        assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None);
        assert_eq!(
            bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
            Some(100 + 128..100 + 128 + 128)
        );
    }

    #[test]
    fn no_duplicate_requests_on_fork() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        let peer = PeerId::random();

        let blocks = generate_blocks(10);

        assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));

        bc.clear_peer_download(&peer);
        bc.insert(40, blocks[..5].to_vec(), peer);

        let ready = bc.ready_blocks(48);
        assert_eq!(ready, from_peer(&blocks[..5], peer));

        // The range that was just queued must not be handed out again.
        assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
    }

    #[test]
    fn clear_queued_subsequent_ranges() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));
        let peer = PeerId::random();

        let blocks = generate_blocks(10);

        for expected in [40..45, 45..50] {
            assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(expected));
        }

        // One response covers both requested ranges.
        bc.clear_peer_download(&peer);
        bc.insert(40, blocks.clone(), peer);

        let ready = bc.ready_blocks(1000);
        assert_eq!(ready, from_peer(&blocks, peer));

        bc.clear_queued(&blocks[0].hash);
        assert!(bc.blocks.is_empty());
        assert!(bc.queued_blocks.is_empty());
    }

    #[test]
    fn downloaded_range_is_requested_from_max_parallel_peers() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let (count, max_parallel, max_ahead) = (5, 2, 200);
        let (best, common) = (100, 10);

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        // Two peers share the first range; the third one gets the next range.
        for (peer, expected) in [(peer1, 11..16), (peer2, 11..16), (peer3, 16..21)] {
            assert_eq!(
                bc.needed_blocks(peer, count, best, common, max_parallel, max_ahead),
                Some(expected)
            );
        }
    }

    #[test]
    fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let (count, max_parallel, max_ahead) = (5, 2, 200);

        let peer1 = PeerId::random();
        let (peer1_best, peer1_common) = (20, 10);

        let peer2 = PeerId::random();
        let (peer2_best, peer2_common) = (20, 11);

        assert_eq!(
            bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead),
            Some(11..16),
        );
        // The range 11..16 starts at peer2's common number, so it is not shared.
        assert_eq!(
            bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead),
            Some(16..21),
        );
    }

    #[test]
    fn gap_above_common_number_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let (count, best, max_parallel, max_ahead) = (5, 30, 1, 200);

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        let common = 10;
        for (peer, expected) in [(peer1, 11..16), (peer2, 16..21), (peer3, 21..26)] {
            assert_eq!(
                bc.needed_blocks(peer, count, best, common, max_parallel, max_ahead),
                Some(expected),
            );
        }

        // Dropping peer2's download leaves a gap at 16..21.
        bc.clear_peer_download(&peer2);

        // The gap lies above the common number, so it is requested again.
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(16..21),
        );
    }

    #[test]
    fn gap_below_common_number_not_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let (count, best, max_parallel, max_ahead) = (5, 30, 1, 200);

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        let common = 10;
        for (peer, expected) in [(peer1, 11..16), (peer2, 16..21), (peer3, 21..26)] {
            assert_eq!(
                bc.needed_blocks(peer, count, best, common, max_parallel, max_ahead),
                Some(expected),
            );
        }

        // Dropping peer2's download leaves a gap at 16..21.
        bc.clear_peer_download(&peer2);

        // With a common number above the gap, the request goes past the last range.
        let common = 23;
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(26..31),
        );
    }

    #[test]
    fn range_at_the_end_above_common_number_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let (count, best, max_parallel, max_ahead) = (5, 30, 1, 200);

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        let common = 10;
        for (peer, expected) in [(peer1, 11..16), (peer2, 16..21)] {
            assert_eq!(
                bc.needed_blocks(peer, count, best, common, max_parallel, max_ahead),
                Some(expected),
            );
        }
    }

    #[test]
    fn range_at_the_end_below_common_number_not_requested() {
        let mut bc = BlockCollection::new();
        assert!(is_empty(&bc));

        let (count, best, max_parallel, max_ahead) = (5, 30, 1, 200);

        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        let common = 10;
        assert_eq!(
            bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
            Some(11..16),
        );

        // The only range ends below the new common number, so the request starts
        // right above that common number instead of right after the range.
        let common = 20;
        assert_eq!(
            bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
            Some(21..26),
        );
    }
}