referrerpolicy=no-referrer-when-downgrade

polkadot_node_subsystem_util/
determine_new_blocks.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! A utility for fetching all unknown blocks based on a new chain-head hash.
18
19use futures::{channel::oneshot, prelude::*};
20use polkadot_node_subsystem::{messages::ChainApiMessage, SubsystemSender};
21use polkadot_primitives::{BlockNumber, Hash, Header};
22
23/// Given a new chain-head hash, this determines the hashes of all new blocks we should track
24/// metadata for, given this head.
25///
26/// This is guaranteed to be a subset of the (inclusive) ancestry of `head` determined as all
27/// blocks above the lower bound or above the highest known block, whichever is higher.
28/// This is formatted in descending order by block height.
29///
30/// An implication of this is that if `head` itself is known or not above the lower bound,
31/// then the returned list will be empty.
32///
33/// This may be somewhat expensive when first recovering from major sync.
34pub async fn determine_new_blocks<E, Sender>(
35	sender: &mut Sender,
36	is_known: impl Fn(&Hash) -> Result<bool, E>,
37	head: Hash,
38	header: &Header,
39	lower_bound_number: BlockNumber,
40) -> Result<Vec<(Hash, Header)>, E>
41where
42	Sender: SubsystemSender<ChainApiMessage>,
43{
44	const ANCESTRY_STEP: usize = 4;
45
46	let min_block_needed = lower_bound_number + 1;
47
48	// Early exit if the block is in the DB or too early.
49	{
50		let already_known = is_known(&head)?;
51
52		let before_relevant = header.number < min_block_needed;
53
54		if already_known || before_relevant {
55			return Ok(Vec::new())
56		}
57	}
58
59	let mut ancestry = vec![(head, header.clone())];
60
61	// Early exit if the parent hash is in the DB or no further blocks
62	// are needed.
63	if is_known(&header.parent_hash)? || header.number == min_block_needed {
64		return Ok(ancestry)
65	}
66
67	'outer: loop {
68		let (last_hash, last_header) = ancestry
69			.last()
70			.expect("ancestry has length 1 at initialization and is only added to; qed");
71
72		assert!(
73			last_header.number > min_block_needed,
74			"Loop invariant: the last block in ancestry is checked to be \
75			above the minimum before the loop, and at the end of each iteration; \
76			qed"
77		);
78
79		let (tx, rx) = oneshot::channel();
80
81		// This is always non-zero as determined by the loop invariant
82		// above.
83		let ancestry_step =
84			std::cmp::min(ANCESTRY_STEP, (last_header.number - min_block_needed) as usize);
85
86		let batch_hashes = if ancestry_step == 1 {
87			vec![last_header.parent_hash]
88		} else {
89			sender
90				.send_message(
91					ChainApiMessage::Ancestors {
92						hash: *last_hash,
93						k: ancestry_step,
94						response_channel: tx,
95					}
96					.into(),
97				)
98				.await;
99
100			// Continue past these errors.
101			match rx.await {
102				Err(_) | Ok(Err(_)) => break 'outer,
103				Ok(Ok(ancestors)) => ancestors,
104			}
105		};
106
107		let batch_headers = {
108			let (batch_senders, batch_receivers) = (0..batch_hashes.len())
109				.map(|_| oneshot::channel())
110				.unzip::<_, _, Vec<_>, Vec<_>>();
111
112			for (hash, batched_sender) in batch_hashes.iter().cloned().zip(batch_senders) {
113				sender
114					.send_message(ChainApiMessage::BlockHeader(hash, batched_sender).into())
115					.await;
116			}
117
118			let mut requests = futures::stream::FuturesOrdered::new();
119			batch_receivers
120				.into_iter()
121				.map(|rx| async move {
122					match rx.await {
123						Err(_) | Ok(Err(_)) => None,
124						Ok(Ok(h)) => h,
125					}
126				})
127				.for_each(|x| requests.push_back(x));
128
129			let batch_headers: Vec<_> =
130				requests.flat_map(|x: Option<Header>| stream::iter(x)).collect().await;
131
132			// Any failed header fetch of the batch will yield a `None` result that will
133			// be skipped. Any failure at this stage means we'll just ignore those blocks
134			// as the chain DB has failed us.
135			if batch_headers.len() != batch_hashes.len() {
136				break 'outer
137			}
138			batch_headers
139		};
140
141		for (hash, header) in batch_hashes.into_iter().zip(batch_headers) {
142			let is_known = is_known(&hash)?;
143
144			let is_relevant = header.number >= min_block_needed;
145			let is_terminating = header.number == min_block_needed;
146
147			if is_known || !is_relevant {
148				break 'outer
149			}
150
151			ancestry.push((hash, header));
152
153			if is_terminating {
154				break 'outer
155			}
156		}
157	}
158
159	Ok(ancestry)
160}
161
#[cfg(test)]
mod tests {
	use super::*;
	use assert_matches::assert_matches;
	use polkadot_node_subsystem_test_helpers::make_subsystem_context;
	use polkadot_overseer::{AllMessages, SubsystemContext};
	use sp_core::testing::TaskExecutor;
	use std::collections::{HashMap, HashSet};

	/// In-memory set of block hashes backing the `is_known` callback in the tests.
	#[derive(Default)]
	struct TestKnownBlocks {
		blocks: HashSet<Hash>,
	}

	impl TestKnownBlocks {
		/// Marks `hash` as known.
		fn insert(&mut self, hash: Hash) {
			self.blocks.insert(hash);
		}

		/// Infallible lookup with the signature `determine_new_blocks` expects.
		fn is_known(&self, hash: &Hash) -> Result<bool, ()> {
			Ok(self.blocks.contains(hash))
		}
	}

	/// A linear chain of headers with consecutive block numbers, used to answer the
	/// chain API requests issued by `determine_new_blocks`.
	#[derive(Clone)]
	struct TestChain {
		/// Number of the first header in `headers`.
		start_number: BlockNumber,
		/// Headers in ascending block-number order.
		headers: Vec<Header>,
		/// Block number of every header, keyed by header hash.
		numbers: HashMap<Hash, BlockNumber>,
	}

	impl TestChain {
		/// Builds a chain of `len` headers whose first block has number `start`.
		///
		/// Panics if `len` is zero.
		fn new(start: BlockNumber, len: usize) -> Self {
			assert!(len > 0, "len must be at least 1");

			let base = Header {
				digest: Default::default(),
				extrinsics_root: Default::default(),
				number: start,
				state_root: Default::default(),
				parent_hash: Default::default(),
			};

			let base_hash = base.hash();

			let mut chain = TestChain {
				start_number: start,
				headers: vec![base],
				numbers: HashMap::from([(base_hash, start)]),
			};

			for _ in 1..len {
				chain.grow();
			}

			chain
		}

		/// Appends one header on top of the current tip.
		fn grow(&mut self) {
			let next = {
				let last = self.headers.last().unwrap();
				Header {
					digest: Default::default(),
					extrinsics_root: Default::default(),
					number: last.number + 1,
					state_root: Default::default(),
					parent_hash: last.hash(),
				}
			};

			self.numbers.insert(next.hash(), next.number);
			self.headers.push(next);
		}

		/// Header with block number `number`, if the chain contains it.
		fn header_by_number(&self, number: BlockNumber) -> Option<&Header> {
			if number < self.start_number {
				None
			} else {
				self.headers.get((number - self.start_number) as usize)
			}
		}

		/// Header with hash `hash`, if the chain contains it.
		fn header_by_hash(&self, hash: &Hash) -> Option<&Header> {
			self.numbers.get(hash).and_then(|n| self.header_by_number(*n))
		}

		/// Hash of the header with block number `number`, if the chain contains it.
		fn hash_by_number(&self, number: BlockNumber) -> Option<Hash> {
			self.header_by_number(number).map(|h| h.hash())
		}

		/// Hashes of up to `k` ancestors of `hash`, nearest ancestor first.
		///
		/// Returns an empty list for an unknown `hash`. Ancestors that fall outside the
		/// chain are omitted, so the result may hold fewer than `k` entries.
		fn ancestry(&self, hash: &Hash, k: BlockNumber) -> Vec<Hash> {
			let n = match self.numbers.get(hash) {
				None => return Vec::new(),
				Some(&n) => n,
			};

			(1..=k)
				// `checked_sub` so that asking for more ancestors than the block number
				// allows stops at zero instead of underflowing.
				.filter_map(|i| n.checked_sub(i))
				.filter_map(|number| self.header_by_number(number))
				.map(|h| h.hash())
				.collect()
		}
	}

	/// With nothing known, the walk goes from the head down to the first block above
	/// the lower bound.
	#[test]
	fn determine_new_blocks_back_to_lower_bound() {
		let pool = TaskExecutor::new();
		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool);

		let known = TestKnownBlocks::default();

		let chain = TestChain::new(10, 9);

		let head = chain.header_by_number(18).unwrap().clone();
		let head_hash = head.hash();
		let lower_bound_number = 12;

		// Finalized block should be omitted. The head provided to `determine_new_blocks`
		// should be included.
		let expected_ancestry = (13..=18)
			.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
			.rev()
			.collect::<Vec<_>>();

		let test_fut = Box::pin(async move {
			let ancestry = determine_new_blocks(
				ctx.sender(),
				|h| known.is_known(h),
				head_hash,
				&head,
				lower_bound_number,
			)
			.await
			.unwrap();

			assert_eq!(ancestry, expected_ancestry);
		});

		let aux_fut = Box::pin(async move {
			// First round: a full step of 4 ancestors starting from the head.
			assert_matches!(
				handle.recv().await,
				AllMessages::ChainApi(ChainApiMessage::Ancestors {
					hash: h,
					k,
					response_channel: tx,
				}) => {
					assert_eq!(h, head_hash);
					assert_eq!(k, 4);
					let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
				}
			);

			for _ in 0u32..4 {
				assert_matches!(
					handle.recv().await,
					AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
						let _ = tx.send(Ok(chain.header_by_hash(&h).cloned()));
					}
				);
			}

			// Second round: only block 13 is missing, so its header is requested
			// directly by parent hash, without another `Ancestors` request.
			assert_matches!(
				handle.recv().await,
				AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
					assert_eq!(h, chain.hash_by_number(13).unwrap());
					let _ = tx.send(Ok(chain.header_by_hash(&h).cloned()));
				}
			);
		});

		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
	}

	/// The walk stops at the first known block, even if it is above the lower bound.
	#[test]
	fn determine_new_blocks_back_to_known() {
		let pool = TaskExecutor::new();
		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool);

		let mut known = TestKnownBlocks::default();

		let chain = TestChain::new(10, 9);

		let head = chain.header_by_number(18).unwrap().clone();
		let head_hash = head.hash();
		let lower_bound_number = 12;
		let known_number = 15;
		let known_hash = chain.hash_by_number(known_number).unwrap();

		known.insert(known_hash);

		// Known block should be omitted. The head provided to `determine_new_blocks`
		// should be included.
		let expected_ancestry = (16..=18)
			.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
			.rev()
			.collect::<Vec<_>>();

		let test_fut = Box::pin(async move {
			let ancestry = determine_new_blocks(
				ctx.sender(),
				|h| known.is_known(h),
				head_hash,
				&head,
				lower_bound_number,
			)
			.await
			.unwrap();

			assert_eq!(ancestry, expected_ancestry);
		});

		let aux_fut = Box::pin(async move {
			// A single round is expected: the known block is inside the first batch.
			assert_matches!(
				handle.recv().await,
				AllMessages::ChainApi(ChainApiMessage::Ancestors {
					hash: h,
					k,
					response_channel: tx,
				}) => {
					assert_eq!(h, head_hash);
					assert_eq!(k, 4);
					let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
				}
			);

			for _ in 0u32..4 {
				assert_matches!(
					handle.recv().await,
					AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
						let _ = tx.send(Ok(chain.header_by_hash(&h).cloned()));
					}
				);
			}
		});

		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
	}

	/// A known head yields an empty list. No chain API responder is driven here.
	#[test]
	fn determine_new_blocks_already_known_is_empty() {
		let pool = TaskExecutor::new();
		let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool);

		let mut known = TestKnownBlocks::default();

		let chain = TestChain::new(10, 9);

		let head = chain.header_by_number(18).unwrap().clone();
		let head_hash = head.hash();
		let lower_bound_number = 0;

		known.insert(head_hash);

		// Known block should be omitted.
		let expected_ancestry = Vec::new();

		let test_fut = Box::pin(async move {
			let ancestry = determine_new_blocks(
				ctx.sender(),
				|h| known.is_known(h),
				head_hash,
				&head,
				lower_bound_number,
			)
			.await
			.unwrap();

			assert_eq!(ancestry, expected_ancestry);
		});

		futures::executor::block_on(test_fut);
	}

	/// A known parent yields just the head. No chain API responder is driven here.
	#[test]
	fn determine_new_blocks_parent_known_is_fast() {
		let pool = TaskExecutor::new();
		let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool);

		let mut known = TestKnownBlocks::default();

		let chain = TestChain::new(10, 9);

		let head = chain.header_by_number(18).unwrap().clone();
		let head_hash = head.hash();
		let lower_bound_number = 0;
		let parent_hash = chain.hash_by_number(17).unwrap();

		known.insert(parent_hash);

		// New block should be the only new one.
		let expected_ancestry = vec![(head_hash, head.clone())];

		let test_fut = Box::pin(async move {
			let ancestry = determine_new_blocks(
				ctx.sender(),
				|h| known.is_known(h),
				head_hash,
				&head,
				lower_bound_number,
			)
			.await
			.unwrap();

			assert_eq!(ancestry, expected_ancestry);
		});

		futures::executor::block_on(test_fut);
	}

	/// A head at or below the lower bound yields an empty list; a head one above the
	/// bound yields just itself.
	#[test]
	fn determine_new_block_before_finality_is_empty() {
		let pool = TaskExecutor::new();
		let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool);

		let chain = TestChain::new(10, 9);

		let head = chain.header_by_number(18).unwrap().clone();
		let head_hash = head.hash();
		let parent_hash = chain.hash_by_number(17).unwrap();
		let mut known = TestKnownBlocks::default();

		known.insert(parent_hash);

		let test_fut = Box::pin(async move {
			// Lower bound directly below the head (number 18).
			let after_finality =
				determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 17)
					.await
					.unwrap();

			// Lower bound equal to the head's number.
			let at_finality =
				determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 18)
					.await
					.unwrap();

			// Lower bound above the head's number.
			let before_finality =
				determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 19)
					.await
					.unwrap();

			assert_eq!(after_finality, vec![(head_hash, head.clone())]);

			assert_eq!(at_finality, Vec::new());

			assert_eq!(before_finality, Vec::new());
		});

		futures::executor::block_on(test_fut);
	}

	/// With a lower bound of 0 and the head at block 2, only the header of block 1 is
	/// requested.
	#[test]
	fn determine_new_blocks_does_not_request_genesis() {
		let pool = TaskExecutor::new();
		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool);

		let chain = TestChain::new(1, 2);

		let head = chain.header_by_number(2).unwrap().clone();
		let head_hash = head.hash();
		let known = TestKnownBlocks::default();

		let expected_ancestry = (1..=2)
			.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
			.rev()
			.collect::<Vec<_>>();

		let test_fut = Box::pin(async move {
			let ancestry =
				determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 0)
					.await
					.unwrap();

			assert_eq!(ancestry, expected_ancestry);
		});

		let aux_fut = Box::pin(async move {
			assert_matches!(
				handle.recv().await,
				AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
					assert_eq!(h, chain.hash_by_number(1).unwrap());
					let _ = tx.send(Ok(chain.header_by_hash(&h).cloned()));
				}
			);
		});

		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
	}

	/// With a lower bound of 0 and the head at block 3, the `Ancestors` request asks for
	/// 2 ancestors only.
	#[test]
	fn determine_new_blocks_does_not_request_genesis_even_in_multi_ancestry() {
		let pool = TaskExecutor::new();
		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool);

		let chain = TestChain::new(1, 3);

		let head = chain.header_by_number(3).unwrap().clone();
		let head_hash = head.hash();
		let known = TestKnownBlocks::default();

		let expected_ancestry = (1..=3)
			.map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
			.rev()
			.collect::<Vec<_>>();

		let test_fut = Box::pin(async move {
			let ancestry =
				determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 0)
					.await
					.unwrap();

			assert_eq!(ancestry, expected_ancestry);
		});

		let aux_fut = Box::pin(async move {
			assert_matches!(
				handle.recv().await,
				AllMessages::ChainApi(ChainApiMessage::Ancestors {
					hash: h,
					k,
					response_channel: tx,
				}) => {
					assert_eq!(h, head_hash);
					assert_eq!(k, 2);

					let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
				}
			);

			for _ in 0_u8..2 {
				assert_matches!(
					handle.recv().await,
					AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
						let _ = tx.send(Ok(chain.header_by_hash(&h).cloned()));
					}
				);
			}
		});

		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
	}
}