1use crate::chain_head::{
22 chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
23 event::{
24 BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
25 RuntimeVersionEvent,
26 },
27 subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError},
28};
29use futures::{
30 channel::oneshot,
31 stream::{self, Stream, StreamExt, TryStreamExt},
32};
33use log::debug;
34use sc_client_api::{
35 Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
36};
37use sc_rpc::utils::Subscription;
38use schnellru::{ByLength, LruMap};
39use sp_api::CallApiAt;
40use sp_blockchain::{
41 Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
42};
43use sp_runtime::{
44 traits::{Block as BlockT, Header as HeaderT, NumberFor},
45 SaturatedConversion, Saturating,
46};
47use std::{
48 collections::{HashSet, VecDeque},
49 sync::Arc,
50};
/// Upper bound on the number of finalized block hashes that are remembered as
/// "recently finalized" and collected for the `Initialized` event when a
/// subscription starts.
const MAX_FINALIZED_BLOCKS: usize = 16;
54
/// Produces the `FollowEvent`s of a single follow subscription.
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
    /// Client used for headers, block numbers, runtime versions and the
    /// import/finality notification streams.
    client: Arc<Client>,
    /// Backend whose blockchain view is used for leaves and tree routes.
    backend: Arc<BE>,
    /// Handle used to pin blocks on behalf of this subscription.
    sub_handle: SubscriptionManagement<Block, BE>,
    /// Whether runtime version information is attached to the reported events.
    with_runtime: bool,
    /// Identifier of this subscription; passed to `pin_block` and printed in log lines.
    sub_id: String,
    /// Hash carried by the most recent `BestBlockChanged` event, if one was produced.
    current_best_block: Option<Block::Hash>,
    /// LRU set of hashes already reported as pruned, so they are not reported twice.
    pruned_blocks: LruMap<Block::Hash, ()>,
    /// Hashes that were already announced to the subscriber.
    announced_blocks: AnnouncedBlocks<Block>,
    /// Largest tolerated block-number distance between a leaf and the finalized
    /// block when the initial events are computed.
    max_lagging_distance: usize,
    /// Capacity of the bounded buffer used when piping events into the subscription sink.
    pub subscription_buffer_cap: usize,
}
79
/// Bounded record of the block hashes that were already announced.
struct AnnouncedBlocks<Block: BlockT> {
    /// Announced blocks that were inserted as not finalized.
    blocks: LruMap<Block::Hash, ()>,
    /// Announced blocks that were inserted as finalized.
    finalized: MostRecentFinalizedBlocks<Block>,
}
86
/// Newtype over an LRU map holding the hashes of recently finalized blocks.
///
/// Lookups go through `peek` rather than `get`.
// NOTE(review): presumably `peek` is used so that lookups do not refresh an entry's
// recency, making eviction follow insertion order — confirm against the `schnellru` docs.
struct MostRecentFinalizedBlocks<Block: BlockT>(LruMap<Block::Hash, ()>);
95
96impl<Block: BlockT> MostRecentFinalizedBlocks<Block> {
97 fn insert(&mut self, block: Block::Hash) {
99 self.0.insert(block, ());
100 }
101
102 fn contains(&mut self, block: &Block::Hash) -> Option<&()> {
104 self.0.peek(block)
105 }
106}
107
108impl<Block: BlockT> AnnouncedBlocks<Block> {
109 fn new() -> Self {
111 Self {
112 blocks: LruMap::new(ByLength::new((MAX_PINNED_BLOCKS - MAX_FINALIZED_BLOCKS) as u32)),
115 finalized: MostRecentFinalizedBlocks(LruMap::new(ByLength::new(
118 MAX_FINALIZED_BLOCKS as u32,
119 ))),
120 }
121 }
122
123 fn insert(&mut self, block: Block::Hash, finalized: bool) {
125 if finalized {
126 self.blocks.remove(&block);
131 self.finalized.insert(block);
132 } else {
133 self.blocks.insert(block, ());
134 }
135 }
136
137 fn was_announced(&mut self, block: &Block::Hash) -> bool {
139 self.blocks.get(block).is_some() || self.finalized.contains(block).is_some()
140 }
141}
142
143impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
144 pub fn new(
146 client: Arc<Client>,
147 backend: Arc<BE>,
148 sub_handle: SubscriptionManagement<Block, BE>,
149 with_runtime: bool,
150 sub_id: String,
151 max_lagging_distance: usize,
152 subscription_buffer_cap: usize,
153 ) -> Self {
154 Self {
155 client,
156 backend,
157 sub_handle,
158 with_runtime,
159 sub_id,
160 current_best_block: None,
161 pruned_blocks: LruMap::new(ByLength::new(
162 MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
163 )),
164 announced_blocks: AnnouncedBlocks::new(),
165 max_lagging_distance,
166 subscription_buffer_cap,
167 }
168 }
169}
170
/// Items flowing through the merged notification stream of a subscription.
enum NotificationType<Block: BlockT> {
    /// Events computed once at subscription start; forwarded unchanged.
    InitialEvents(Vec<FollowEvent<Block::Hash>>),
    /// A block import notification received from the client.
    NewBlock(BlockImportNotification<Block>),
    /// A finality notification received from the client.
    Finalized(FinalityNotification<Block>),
    /// An event received on the subscription's response channel; forwarded unchanged.
    MethodResponse(FollowEvent<Block::Hash>),
}
182
/// Blocks discovered while preparing the initial events of a subscription.
#[derive(Clone, Debug)]
struct InitialBlocks<Block: BlockT> {
    /// `(child, parent)` hash pairs for the blocks on routes from the finalized block to
    /// the leaves, in the order they were discovered (each pair appears once).
    finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
    /// The finalized block preceded by its pinned ancestors, oldest first; holds at most
    /// `MAX_FINALIZED_BLOCKS` hashes.
    finalized_block_hashes: VecDeque<Block::Hash>,
    /// Hashes on routes to leaves that required retracting blocks from the finalized
    /// block, i.e. that do not descend from it.
    pruned_forks: HashSet<Block::Hash>,
}
208
/// Snapshot of the chain info taken when the subscription starts.
struct StartupPoint<Block: BlockT> {
    /// Best block hash at startup.
    pub best_hash: Block::Hash,
    /// Finalized block hash at startup.
    pub finalized_hash: Block::Hash,
    /// Finalized block number at startup; notifications for lower numbers are ignored.
    pub finalized_number: NumberFor<Block>,
}
218
219impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
220 fn from(info: Info<Block>) -> Self {
221 StartupPoint::<Block> {
222 best_hash: info.best_hash,
223 finalized_hash: info.finalized_hash,
224 finalized_number: info.finalized_number,
225 }
226 }
227}
228
229impl<BE, Block, Client> ChainHeadFollower<BE, Block, Client>
230where
231 Block: BlockT + 'static,
232 BE: Backend<Block> + 'static,
233 Client: BlockBackend<Block>
234 + HeaderBackend<Block>
235 + HeaderMetadata<Block, Error = BlockChainError>
236 + BlockchainEvents<Block>
237 + CallApiAt<Block>
238 + 'static,
239{
240 fn generate_runtime_event(
242 &self,
243 block: Block::Hash,
244 parent: Option<Block::Hash>,
245 ) -> Option<RuntimeEvent> {
246 if !self.with_runtime {
248 return None
249 }
250
251 let block_rt = match self.client.runtime_version_at(block) {
252 Ok(rt) => rt,
253 Err(err) => return Some(err.into()),
254 };
255
256 let parent = match parent {
257 Some(parent) => parent,
258 None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() })),
260 };
261
262 let parent_rt = match self.client.runtime_version_at(parent) {
263 Ok(rt) => rt,
264 Err(err) => return Some(err.into()),
265 };
266
267 if block_rt != parent_rt {
269 Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() }))
270 } else {
271 None
272 }
273 }
274
275 fn distance_within_reason(
285 &self,
286 block: Block::Hash,
287 finalized: Block::Hash,
288 ) -> Result<(), SubscriptionManagementError> {
289 let Some(block_num) = self.client.number(block)? else {
290 return Err(SubscriptionManagementError::BlockHashAbsent)
291 };
292 let Some(finalized_num) = self.client.number(finalized)? else {
293 return Err(SubscriptionManagementError::BlockHashAbsent)
294 };
295
296 let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
297 if distance > self.max_lagging_distance {
298 return Err(SubscriptionManagementError::BlockDistanceTooLarge);
299 }
300
301 Ok(())
302 }
303
304 fn get_init_blocks_with_forks(
308 &self,
309 finalized: Block::Hash,
310 ) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
311 let blockchain = self.backend.blockchain();
312 let leaves = blockchain.leaves()?;
313 let mut pruned_forks = HashSet::new();
314 let mut finalized_block_descendants = Vec::new();
315 let mut unique_descendants = HashSet::new();
316
317 for leaf in &leaves {
320 self.distance_within_reason(*leaf, finalized)?;
321 }
322
323 for leaf in leaves {
324 let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
325
326 let blocks = tree_route.enacted().iter().map(|block| block.hash);
327 if !tree_route.retracted().is_empty() {
328 pruned_forks.extend(blocks);
329 } else {
330 let mut parent = finalized;
334 for child in blocks {
335 let pair = (child, parent);
336
337 if unique_descendants.insert(pair) {
338 self.sub_handle.pin_block(&self.sub_id, child)?;
340 finalized_block_descendants.push(pair);
341 }
342
343 parent = child;
344 }
345 }
346 }
347
348 let mut current_block = finalized;
349 let Some(header) = blockchain.header(current_block)? else {
351 return Err(SubscriptionManagementError::BlockHeaderAbsent);
352 };
353
354 let mut finalized_block_hashes = VecDeque::with_capacity(MAX_FINALIZED_BLOCKS);
356
357 self.sub_handle.pin_block(&self.sub_id, current_block)?;
359 finalized_block_hashes.push_front(current_block);
360 current_block = *header.parent_hash();
361
362 for _ in 0..MAX_FINALIZED_BLOCKS - 1 {
363 let Ok(Some(header)) = blockchain.header(current_block) else { break };
364 if self.sub_handle.pin_block(&self.sub_id, current_block).is_err() {
366 break
367 };
368
369 finalized_block_hashes.push_front(current_block);
370 current_block = *header.parent_hash();
371 }
372
373 Ok(InitialBlocks { finalized_block_descendants, finalized_block_hashes, pruned_forks })
374 }
375
376 fn generate_init_events(
380 &mut self,
381 startup_point: &StartupPoint<Block>,
382 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
383 let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
384
385 let initial_blocks = init.finalized_block_descendants;
387 let finalized_block_hashes = init.finalized_block_hashes;
388 for pruned in init.pruned_forks {
390 self.pruned_blocks.insert(pruned, ());
391 }
392
393 let finalized_block_hash = startup_point.finalized_hash;
394 let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
395
396 for finalized in &finalized_block_hashes {
397 self.announced_blocks.insert(*finalized, true);
398 }
399
400 let initialized_event = FollowEvent::Initialized(Initialized {
401 finalized_block_hashes: finalized_block_hashes.into(),
402 finalized_block_runtime,
403 with_runtime: self.with_runtime,
404 });
405
406 let mut finalized_block_descendants = Vec::with_capacity(initial_blocks.len() + 1);
407
408 finalized_block_descendants.push(initialized_event);
409 for (child, parent) in initial_blocks.into_iter() {
410 if !self.announced_blocks.was_announced(&parent) {
413 return Err(SubscriptionManagementError::BlockHeaderAbsent);
414 }
415 self.announced_blocks.insert(child, false);
416
417 let new_runtime = self.generate_runtime_event(child, Some(parent));
418
419 let event = FollowEvent::NewBlock(NewBlock {
420 block_hash: child,
421 parent_block_hash: parent,
422 new_runtime,
423 with_runtime: self.with_runtime,
424 });
425
426 finalized_block_descendants.push(event);
427 }
428
429 let best_block_hash = startup_point.best_hash;
431 if best_block_hash != finalized_block_hash {
432 if !self.announced_blocks.was_announced(&best_block_hash) {
433 return Err(SubscriptionManagementError::BlockHeaderAbsent);
434 }
435 self.announced_blocks.insert(best_block_hash, true);
436
437 let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
438 self.current_best_block = Some(best_block_hash);
439 finalized_block_descendants.push(best_block);
440 };
441
442 Ok(finalized_block_descendants)
443 }
444
445 fn generate_import_events(
448 &mut self,
449 block_hash: Block::Hash,
450 parent_block_hash: Block::Hash,
451 is_best_block: bool,
452 ) -> Vec<FollowEvent<Block::Hash>> {
453 let new_runtime = self.generate_runtime_event(block_hash, Some(parent_block_hash));
454
455 let new_block = FollowEvent::NewBlock(NewBlock {
456 block_hash,
457 parent_block_hash,
458 new_runtime,
459 with_runtime: self.with_runtime,
460 });
461
462 if !is_best_block {
463 return vec![new_block]
464 }
465
466 let best_block_event =
468 FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
469
470 match self.current_best_block {
471 Some(block_cache) => {
472 if block_cache != block_hash {
475 self.current_best_block = Some(block_hash);
476 vec![new_block, best_block_event]
477 } else {
478 vec![new_block]
479 }
480 },
481 None => {
482 self.current_best_block = Some(block_hash);
483 vec![new_block, best_block_event]
484 },
485 }
486 }
487
488 fn handle_import_blocks(
490 &mut self,
491 notification: BlockImportNotification<Block>,
492 startup_point: &StartupPoint<Block>,
493 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
494 let block_hash = notification.hash;
495
496 if *notification.header.number() < startup_point.finalized_number {
498 return Ok(Default::default())
499 }
500
501 if !self.sub_handle.pin_block(&self.sub_id, block_hash)? {
503 return Ok(Default::default())
513 }
514
515 if self.announced_blocks.was_announced(&block_hash) {
516 return Ok(Default::default())
518 }
519
520 let parent_block_hash = *notification.header.parent_hash();
522 if !self.announced_blocks.was_announced(&parent_block_hash) {
523 return Err(SubscriptionManagementError::Custom("Parent block was not reported".into()))
525 }
526
527 self.announced_blocks.insert(block_hash, false);
528 Ok(self.generate_import_events(block_hash, parent_block_hash, notification.is_new_best))
529 }
530
531 fn generate_finalized_events(
541 &mut self,
542 finalized_block_hashes: &[Block::Hash],
543 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
544 let mut events = Vec::new();
545
546 let Some(first_hash) = finalized_block_hashes.get(0) else { return Ok(Default::default()) };
548
549 let Some(first_header) = self.client.header(*first_hash)? else {
551 return Err(SubscriptionManagementError::BlockHeaderAbsent)
552 };
553
554 if !self.announced_blocks.was_announced(first_header.parent_hash()) {
555 return Err(SubscriptionManagementError::Custom(
556 "Parent block was not reported for a finalized block".into(),
557 ));
558 }
559
560 let parents =
561 std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
562 for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
563 self.sub_handle.pin_block(&self.sub_id, *hash)?;
565
566 if self.announced_blocks.was_announced(hash) {
568 continue;
569 }
570
571 let is_last = i + 1 == finalized_block_hashes.len();
573 if !is_last {
574 events.extend(self.generate_import_events(*hash, *parent, false));
576 self.announced_blocks.insert(*hash, true);
577 continue;
578 }
579
580 if let Some(best_block_hash) = self.current_best_block {
581 let ancestor =
582 sp_blockchain::lowest_common_ancestor(&*self.client, *hash, best_block_hash)?;
583
584 if ancestor.hash == *hash {
592 return Err(SubscriptionManagementError::Custom(
593 "A descendent of the finalized block was already reported".into(),
594 ))
595 }
596 }
597
598 events.extend(self.generate_import_events(*hash, *parent, true));
600 self.announced_blocks.insert(*hash, true);
601 }
602
603 Ok(events)
604 }
605
606 fn get_pruned_hashes(
608 &mut self,
609 stale_heads: &[Block::Hash],
610 last_finalized: Block::Hash,
611 ) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
612 let blockchain = self.backend.blockchain();
613 let mut pruned = Vec::new();
614
615 for stale_head in stale_heads {
616 let tree_route = sp_blockchain::tree_route(blockchain, last_finalized, *stale_head)?;
617
618 pruned.extend(tree_route.enacted().iter().filter_map(|block| {
620 if self.pruned_blocks.get(&block.hash).is_some() {
621 return None
623 }
624
625 self.pruned_blocks.insert(block.hash, ());
626 Some(block.hash)
627 }))
628 }
629
630 Ok(pruned)
631 }
632
633 fn handle_finalized_blocks(
638 &mut self,
639 notification: FinalityNotification<Block>,
640 startup_point: &StartupPoint<Block>,
641 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
642 let last_finalized = notification.hash;
643
644 if *notification.header.number() < startup_point.finalized_number {
646 return Ok(Default::default())
647 }
648
649 let mut finalized_block_hashes = notification.tree_route.to_vec();
652 finalized_block_hashes.push(last_finalized);
653
654 let mut events = self.generate_finalized_events(&finalized_block_hashes)?;
656
657 let pruned_block_hashes =
660 self.get_pruned_hashes(¬ification.stale_heads, last_finalized)?;
661
662 for finalized in &finalized_block_hashes {
663 self.announced_blocks.insert(*finalized, true);
664 }
665
666 let finalized_event = FollowEvent::Finalized(Finalized {
667 finalized_block_hashes,
668 pruned_block_hashes: pruned_block_hashes.clone(),
669 });
670
671 if let Some(current_best_block) = self.current_best_block {
672 let is_in_pruned_list =
674 pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
675 if is_in_pruned_list {
676 self.current_best_block = Some(last_finalized);
677 events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
678 best_block_hash: last_finalized,
679 }));
680 } else {
681 let ancestor = sp_blockchain::lowest_common_ancestor(
688 &*self.client,
689 last_finalized,
690 current_best_block,
691 )?;
692 let is_descendant = ancestor.hash == last_finalized;
693 if !is_descendant {
694 self.current_best_block = Some(last_finalized);
695 events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
696 best_block_hash: last_finalized,
697 }));
698 }
699 }
700 }
701
702 events.push(finalized_event);
703 Ok(events)
704 }
705
    /// Converts every notification of `stream` into follow events and pipes them into
    /// `sink` until the stream handling finishes or `rx_stop` resolves.
    ///
    /// A `Stop` event is sent to the sink before returning, whatever the outcome.
    ///
    /// # Errors
    ///
    /// Returns the result of piping the event stream into the sink; `Ok(())` when
    /// `rx_stop` resolves first.
    async fn submit_events<EventStream>(
        &mut self,
        startup_point: &StartupPoint<Block>,
        stream: EventStream,
        sink: Subscription,
        rx_stop: oneshot::Receiver<()>,
    ) -> Result<(), SubscriptionManagementError>
    where
        EventStream: Stream<Item = NotificationType<Block>> + Unpin + Send,
    {
        let buffer_cap = self.subscription_buffer_cap;
        // Maps one notification to the (possibly empty) list of events it produces.
        let mut handle_events = |event| match event {
            NotificationType::InitialEvents(events) => Ok(events),
            NotificationType::NewBlock(notification) =>
                self.handle_import_blocks(notification, &startup_point),
            NotificationType::Finalized(notification) =>
                self.handle_finalized_blocks(notification, &startup_point),
            NotificationType::MethodResponse(notification) => Ok(vec![notification]),
        };

        // Flatten the per-notification event lists into a single fallible event stream.
        let stream = stream
            .map(|event| handle_events(event))
            .map_ok(|items| stream::iter(items).map(Ok))
            .try_flatten();

        tokio::pin!(stream);

        // The sink is fed through a bounded buffer of `buffer_cap` entries.
        let sink_future =
            sink.pipe_from_try_stream(stream, sc_rpc::utils::BoundedVecDeque::new(buffer_cap));

        let result = tokio::select! {
            // A stop signal ends the subscription without an error.
            _ = rx_stop => Ok(()),
            result = sink_future => {
                if let Err(ref e) = result {
                    debug!(
                        target: LOG_TARGET,
                        "[follow][id={:?}] Failed to handle stream notification {:?}",
                        &self.sub_id,
                        e
                    );
                };
                result
            }
        };
        // The outcome of sending the final `Stop` event is deliberately ignored.
        let _ = sink.send(&FollowEvent::<String>::Stop).await;
        result
    }
756
757 pub async fn generate_events(
759 &mut self,
760 sink: Subscription,
761 sub_data: InsertedSubscriptionData<Block>,
762 ) -> Result<(), SubscriptionManagementError> {
763 let stream_import = self
765 .client
766 .import_notification_stream()
767 .map(|notification| NotificationType::NewBlock(notification));
768
769 let stream_finalized = self
770 .client
771 .finality_notification_stream()
772 .map(|notification| NotificationType::Finalized(notification));
773
774 let stream_responses = sub_data
775 .response_receiver
776 .map(|response| NotificationType::MethodResponse(response));
777
778 let startup_point = StartupPoint::from(self.client.info());
779 let initial_events = match self.generate_init_events(&startup_point) {
780 Ok(blocks) => blocks,
781 Err(err) => {
782 debug!(
783 target: LOG_TARGET,
784 "[follow][id={:?}] Failed to generate the initial events {:?}",
785 self.sub_id,
786 err
787 );
788 let _ = sink.send(&FollowEvent::<String>::Stop).await;
789 return Err(err)
790 },
791 };
792
793 let initial = NotificationType::InitialEvents(initial_events);
794 let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
795 let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
796 let stream = stream::once(futures::future::ready(initial)).chain(merged);
797
798 self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
799 }
800}