1use futures::{channel::oneshot, prelude::*};
20use polkadot_node_subsystem::{messages::ChainApiMessage, SubsystemSender};
21use polkadot_primitives::{BlockNumber, Hash, Header};
22
23pub async fn determine_new_blocks<E, Sender>(
35 sender: &mut Sender,
36 is_known: impl Fn(&Hash) -> Result<bool, E>,
37 head: Hash,
38 header: &Header,
39 lower_bound_number: BlockNumber,
40) -> Result<Vec<(Hash, Header)>, E>
41where
42 Sender: SubsystemSender<ChainApiMessage>,
43{
44 const ANCESTRY_STEP: usize = 4;
45
46 let min_block_needed = lower_bound_number + 1;
47
48 {
50 let already_known = is_known(&head)?;
51
52 let before_relevant = header.number < min_block_needed;
53
54 if already_known || before_relevant {
55 return Ok(Vec::new())
56 }
57 }
58
59 let mut ancestry = vec![(head, header.clone())];
60
61 if is_known(&header.parent_hash)? || header.number == min_block_needed {
64 return Ok(ancestry)
65 }
66
67 'outer: loop {
68 let (last_hash, last_header) = ancestry
69 .last()
70 .expect("ancestry has length 1 at initialization and is only added to; qed");
71
72 assert!(
73 last_header.number > min_block_needed,
74 "Loop invariant: the last block in ancestry is checked to be \
75 above the minimum before the loop, and at the end of each iteration; \
76 qed"
77 );
78
79 let (tx, rx) = oneshot::channel();
80
81 let ancestry_step =
84 std::cmp::min(ANCESTRY_STEP, (last_header.number - min_block_needed) as usize);
85
86 let batch_hashes = if ancestry_step == 1 {
87 vec![last_header.parent_hash]
88 } else {
89 sender
90 .send_message(
91 ChainApiMessage::Ancestors {
92 hash: *last_hash,
93 k: ancestry_step,
94 response_channel: tx,
95 }
96 .into(),
97 )
98 .await;
99
100 match rx.await {
102 Err(_) | Ok(Err(_)) => break 'outer,
103 Ok(Ok(ancestors)) => ancestors,
104 }
105 };
106
107 let batch_headers = {
108 let (batch_senders, batch_receivers) = (0..batch_hashes.len())
109 .map(|_| oneshot::channel())
110 .unzip::<_, _, Vec<_>, Vec<_>>();
111
112 for (hash, batched_sender) in batch_hashes.iter().cloned().zip(batch_senders) {
113 sender
114 .send_message(ChainApiMessage::BlockHeader(hash, batched_sender).into())
115 .await;
116 }
117
118 let mut requests = futures::stream::FuturesOrdered::new();
119 batch_receivers
120 .into_iter()
121 .map(|rx| async move {
122 match rx.await {
123 Err(_) | Ok(Err(_)) => None,
124 Ok(Ok(h)) => h,
125 }
126 })
127 .for_each(|x| requests.push_back(x));
128
129 let batch_headers: Vec<_> =
130 requests.flat_map(|x: Option<Header>| stream::iter(x)).collect().await;
131
132 if batch_headers.len() != batch_hashes.len() {
136 break 'outer
137 }
138 batch_headers
139 };
140
141 for (hash, header) in batch_hashes.into_iter().zip(batch_headers) {
142 let is_known = is_known(&hash)?;
143
144 let is_relevant = header.number >= min_block_needed;
145 let is_terminating = header.number == min_block_needed;
146
147 if is_known || !is_relevant {
148 break 'outer
149 }
150
151 ancestry.push((hash, header));
152
153 if is_terminating {
154 break 'outer
155 }
156 }
157 }
158
159 Ok(ancestry)
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use assert_matches::assert_matches;
166 use polkadot_node_subsystem_test_helpers::make_subsystem_context;
167 use polkadot_overseer::{AllMessages, SubsystemContext};
168 use sp_core::testing::TaskExecutor;
169 use std::collections::{HashMap, HashSet};
170
171 #[derive(Default)]
172 struct TestKnownBlocks {
173 blocks: HashSet<Hash>,
174 }
175
176 impl TestKnownBlocks {
177 fn insert(&mut self, hash: Hash) {
178 self.blocks.insert(hash);
179 }
180
181 fn is_known(&self, hash: &Hash) -> Result<bool, ()> {
182 Ok(self.blocks.contains(hash))
183 }
184 }
185
186 #[derive(Clone)]
187 struct TestChain {
188 start_number: BlockNumber,
189 headers: Vec<Header>,
190 numbers: HashMap<Hash, BlockNumber>,
191 }
192
193 impl TestChain {
194 fn new(start: BlockNumber, len: usize) -> Self {
195 assert!(len > 0, "len must be at least 1");
196
197 let base = Header {
198 digest: Default::default(),
199 extrinsics_root: Default::default(),
200 number: start,
201 state_root: Default::default(),
202 parent_hash: Default::default(),
203 };
204
205 let base_hash = base.hash();
206
207 let mut chain = TestChain {
208 start_number: start,
209 headers: vec![base],
210 numbers: vec![(base_hash, start)].into_iter().collect(),
211 };
212
213 for _ in 1..len {
214 chain.grow()
215 }
216
217 chain
218 }
219
220 fn grow(&mut self) {
221 let next = {
222 let last = self.headers.last().unwrap();
223 Header {
224 digest: Default::default(),
225 extrinsics_root: Default::default(),
226 number: last.number + 1,
227 state_root: Default::default(),
228 parent_hash: last.hash(),
229 }
230 };
231
232 self.numbers.insert(next.hash(), next.number);
233 self.headers.push(next);
234 }
235
236 fn header_by_number(&self, number: BlockNumber) -> Option<&Header> {
237 if number < self.start_number {
238 None
239 } else {
240 self.headers.get((number - self.start_number) as usize)
241 }
242 }
243
244 fn header_by_hash(&self, hash: &Hash) -> Option<&Header> {
245 self.numbers.get(hash).and_then(|n| self.header_by_number(*n))
246 }
247
248 fn hash_by_number(&self, number: BlockNumber) -> Option<Hash> {
249 self.header_by_number(number).map(|h| h.hash())
250 }
251
252 fn ancestry(&self, hash: &Hash, k: BlockNumber) -> Vec<Hash> {
253 let n = match self.numbers.get(hash) {
254 None => return Vec::new(),
255 Some(&n) => n,
256 };
257
258 (0..k)
259 .map(|i| i + 1)
260 .filter_map(|i| self.header_by_number(n - i))
261 .map(|h| h.hash())
262 .collect()
263 }
264 }
265
266 #[test]
267 fn determine_new_blocks_back_to_lower_bound() {
268 let pool = TaskExecutor::new();
269 let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
270
271 let known = TestKnownBlocks::default();
272
273 let chain = TestChain::new(10, 9);
274
275 let head = chain.header_by_number(18).unwrap().clone();
276 let head_hash = head.hash();
277 let lower_bound_number = 12;
278
279 let expected_ancestry = (13..=18)
282 .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
283 .rev()
284 .collect::<Vec<_>>();
285
286 let test_fut = Box::pin(async move {
287 let ancestry = determine_new_blocks(
288 ctx.sender(),
289 |h| known.is_known(h),
290 head_hash,
291 &head,
292 lower_bound_number,
293 )
294 .await
295 .unwrap();
296
297 assert_eq!(ancestry, expected_ancestry);
298 });
299
300 let aux_fut = Box::pin(async move {
301 assert_matches!(
302 handle.recv().await,
303 AllMessages::ChainApi(ChainApiMessage::Ancestors {
304 hash: h,
305 k,
306 response_channel: tx,
307 }) => {
308 assert_eq!(h, head_hash);
309 assert_eq!(k, 4);
310 let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
311 }
312 );
313
314 for _ in 0u32..4 {
315 assert_matches!(
316 handle.recv().await,
317 AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
318 let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
319 }
320 );
321 }
322
323 assert_matches!(
324 handle.recv().await,
325 AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
326 assert_eq!(h, chain.hash_by_number(13).unwrap());
327 let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
328 }
329 );
330 });
331
332 futures::executor::block_on(futures::future::join(test_fut, aux_fut));
333 }
334
335 #[test]
336 fn determine_new_blocks_back_to_known() {
337 let pool = TaskExecutor::new();
338 let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
339
340 let mut known = TestKnownBlocks::default();
341
342 let chain = TestChain::new(10, 9);
343
344 let head = chain.header_by_number(18).unwrap().clone();
345 let head_hash = head.hash();
346 let lower_bound_number = 12;
347 let known_number = 15;
348 let known_hash = chain.hash_by_number(known_number).unwrap();
349
350 known.insert(known_hash);
351
352 let expected_ancestry = (16..=18)
355 .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
356 .rev()
357 .collect::<Vec<_>>();
358
359 let test_fut = Box::pin(async move {
360 let ancestry = determine_new_blocks(
361 ctx.sender(),
362 |h| known.is_known(h),
363 head_hash,
364 &head,
365 lower_bound_number,
366 )
367 .await
368 .unwrap();
369
370 assert_eq!(ancestry, expected_ancestry);
371 });
372
373 let aux_fut = Box::pin(async move {
374 assert_matches!(
375 handle.recv().await,
376 AllMessages::ChainApi(ChainApiMessage::Ancestors {
377 hash: h,
378 k,
379 response_channel: tx,
380 }) => {
381 assert_eq!(h, head_hash);
382 assert_eq!(k, 4);
383 let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
384 }
385 );
386
387 for _ in 0u32..4 {
388 assert_matches!(
389 handle.recv().await,
390 AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
391 let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
392 }
393 );
394 }
395 });
396
397 futures::executor::block_on(futures::future::join(test_fut, aux_fut));
398 }
399
400 #[test]
401 fn determine_new_blocks_already_known_is_empty() {
402 let pool = TaskExecutor::new();
403 let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone());
404
405 let mut known = TestKnownBlocks::default();
406
407 let chain = TestChain::new(10, 9);
408
409 let head = chain.header_by_number(18).unwrap().clone();
410 let head_hash = head.hash();
411 let lower_bound_number = 0;
412
413 known.insert(head_hash);
414
415 let expected_ancestry = Vec::new();
417
418 let test_fut = Box::pin(async move {
419 let ancestry = determine_new_blocks(
420 ctx.sender(),
421 |h| known.is_known(h),
422 head_hash,
423 &head,
424 lower_bound_number,
425 )
426 .await
427 .unwrap();
428
429 assert_eq!(ancestry, expected_ancestry);
430 });
431
432 futures::executor::block_on(test_fut);
433 }
434
435 #[test]
436 fn determine_new_blocks_parent_known_is_fast() {
437 let pool = TaskExecutor::new();
438 let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone());
439
440 let mut known = TestKnownBlocks::default();
441
442 let chain = TestChain::new(10, 9);
443
444 let head = chain.header_by_number(18).unwrap().clone();
445 let head_hash = head.hash();
446 let lower_bound_number = 0;
447 let parent_hash = chain.hash_by_number(17).unwrap();
448
449 known.insert(parent_hash);
450
451 let expected_ancestry = vec![(head_hash, head.clone())];
453
454 let test_fut = Box::pin(async move {
455 let ancestry = determine_new_blocks(
456 ctx.sender(),
457 |h| known.is_known(h),
458 head_hash,
459 &head,
460 lower_bound_number,
461 )
462 .await
463 .unwrap();
464
465 assert_eq!(ancestry, expected_ancestry);
466 });
467
468 futures::executor::block_on(test_fut);
469 }
470
471 #[test]
472 fn determine_new_block_before_finality_is_empty() {
473 let pool = TaskExecutor::new();
474 let (mut ctx, _handle) = make_subsystem_context::<(), _>(pool.clone());
475
476 let chain = TestChain::new(10, 9);
477
478 let head = chain.header_by_number(18).unwrap().clone();
479 let head_hash = head.hash();
480 let parent_hash = chain.hash_by_number(17).unwrap();
481 let mut known = TestKnownBlocks::default();
482
483 known.insert(parent_hash);
484
485 let test_fut = Box::pin(async move {
486 let after_finality =
487 determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 17)
488 .await
489 .unwrap();
490
491 let at_finality =
492 determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 18)
493 .await
494 .unwrap();
495
496 let before_finality =
497 determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 19)
498 .await
499 .unwrap();
500
501 assert_eq!(after_finality, vec![(head_hash, head.clone())]);
502
503 assert_eq!(at_finality, Vec::new());
504
505 assert_eq!(before_finality, Vec::new());
506 });
507
508 futures::executor::block_on(test_fut);
509 }
510
511 #[test]
512 fn determine_new_blocks_does_not_request_genesis() {
513 let pool = TaskExecutor::new();
514 let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
515
516 let chain = TestChain::new(1, 2);
517
518 let head = chain.header_by_number(2).unwrap().clone();
519 let head_hash = head.hash();
520 let known = TestKnownBlocks::default();
521
522 let expected_ancestry = (1..=2)
523 .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
524 .rev()
525 .collect::<Vec<_>>();
526
527 let test_fut = Box::pin(async move {
528 let ancestry =
529 determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 0)
530 .await
531 .unwrap();
532
533 assert_eq!(ancestry, expected_ancestry);
534 });
535
536 let aux_fut = Box::pin(async move {
537 assert_matches!(
538 handle.recv().await,
539 AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
540 assert_eq!(h, chain.hash_by_number(1).unwrap());
541 let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
542 }
543 );
544 });
545
546 futures::executor::block_on(futures::future::join(test_fut, aux_fut));
547 }
548
549 #[test]
550 fn determine_new_blocks_does_not_request_genesis_even_in_multi_ancestry() {
551 let pool = TaskExecutor::new();
552 let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
553
554 let chain = TestChain::new(1, 3);
555
556 let head = chain.header_by_number(3).unwrap().clone();
557 let head_hash = head.hash();
558 let known = TestKnownBlocks::default();
559
560 let expected_ancestry = (1..=3)
561 .map(|n| chain.header_by_number(n).map(|h| (h.hash(), h.clone())).unwrap())
562 .rev()
563 .collect::<Vec<_>>();
564
565 let test_fut = Box::pin(async move {
566 let ancestry =
567 determine_new_blocks(ctx.sender(), |h| known.is_known(h), head_hash, &head, 0)
568 .await
569 .unwrap();
570
571 assert_eq!(ancestry, expected_ancestry);
572 });
573
574 let aux_fut = Box::pin(async move {
575 assert_matches!(
576 handle.recv().await,
577 AllMessages::ChainApi(ChainApiMessage::Ancestors {
578 hash: h,
579 k,
580 response_channel: tx,
581 }) => {
582 assert_eq!(h, head_hash);
583 assert_eq!(k, 2);
584
585 let _ = tx.send(Ok(chain.ancestry(&h, k as _)));
586 }
587 );
588
589 for _ in 0_u8..2 {
590 assert_matches!(
591 handle.recv().await,
592 AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
593 let _ = tx.send(Ok(chain.header_by_hash(&h).map(|h| h.clone())));
594 }
595 );
596 }
597 });
598
599 futures::executor::block_on(futures::future::join(test_fut, aux_fut));
600 }
601}