referrerpolicy=no-referrer-when-downgrade

sc_network_sync/
blocks.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::LOG_TARGET;
20use log::trace;
21use sc_network_common::sync::message;
22use sc_network_types::PeerId;
23use sp_arithmetic::traits::Saturating;
24use sp_runtime::traits::{Block as BlockT, NumberFor, One};
25use std::{
26	cmp,
27	collections::{BTreeMap, HashMap},
28	ops::Range,
29};
30
/// Block data together with the peer it came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
	/// The block message as received from the wire.
	pub block: message::BlockData<B>,
	/// The peer this block was received from, or `None` if no peer is recorded for it.
	pub origin: Option<PeerId>,
}
39
/// State of a contiguous range of blocks tracked by [`BlockCollection`].
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
	/// The range has been requested and no data is stored for it yet. `len` is the number of
	/// blocks in the range and `downloading` counts the requests currently outstanding for it.
	Downloading { len: NumberFor<B>, downloading: u32 },
	/// Block data for the range has been received and is waiting to be handed out.
	Complete(Vec<BlockData<B>>),
	/// The blocks were handed out by `ready_blocks`; only the length of the range is kept.
	Queued { len: NumberFor<B> },
}
46
47impl<B: BlockT> BlockRangeState<B> {
48	pub fn len(&self) -> NumberFor<B> {
49		match *self {
50			Self::Downloading { len, .. } => len,
51			Self::Complete(ref blocks) => (blocks.len() as u32).into(),
52			Self::Queued { len } => len,
53		}
54	}
55}
56
/// A collection of blocks being downloaded.
///
/// Tracks, per range of block numbers, whether the range is being requested, has been
/// received, or has been handed out for import.
#[derive(Default)]
pub struct BlockCollection<B: BlockT> {
	/// Known block ranges, keyed by the number of the first block in each range.
	blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
	/// For each peer, the start of the range most recently requested from it.
	peer_requests: HashMap<PeerId, NumberFor<B>>,
	/// Block ranges downloaded and queued for import.
	/// Maps the hash of the first block of a range => (start_num, end_num), where `end_num`
	/// is one past the last block of the range.
	queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
}
67
68impl<B: BlockT> BlockCollection<B> {
69	/// Create a new instance.
70	pub fn new() -> Self {
71		Self {
72			blocks: BTreeMap::new(),
73			peer_requests: HashMap::new(),
74			queued_blocks: HashMap::new(),
75		}
76	}
77
78	/// Clear everything.
79	pub fn clear(&mut self) {
80		self.blocks.clear();
81		self.peer_requests.clear();
82	}
83
84	/// Insert a set of blocks into collection.
85	pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
86		if blocks.is_empty() {
87			return;
88		}
89
90		match self.blocks.get(&start) {
91			Some(&BlockRangeState::Downloading { .. }) => {
92				trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
93			},
94			Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
95				trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
96				return;
97			},
98			_ => (),
99		}
100
101		self.blocks.insert(
102			start,
103			BlockRangeState::Complete(
104				blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
105			),
106		);
107	}
108
109	/// Returns a set of block hashes that require a header download. The returned set is marked as
110	/// being downloaded.
111	pub fn needed_blocks(
112		&mut self,
113		who: PeerId,
114		count: u32,
115		peer_best: NumberFor<B>,
116		common: NumberFor<B>,
117		max_parallel: u32,
118		max_ahead: u32,
119	) -> Option<Range<NumberFor<B>>> {
120		if peer_best <= common {
121			// Bail out early
122			return None;
123		}
124		// First block number that we need to download
125		let first_different = common + <NumberFor<B>>::one();
126		let count = (count as u32).into();
127		let (mut range, downloading) = {
128			// Iterate through the ranges in `self.blocks` looking for a range to download
129			let mut downloading_iter = self.blocks.iter().peekable();
130			let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
131			loop {
132				let next = downloading_iter.next();
133				break match (prev, next) {
134					// If we are already downloading this range, request it from `max_parallel`
135					// peers (`max_parallel = 5` by default).
136					// Do not request already downloading range from peers with common number above
137					// the range start.
138					(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
139						if downloading < max_parallel && *start >= first_different =>
140					{
141						(*start..*start + *len, downloading)
142					},
143					// If there is a gap between ranges requested, download this gap unless the peer
144					// has common number above the gap start
145					(Some((start, r)), Some((next_start, _)))
146						if *start + r.len() < *next_start &&
147							*start + r.len() >= first_different =>
148					{
149						(*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0)
150					},
151					// Download `count` blocks after the last range requested unless the peer
152					// has common number above this new range
153					(Some((start, r)), None) if *start + r.len() >= first_different => {
154						(*start + r.len()..*start + r.len() + count, 0)
155					},
156					// If there are no ranges currently requested, download `count` blocks after
157					// `common` number
158					(None, None) => (first_different..first_different + count, 0),
159					// If the first range starts above `common + 1`, download the gap at the start
160					(None, Some((start, _))) if *start > first_different => {
161						(first_different..cmp::min(first_different + count, *start), 0)
162					},
163					// Move on to the next range pair
164					_ => {
165						prev = next;
166						continue;
167					},
168				};
169			}
170		};
171		// crop to peers best
172		if range.start > peer_best {
173			trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
174			return None;
175		}
176
177		range.end = cmp::min(peer_best.saturating_add(One::one()), range.end);
178
179		if self
180			.blocks
181			.iter()
182			.next()
183			.map_or(false, |(n, _)| range.start > *n + max_ahead.into())
184		{
185			trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
186			return None;
187		}
188
189		if range.end <= range.start {
190			debug_assert!(
191				false,
192				"Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
193				range, count, peer_best, common, self.blocks
194			);
195			trace!(
196				target: LOG_TARGET,
197				"Empty range for peer {who}: {range:?}, count={count}, peer_best={peer_best}, common={common}",
198			);
199			return None;
200		}
201
202		self.peer_requests.insert(who, range.start);
203		self.blocks.insert(
204			range.start,
205			BlockRangeState::Downloading {
206				len: range.end - range.start,
207				downloading: downloading + 1,
208			},
209		);
210
211		Some(range)
212	}
213
214	/// Get a valid chain of blocks ordered in descending order and ready for importing into
215	/// the blockchain.
216	/// `from` is the maximum block number for the start of the range that we are interested in.
217	/// The function will return empty Vec if the first block ready is higher than `from`.
218	/// For each returned block hash `clear_queued` must be called at some later stage.
219	pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
220		let mut ready = Vec::new();
221
222		let mut prev = from;
223		for (&start, range_data) in &mut self.blocks {
224			if start > prev {
225				break;
226			}
227			let len = match range_data {
228				BlockRangeState::Complete(blocks) => {
229					let len = (blocks.len() as u32).into();
230					prev = start + len;
231					if let Some(BlockData { block, .. }) = blocks.first() {
232						self.queued_blocks
233							.insert(block.hash, (start, start + (blocks.len() as u32).into()));
234					}
235					// Remove all elements from `blocks` and add them to `ready`
236					ready.append(blocks);
237					len
238				},
239				BlockRangeState::Queued { .. } => continue,
240				_ => break,
241			};
242			*range_data = BlockRangeState::Queued { len };
243		}
244		trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
245		ready
246	}
247
248	pub fn clear_queued(&mut self, hash: &B::Hash) {
249		if let Some((from, to)) = self.queued_blocks.remove(hash) {
250			let mut block_num = from;
251			while block_num < to {
252				self.blocks.remove(&block_num);
253				block_num += One::one();
254			}
255			trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
256		}
257	}
258
259	pub fn clear_peer_download(&mut self, who: &PeerId) {
260		if let Some(start) = self.peer_requests.remove(who) {
261			let remove = match self.blocks.get_mut(&start) {
262				Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
263					if *downloading > 1 =>
264				{
265					*downloading -= 1;
266					false
267				},
268				Some(&mut BlockRangeState::Downloading { .. }) => true,
269				_ => false,
270			};
271			if remove {
272				self.blocks.remove(&start);
273			}
274		}
275	}
276}
277
278#[cfg(test)]
279mod test {
280	use super::{BlockCollection, BlockData, BlockRangeState};
281	use sc_network_common::sync::message;
282	use sc_network_types::PeerId;
283	use sp_core::H256;
284	use sp_runtime::testing::{Block as RawBlock, MockCallU64, TestXt};
285
286	type Block = RawBlock<TestXt<MockCallU64, ()>>;
287
288	fn is_empty(bc: &BlockCollection<Block>) -> bool {
289		bc.blocks.is_empty() && bc.peer_requests.is_empty()
290	}
291
292	fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
293		(0..n)
294			.map(|_| message::generic::BlockData {
295				hash: H256::random(),
296				header: None,
297				body: None,
298				indexed_body: None,
299				message_queue: None,
300				receipt: None,
301				justification: None,
302				justifications: None,
303			})
304			.collect()
305	}
306
307	#[test]
308	fn create_clear() {
309		let mut bc = BlockCollection::new();
310		assert!(is_empty(&bc));
311		bc.insert(1, generate_blocks(100), PeerId::random());
312		assert!(!is_empty(&bc));
313		bc.clear();
314		assert!(is_empty(&bc));
315	}
316
317	#[test]
318	fn insert_blocks() {
319		let mut bc = BlockCollection::new();
320		assert!(is_empty(&bc));
321		let peer0 = PeerId::random();
322		let peer1 = PeerId::random();
323		let peer2 = PeerId::random();
324
325		let blocks = generate_blocks(150);
326		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
327		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
328		assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));
329
330		bc.clear_peer_download(&peer1);
331		bc.insert(41, blocks[41..81].to_vec(), peer1);
332		assert_eq!(bc.ready_blocks(1), vec![]);
333		assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
334		bc.clear_peer_download(&peer0);
335		bc.insert(1, blocks[1..11].to_vec(), peer0);
336
337		assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
338		assert_eq!(
339			bc.ready_blocks(1),
340			blocks[1..11]
341				.iter()
342				.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
343				.collect::<Vec<_>>()
344		);
345
346		bc.clear_peer_download(&peer0);
347		bc.insert(11, blocks[11..41].to_vec(), peer0);
348
349		let ready = bc.ready_blocks(12);
350		assert_eq!(
351			ready[..30],
352			blocks[11..41]
353				.iter()
354				.map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
355				.collect::<Vec<_>>()[..]
356		);
357		assert_eq!(
358			ready[30..],
359			blocks[41..81]
360				.iter()
361				.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
362				.collect::<Vec<_>>()[..]
363		);
364
365		bc.clear_peer_download(&peer2);
366		assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
367		bc.clear_peer_download(&peer2);
368		bc.insert(81, blocks[81..121].to_vec(), peer2);
369		bc.clear_peer_download(&peer1);
370		bc.insert(121, blocks[121..150].to_vec(), peer1);
371
372		assert_eq!(bc.ready_blocks(80), vec![]);
373		let ready = bc.ready_blocks(81);
374		assert_eq!(
375			ready[..40],
376			blocks[81..121]
377				.iter()
378				.map(|b| BlockData { block: b.clone(), origin: Some(peer2) })
379				.collect::<Vec<_>>()[..]
380		);
381		assert_eq!(
382			ready[40..],
383			blocks[121..150]
384				.iter()
385				.map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
386				.collect::<Vec<_>>()[..]
387		);
388	}
389
390	#[test]
391	fn large_gap() {
392		let mut bc: BlockCollection<Block> = BlockCollection::new();
393		bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
394		let blocks = generate_blocks(10)
395			.into_iter()
396			.map(|b| BlockData { block: b, origin: None })
397			.collect();
398		bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
399
400		let peer0 = PeerId::random();
401		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
402		assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None); // too far ahead
403		assert_eq!(
404			bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
405			Some(100 + 128..100 + 128 + 128)
406		);
407	}
408
409	#[test]
410	fn no_duplicate_requests_on_fork() {
411		let mut bc = BlockCollection::new();
412		assert!(is_empty(&bc));
413		let peer = PeerId::random();
414
415		let blocks = generate_blocks(10);
416
417		// count = 5, peer_best = 50, common = 39, max_parallel = 0, max_ahead = 200
418		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
419
420		// got a response on the request for `40..45`
421		bc.clear_peer_download(&peer);
422		bc.insert(40, blocks[..5].to_vec(), peer);
423
424		// our "node" started on a fork, with its current best = 47, which is > common
425		let ready = bc.ready_blocks(48);
426		assert_eq!(
427			ready,
428			blocks[..5]
429				.iter()
430				.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
431				.collect::<Vec<_>>()
432		);
433
434		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
435	}
436
437	#[test]
438	fn clear_queued_subsequent_ranges() {
439		let mut bc = BlockCollection::new();
440		assert!(is_empty(&bc));
441		let peer = PeerId::random();
442
443		let blocks = generate_blocks(10);
444
445		// Request 2 ranges
446		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
447		assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
448
449		// got a response on the request for `40..50`
450		bc.clear_peer_download(&peer);
451		bc.insert(40, blocks.to_vec(), peer);
452
453		// request any blocks starting from 1000 or lower.
454		let ready = bc.ready_blocks(1000);
455		assert_eq!(
456			ready,
457			blocks
458				.iter()
459				.map(|b| BlockData { block: b.clone(), origin: Some(peer) })
460				.collect::<Vec<_>>()
461		);
462
463		bc.clear_queued(&blocks[0].hash);
464		assert!(bc.blocks.is_empty());
465		assert!(bc.queued_blocks.is_empty());
466	}
467
468	#[test]
469	fn downloaded_range_is_requested_from_max_parallel_peers() {
470		let mut bc = BlockCollection::new();
471		assert!(is_empty(&bc));
472
473		let count = 5;
474		// identical ranges requested from 2 peers
475		let max_parallel = 2;
476		let max_ahead = 200;
477
478		let peer1 = PeerId::random();
479		let peer2 = PeerId::random();
480		let peer3 = PeerId::random();
481
482		// common for all peers
483		let best = 100;
484		let common = 10;
485
486		assert_eq!(
487			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
488			Some(11..16)
489		);
490		assert_eq!(
491			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
492			Some(11..16)
493		);
494		assert_eq!(
495			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
496			Some(16..21)
497		);
498	}
499	#[test]
500	fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
501		// A peer connects with a common number falling behind our best number
502		// (either a fork or lagging behind).
503		// We request a range from this peer starting at its common number + 1.
504		// Even though we have less than `max_parallel` downloads, we do not request
505		// this range from peers with a common number above the start of this range.
506
507		let mut bc = BlockCollection::new();
508		assert!(is_empty(&bc));
509
510		let count = 5;
511		let max_parallel = 2;
512		let max_ahead = 200;
513
514		let peer1 = PeerId::random();
515		let peer1_best = 20;
516		let peer1_common = 10;
517
518		// `peer2` has first different above the start of the range downloaded from `peer1`
519		let peer2 = PeerId::random();
520		let peer2_best = 20;
521		let peer2_common = 11; // first_different = 12
522
523		assert_eq!(
524			bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead),
525			Some(11..16),
526		);
527		assert_eq!(
528			bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead),
529			Some(16..21),
530		);
531	}
532
533	#[test]
534	fn gap_above_common_number_requested() {
535		let mut bc = BlockCollection::new();
536		assert!(is_empty(&bc));
537
538		let count = 5;
539		let best = 30;
540		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
541		// set `max_parallel = 1`
542		let max_parallel = 1;
543		let max_ahead = 200;
544
545		let peer1 = PeerId::random();
546		let peer2 = PeerId::random();
547		let peer3 = PeerId::random();
548
549		let common = 10;
550		assert_eq!(
551			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
552			Some(11..16),
553		);
554		assert_eq!(
555			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
556			Some(16..21),
557		);
558		assert_eq!(
559			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
560			Some(21..26),
561		);
562
563		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
564		// also happen that 16..21 received first and got imported if our best is actually >= 15.
565		bc.clear_peer_download(&peer2);
566
567		// Some peer connects with common number below the gap. The gap is requested from it.
568		assert_eq!(
569			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
570			Some(16..21),
571		);
572	}
573
574	#[test]
575	fn gap_below_common_number_not_requested() {
576		let mut bc = BlockCollection::new();
577		assert!(is_empty(&bc));
578
579		let count = 5;
580		let best = 30;
581		// We need at least 3 ranges requested to have a gap, so to minimize the number of peers
582		// set `max_parallel = 1`
583		let max_parallel = 1;
584		let max_ahead = 200;
585
586		let peer1 = PeerId::random();
587		let peer2 = PeerId::random();
588		let peer3 = PeerId::random();
589
590		let common = 10;
591		assert_eq!(
592			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
593			Some(11..16),
594		);
595		assert_eq!(
596			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
597			Some(16..21),
598		);
599		assert_eq!(
600			bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
601			Some(21..26),
602		);
603
604		// For some reason there is now a gap at 16..21. We just disconnect `peer2`, but it might
605		// also happen that 16..21 received first and got imported if our best is actually >= 15.
606		bc.clear_peer_download(&peer2);
607
608		// Some peer connects with common number above the gap. The gap is not requested from it.
609		let common = 23;
610		assert_eq!(
611			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
612			Some(26..31), // not 16..21
613		);
614	}
615
616	#[test]
617	fn range_at_the_end_above_common_number_requested() {
618		let mut bc = BlockCollection::new();
619		assert!(is_empty(&bc));
620
621		let count = 5;
622		let best = 30;
623		let max_parallel = 1;
624		let max_ahead = 200;
625
626		let peer1 = PeerId::random();
627		let peer2 = PeerId::random();
628
629		let common = 10;
630		assert_eq!(
631			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
632			Some(11..16),
633		);
634		assert_eq!(
635			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
636			Some(16..21),
637		);
638	}
639
640	#[test]
641	fn range_at_the_end_below_common_number_not_requested() {
642		let mut bc = BlockCollection::new();
643		assert!(is_empty(&bc));
644
645		let count = 5;
646		let best = 30;
647		let max_parallel = 1;
648		let max_ahead = 200;
649
650		let peer1 = PeerId::random();
651		let peer2 = PeerId::random();
652
653		let common = 10;
654		assert_eq!(
655			bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
656			Some(11..16),
657		);
658
659		let common = 20;
660		assert_eq!(
661			bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
662			Some(21..26), // not 16..21
663		);
664	}
665}