1use crate::chain_head::{
22 chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
23 event::{
24 BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
25 RuntimeVersionEvent,
26 },
27 subscription::{SubscriptionManagement, SubscriptionManagementError},
28};
29use futures::{
30 channel::oneshot,
31 stream::{self, Stream, StreamExt},
32};
33use futures_util::future::Either;
34use log::debug;
35use sc_client_api::{
36 Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
37};
38use sc_rpc::utils::Subscription;
39use schnellru::{ByLength, LruMap};
40use sp_api::CallApiAt;
41use sp_blockchain::{
42 Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
43};
44use sp_runtime::{
45 traits::{Block as BlockT, Header as HeaderT, NumberFor},
46 SaturatedConversion, Saturating,
47};
48use std::{
49 collections::{HashSet, VecDeque},
50 sync::Arc,
51};
52const MAX_FINALIZED_BLOCKS: usize = 16;
55
56use super::subscription::InsertedSubscriptionData;
57
58pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
60 client: Arc<Client>,
62 backend: Arc<BE>,
64 sub_handle: SubscriptionManagement<Block, BE>,
66 with_runtime: bool,
68 sub_id: String,
70 current_best_block: Option<Block::Hash>,
72 pruned_blocks: LruMap<Block::Hash, ()>,
74 max_lagging_distance: usize,
77}
78
79impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
80 pub fn new(
82 client: Arc<Client>,
83 backend: Arc<BE>,
84 sub_handle: SubscriptionManagement<Block, BE>,
85 with_runtime: bool,
86 sub_id: String,
87 max_lagging_distance: usize,
88 ) -> Self {
89 Self {
90 client,
91 backend,
92 sub_handle,
93 with_runtime,
94 sub_id,
95 current_best_block: None,
96 pruned_blocks: LruMap::new(ByLength::new(
97 MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
98 )),
99 max_lagging_distance,
100 }
101 }
102}
103
104enum NotificationType<Block: BlockT> {
106 InitialEvents(Vec<FollowEvent<Block::Hash>>),
108 NewBlock(BlockImportNotification<Block>),
110 Finalized(FinalityNotification<Block>),
112 MethodResponse(FollowEvent<Block::Hash>),
114}
115
116#[derive(Clone, Debug)]
118struct InitialBlocks<Block: BlockT> {
119 finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
124 finalized_block_hashes: VecDeque<Block::Hash>,
126 pruned_forks: HashSet<Block::Hash>,
140}
141
142struct StartupPoint<Block: BlockT> {
144 pub best_hash: Block::Hash,
146 pub finalized_hash: Block::Hash,
148 pub finalized_number: NumberFor<Block>,
150}
151
152impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
153 fn from(info: Info<Block>) -> Self {
154 StartupPoint::<Block> {
155 best_hash: info.best_hash,
156 finalized_hash: info.finalized_hash,
157 finalized_number: info.finalized_number,
158 }
159 }
160}
161
162impl<BE, Block, Client> ChainHeadFollower<BE, Block, Client>
163where
164 Block: BlockT + 'static,
165 BE: Backend<Block> + 'static,
166 Client: BlockBackend<Block>
167 + HeaderBackend<Block>
168 + HeaderMetadata<Block, Error = BlockChainError>
169 + BlockchainEvents<Block>
170 + CallApiAt<Block>
171 + 'static,
172{
173 fn generate_runtime_event(
175 &self,
176 block: Block::Hash,
177 parent: Option<Block::Hash>,
178 ) -> Option<RuntimeEvent> {
179 if !self.with_runtime {
181 return None
182 }
183
184 let block_rt = match self.client.runtime_version_at(block) {
185 Ok(rt) => rt,
186 Err(err) => return Some(err.into()),
187 };
188
189 let parent = match parent {
190 Some(parent) => parent,
191 None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() })),
193 };
194
195 let parent_rt = match self.client.runtime_version_at(parent) {
196 Ok(rt) => rt,
197 Err(err) => return Some(err.into()),
198 };
199
200 if block_rt != parent_rt {
202 Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() }))
203 } else {
204 None
205 }
206 }
207
208 fn distace_within_reason(
218 &self,
219 block: Block::Hash,
220 finalized: Block::Hash,
221 ) -> Result<(), SubscriptionManagementError> {
222 let Some(block_num) = self.client.number(block)? else {
223 return Err(SubscriptionManagementError::BlockHashAbsent)
224 };
225 let Some(finalized_num) = self.client.number(finalized)? else {
226 return Err(SubscriptionManagementError::BlockHashAbsent)
227 };
228
229 let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
230 if distance > self.max_lagging_distance {
231 return Err(SubscriptionManagementError::BlockDistanceTooLarge);
232 }
233
234 Ok(())
235 }
236
237 fn get_init_blocks_with_forks(
241 &self,
242 finalized: Block::Hash,
243 ) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
244 let blockchain = self.backend.blockchain();
245 let leaves = blockchain.leaves()?;
246 let mut pruned_forks = HashSet::new();
247 let mut finalized_block_descendants = Vec::new();
248 let mut unique_descendants = HashSet::new();
249
250 for leaf in &leaves {
253 self.distace_within_reason(*leaf, finalized)?;
254 }
255
256 for leaf in leaves {
257 let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
258
259 let blocks = tree_route.enacted().iter().map(|block| block.hash);
260 if !tree_route.retracted().is_empty() {
261 pruned_forks.extend(blocks);
262 } else {
263 let mut parent = finalized;
267 for child in blocks {
268 let pair = (child, parent);
269
270 if unique_descendants.insert(pair) {
271 self.sub_handle.pin_block(&self.sub_id, child)?;
273 finalized_block_descendants.push(pair);
274 }
275
276 parent = child;
277 }
278 }
279 }
280
281 let mut current_block = finalized;
282 let Some(header) = blockchain.header(current_block)? else {
284 return Err(SubscriptionManagementError::BlockHeaderAbsent);
285 };
286
287 let mut finalized_block_hashes = VecDeque::with_capacity(MAX_FINALIZED_BLOCKS);
289
290 self.sub_handle.pin_block(&self.sub_id, current_block)?;
292 finalized_block_hashes.push_front(current_block);
293 current_block = *header.parent_hash();
294
295 for _ in 0..MAX_FINALIZED_BLOCKS - 1 {
296 let Ok(Some(header)) = blockchain.header(current_block) else { break };
297 if self.sub_handle.pin_block(&self.sub_id, current_block).is_err() {
299 break
300 };
301
302 finalized_block_hashes.push_front(current_block);
303 current_block = *header.parent_hash();
304 }
305
306 Ok(InitialBlocks { finalized_block_descendants, finalized_block_hashes, pruned_forks })
307 }
308
309 fn generate_init_events(
313 &mut self,
314 startup_point: &StartupPoint<Block>,
315 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
316 let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
317
318 let initial_blocks = init.finalized_block_descendants;
320 let finalized_block_hashes = init.finalized_block_hashes;
321 for pruned in init.pruned_forks {
323 self.pruned_blocks.insert(pruned, ());
324 }
325
326 let finalized_block_hash = startup_point.finalized_hash;
327 let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
328
329 let initialized_event = FollowEvent::Initialized(Initialized {
330 finalized_block_hashes: finalized_block_hashes.into(),
331 finalized_block_runtime,
332 with_runtime: self.with_runtime,
333 });
334
335 let mut finalized_block_descendants = Vec::with_capacity(initial_blocks.len() + 1);
336
337 finalized_block_descendants.push(initialized_event);
338 for (child, parent) in initial_blocks.into_iter() {
339 let new_runtime = self.generate_runtime_event(child, Some(parent));
340
341 let event = FollowEvent::NewBlock(NewBlock {
342 block_hash: child,
343 parent_block_hash: parent,
344 new_runtime,
345 with_runtime: self.with_runtime,
346 });
347
348 finalized_block_descendants.push(event);
349 }
350
351 let best_block_hash = startup_point.best_hash;
353 if best_block_hash != finalized_block_hash {
354 let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
355 self.current_best_block = Some(best_block_hash);
356 finalized_block_descendants.push(best_block);
357 };
358
359 Ok(finalized_block_descendants)
360 }
361
362 fn generate_import_events(
365 &mut self,
366 block_hash: Block::Hash,
367 parent_block_hash: Block::Hash,
368 is_best_block: bool,
369 ) -> Vec<FollowEvent<Block::Hash>> {
370 let new_runtime = self.generate_runtime_event(block_hash, Some(parent_block_hash));
371
372 let new_block = FollowEvent::NewBlock(NewBlock {
373 block_hash,
374 parent_block_hash,
375 new_runtime,
376 with_runtime: self.with_runtime,
377 });
378
379 if !is_best_block {
380 return vec![new_block]
381 }
382
383 let best_block_event =
385 FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
386
387 match self.current_best_block {
388 Some(block_cache) => {
389 if block_cache != block_hash {
392 self.current_best_block = Some(block_hash);
393 vec![new_block, best_block_event]
394 } else {
395 vec![new_block]
396 }
397 },
398 None => {
399 self.current_best_block = Some(block_hash);
400 vec![new_block, best_block_event]
401 },
402 }
403 }
404
405 fn handle_import_blocks(
407 &mut self,
408 notification: BlockImportNotification<Block>,
409 startup_point: &StartupPoint<Block>,
410 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
411 if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? {
413 return Ok(Default::default())
414 }
415
416 if *notification.header.number() < startup_point.finalized_number {
418 return Ok(Default::default())
419 }
420
421 Ok(self.generate_import_events(
422 notification.hash,
423 *notification.header.parent_hash(),
424 notification.is_new_best,
425 ))
426 }
427
428 fn generate_finalized_events(
438 &mut self,
439 finalized_block_hashes: &[Block::Hash],
440 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
441 let mut events = Vec::new();
442
443 let Some(first_hash) = finalized_block_hashes.get(0) else { return Ok(Default::default()) };
445
446 let Some(first_header) = self.client.header(*first_hash)? else {
448 return Err(SubscriptionManagementError::BlockHeaderAbsent)
449 };
450
451 let parents =
452 std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
453 for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
454 if !self.sub_handle.pin_block(&self.sub_id, *hash)? {
456 continue
457 }
458
459 let is_last = i + 1 == finalized_block_hashes.len();
461 if !is_last {
462 events.extend(self.generate_import_events(*hash, *parent, false));
464 continue;
465 }
466
467 if let Some(best_block_hash) = self.current_best_block {
468 let ancestor =
469 sp_blockchain::lowest_common_ancestor(&*self.client, *hash, best_block_hash)?;
470
471 if ancestor.hash == *hash {
479 return Err(SubscriptionManagementError::Custom(
480 "A descendent of the finalized block was already reported".into(),
481 ))
482 }
483 }
484
485 events.extend(self.generate_import_events(*hash, *parent, true))
487 }
488
489 Ok(events)
490 }
491
492 fn get_pruned_hashes(
494 &mut self,
495 stale_heads: &[Block::Hash],
496 last_finalized: Block::Hash,
497 ) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
498 let blockchain = self.backend.blockchain();
499 let mut pruned = Vec::new();
500
501 for stale_head in stale_heads {
502 let tree_route = sp_blockchain::tree_route(blockchain, last_finalized, *stale_head)?;
503
504 pruned.extend(tree_route.enacted().iter().filter_map(|block| {
506 if self.pruned_blocks.get(&block.hash).is_some() {
507 return None
509 }
510
511 self.pruned_blocks.insert(block.hash, ());
512 Some(block.hash)
513 }))
514 }
515
516 Ok(pruned)
517 }
518
519 fn handle_finalized_blocks(
524 &mut self,
525 notification: FinalityNotification<Block>,
526 startup_point: &StartupPoint<Block>,
527 ) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
528 let last_finalized = notification.hash;
529
530 if *notification.header.number() < startup_point.finalized_number {
532 return Ok(Default::default())
533 }
534
535 let mut finalized_block_hashes = notification.tree_route.to_vec();
538 finalized_block_hashes.push(last_finalized);
539
540 let mut events = self.generate_finalized_events(&finalized_block_hashes)?;
542
543 let pruned_block_hashes =
546 self.get_pruned_hashes(¬ification.stale_heads, last_finalized)?;
547
548 let finalized_event = FollowEvent::Finalized(Finalized {
549 finalized_block_hashes,
550 pruned_block_hashes: pruned_block_hashes.clone(),
551 });
552
553 if let Some(current_best_block) = self.current_best_block {
554 let is_in_pruned_list =
556 pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
557 if is_in_pruned_list {
558 self.current_best_block = Some(last_finalized);
559 events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
560 best_block_hash: last_finalized,
561 }));
562 } else {
563 let ancestor = sp_blockchain::lowest_common_ancestor(
570 &*self.client,
571 last_finalized,
572 current_best_block,
573 )?;
574 let is_descendant = ancestor.hash == last_finalized;
575 if !is_descendant {
576 self.current_best_block = Some(last_finalized);
577 events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
578 best_block_hash: last_finalized,
579 }));
580 }
581 }
582 }
583
584 events.push(finalized_event);
585 Ok(events)
586 }
587
588 async fn submit_events<EventStream>(
591 &mut self,
592 startup_point: &StartupPoint<Block>,
593 mut stream: EventStream,
594 sink: Subscription,
595 rx_stop: oneshot::Receiver<()>,
596 ) -> Result<(), SubscriptionManagementError>
597 where
598 EventStream: Stream<Item = NotificationType<Block>> + Unpin,
599 {
600 let mut stream_item = stream.next();
601
602 let connection_closed = sink.closed();
605 tokio::pin!(connection_closed);
606 let mut stop_event = futures_util::future::select(rx_stop, connection_closed);
607
608 while let Either::Left((Some(event), next_stop_event)) =
609 futures_util::future::select(stream_item, stop_event).await
610 {
611 let events = match event {
612 NotificationType::InitialEvents(events) => Ok(events),
613 NotificationType::NewBlock(notification) =>
614 self.handle_import_blocks(notification, &startup_point),
615 NotificationType::Finalized(notification) =>
616 self.handle_finalized_blocks(notification, &startup_point),
617 NotificationType::MethodResponse(notification) => Ok(vec![notification]),
618 };
619
620 let events = match events {
621 Ok(events) => events,
622 Err(err) => {
623 debug!(
624 target: LOG_TARGET,
625 "[follow][id={:?}] Failed to handle stream notification {:?}",
626 self.sub_id,
627 err
628 );
629 _ = sink.send(&FollowEvent::<String>::Stop).await;
630 return Err(err)
631 },
632 };
633
634 for event in events {
635 if let Err(err) = sink.send(&event).await {
636 debug!(
638 target: LOG_TARGET,
639 "[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
640 );
641
642 let _ = sink.send(&FollowEvent::<String>::Stop).await;
643 return Ok(())
645 }
646 }
647
648 stream_item = stream.next();
649 stop_event = next_stop_event;
650 }
651
652 let _ = sink.send(&FollowEvent::<String>::Stop).await;
657 Ok(())
658 }
659
660 pub async fn generate_events(
662 &mut self,
663 sink: Subscription,
664 sub_data: InsertedSubscriptionData<Block>,
665 ) -> Result<(), SubscriptionManagementError> {
666 let stream_import = self
668 .client
669 .import_notification_stream()
670 .map(|notification| NotificationType::NewBlock(notification));
671
672 let stream_finalized = self
673 .client
674 .finality_notification_stream()
675 .map(|notification| NotificationType::Finalized(notification));
676
677 let stream_responses = sub_data
678 .response_receiver
679 .map(|response| NotificationType::MethodResponse(response));
680
681 let startup_point = StartupPoint::from(self.client.info());
682 let initial_events = match self.generate_init_events(&startup_point) {
683 Ok(blocks) => blocks,
684 Err(err) => {
685 debug!(
686 target: LOG_TARGET,
687 "[follow][id={:?}] Failed to generate the initial events {:?}",
688 self.sub_id,
689 err
690 );
691 let _ = sink.send(&FollowEvent::<String>::Stop).await;
692 return Err(err)
693 },
694 };
695
696 let initial = NotificationType::InitialEvents(initial_events);
697 let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
698 let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
699 let stream = stream::once(futures::future::ready(initial)).chain(merged);
700
701 self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
702 }
703}