referrerpolicy=no-referrer-when-downgrade

sc_rpc_spec_v2/chain_head/
chain_head_follow.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Implementation of the `chainHead_follow` method.
20
21use crate::chain_head::{
22	chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
23	event::{
24		BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
25		RuntimeVersionEvent,
26	},
27	subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError},
28};
29use futures::{
30	channel::oneshot,
31	stream::{self, Stream, StreamExt, TryStreamExt},
32};
33use log::debug;
34use sc_client_api::{
35	Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
36};
37use sc_rpc::utils::Subscription;
38use schnellru::{ByLength, LruMap};
39use sp_api::CallApiAt;
40use sp_blockchain::{
41	Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
42};
43use sp_runtime::{
44	traits::{Block as BlockT, Header as HeaderT, NumberFor},
45	SaturatedConversion, Saturating,
46};
47use std::{
48	collections::{HashSet, VecDeque},
49	sync::Arc,
50};
51/// The maximum number of finalized blocks provided by the
52/// `Initialized` event.
53const MAX_FINALIZED_BLOCKS: usize = 16;
54
55/// Generates the events of the `chainHead_follow` method.
56pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
57	/// Substrate client.
58	client: Arc<Client>,
59	/// Backend of the chain.
60	backend: Arc<BE>,
61	/// Subscriptions handle.
62	sub_handle: SubscriptionManagement<Block, BE>,
63	/// Subscription was started with the runtime updates flag.
64	with_runtime: bool,
65	/// Subscription ID.
66	sub_id: String,
67	/// The best reported block by this subscription.
68	current_best_block: Option<Block::Hash>,
69	/// LRU cache of pruned blocks.
70	pruned_blocks: LruMap<Block::Hash, ()>,
71	/// LRU cache of announced blocks.
72	announced_blocks: AnnouncedBlocks<Block>,
73	/// Stop all subscriptions if the distance between the leaves and the current finalized
74	/// block is larger than this value.
75	max_lagging_distance: usize,
76	/// The maximum number of pending messages per subscription.
77	pub subscription_buffer_cap: usize,
78}
79
80struct AnnouncedBlocks<Block: BlockT> {
81	/// Unfinalized blocks.
82	blocks: LruMap<Block::Hash, ()>,
83	/// Finalized blocks.
84	finalized: MostRecentFinalizedBlocks<Block>,
85}
86
87/// Wrapper over LRU to efficiently lookup hashes and remove elements as FIFO queue.
88///
89/// For the finalized blocks we use `peek` to avoid moving the block counter to the front.
90/// This effectively means that the LRU acts as a FIFO queue. Otherwise, we might
91/// end up with scenarios where the "finalized block" in the end of LRU is overwritten which
92/// may not necessarily be the oldest finalized block i.e, possible that "get" promotes an
93/// older finalized block because it was accessed more recently.
94struct MostRecentFinalizedBlocks<Block: BlockT>(LruMap<Block::Hash, ()>);
95
96impl<Block: BlockT> MostRecentFinalizedBlocks<Block> {
97	/// Insert the finalized block hash into the LRU cache.
98	fn insert(&mut self, block: Block::Hash) {
99		self.0.insert(block, ());
100	}
101
102	/// Check if the block is contained in the LRU cache.
103	fn contains(&mut self, block: &Block::Hash) -> Option<&()> {
104		self.0.peek(block)
105	}
106}
107
108impl<Block: BlockT> AnnouncedBlocks<Block> {
109	/// Creates a new `AnnouncedBlocks`.
110	fn new() -> Self {
111		Self {
112			// The total number of pinned blocks is `MAX_PINNED_BLOCKS`, ensure we don't
113			// exceed the limit.
114			blocks: LruMap::new(ByLength::new((MAX_PINNED_BLOCKS - MAX_FINALIZED_BLOCKS) as u32)),
115			// We are keeping a smaller number of announced finalized blocks in memory.
116			// This is because the `Finalized` event might be triggered before the `NewBlock` event.
117			finalized: MostRecentFinalizedBlocks(LruMap::new(ByLength::new(
118				MAX_FINALIZED_BLOCKS as u32,
119			))),
120		}
121	}
122
123	/// Insert the block into the announced blocks.
124	fn insert(&mut self, block: Block::Hash, finalized: bool) {
125		if finalized {
126			// When a block is declared as finalized, it is removed from the unfinalized blocks.
127			//
128			// Given that the finalized blocks are bounded to `MAX_FINALIZED_BLOCKS`,
129			// this ensures we keep the minimum number of blocks in memory.
130			self.blocks.remove(&block);
131			self.finalized.insert(block);
132		} else {
133			self.blocks.insert(block, ());
134		}
135	}
136
137	/// Check if the block was previously announced.
138	fn was_announced(&mut self, block: &Block::Hash) -> bool {
139		self.blocks.get(block).is_some() || self.finalized.contains(block).is_some()
140	}
141}
142
143impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
144	/// Create a new [`ChainHeadFollower`].
145	pub fn new(
146		client: Arc<Client>,
147		backend: Arc<BE>,
148		sub_handle: SubscriptionManagement<Block, BE>,
149		with_runtime: bool,
150		sub_id: String,
151		max_lagging_distance: usize,
152		subscription_buffer_cap: usize,
153	) -> Self {
154		Self {
155			client,
156			backend,
157			sub_handle,
158			with_runtime,
159			sub_id,
160			current_best_block: None,
161			pruned_blocks: LruMap::new(ByLength::new(
162				MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
163			)),
164			announced_blocks: AnnouncedBlocks::new(),
165			max_lagging_distance,
166			subscription_buffer_cap,
167		}
168	}
169}
170
171/// A block notification.
172enum NotificationType<Block: BlockT> {
173	/// The initial events generated from the node's memory.
174	InitialEvents(Vec<FollowEvent<Block::Hash>>),
175	/// The new block notification obtained from `import_notification_stream`.
176	NewBlock(BlockImportNotification<Block>),
177	/// The finalized block notification obtained from `finality_notification_stream`.
178	Finalized(FinalityNotification<Block>),
179	/// The response of `chainHead` method calls.
180	MethodResponse(FollowEvent<Block::Hash>),
181}
182
183/// The initial blocks that should be reported or ignored by the chainHead.
184#[derive(Clone, Debug)]
185struct InitialBlocks<Block: BlockT> {
186	/// Children of the latest finalized block, for which the `NewBlock`
187	/// event must be generated.
188	///
189	/// It is a tuple of (block hash, parent hash).
190	finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
191	/// Hashes of the last finalized blocks
192	finalized_block_hashes: VecDeque<Block::Hash>,
193	/// Blocks that should not be reported as pruned by the `Finalized` event.
194	///
195	/// Substrate database will perform the pruning of height N at
196	/// the finalization N + 1. We could have the following block tree
197	/// when the user subscribes to the `follow` method:
198	///   [A] - [A1] - [A2] - [A3]
199	///                 ^^ finalized
200	///       - [A1] - [B1]
201	///
202	/// When the A3 block is finalized, B1 is reported as pruned, however
203	/// B1 was never reported as `NewBlock` (and as such was never pinned).
204	/// This is because the `NewBlock` events are generated for children of
205	/// the finalized hash.
206	pruned_forks: HashSet<Block::Hash>,
207}
208
209/// The startup point from which chainHead started to generate events.
210struct StartupPoint<Block: BlockT> {
211	/// Best block hash.
212	pub best_hash: Block::Hash,
213	/// The head of the finalized chain.
214	pub finalized_hash: Block::Hash,
215	/// Last finalized block number.
216	pub finalized_number: NumberFor<Block>,
217}
218
219impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
220	fn from(info: Info<Block>) -> Self {
221		StartupPoint::<Block> {
222			best_hash: info.best_hash,
223			finalized_hash: info.finalized_hash,
224			finalized_number: info.finalized_number,
225		}
226	}
227}
228
229impl<BE, Block, Client> ChainHeadFollower<BE, Block, Client>
230where
231	Block: BlockT + 'static,
232	BE: Backend<Block> + 'static,
233	Client: BlockBackend<Block>
234		+ HeaderBackend<Block>
235		+ HeaderMetadata<Block, Error = BlockChainError>
236		+ BlockchainEvents<Block>
237		+ CallApiAt<Block>
238		+ 'static,
239{
240	/// Conditionally generate the runtime event of the given block.
241	fn generate_runtime_event(
242		&self,
243		block: Block::Hash,
244		parent: Option<Block::Hash>,
245	) -> Option<RuntimeEvent> {
246		// No runtime versions should be reported.
247		if !self.with_runtime {
248			return None
249		}
250
251		let block_rt = match self.client.runtime_version_at(block) {
252			Ok(rt) => rt,
253			Err(err) => return Some(err.into()),
254		};
255
256		let parent = match parent {
257			Some(parent) => parent,
258			// Nothing to compare against, always report.
259			None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() })),
260		};
261
262		let parent_rt = match self.client.runtime_version_at(parent) {
263			Ok(rt) => rt,
264			Err(err) => return Some(err.into()),
265		};
266
267		// Report the runtime version change.
268		if block_rt != parent_rt {
269			Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() }))
270		} else {
271			None
272		}
273	}
274
275	/// Check the distance between the provided blocks does not exceed a
276	/// a reasonable range.
277	///
278	/// When the blocks are too far apart (potentially millions of blocks):
279	///  - Tree route is expensive to calculate.
280	///  - The RPC layer will not be able to generate the `NewBlock` events for all blocks.
281	///
282	/// This edge-case can happen for parachains where the relay chain syncs slower to
283	/// the head of the chain than the parachain node that is synced already.
284	fn distance_within_reason(
285		&self,
286		block: Block::Hash,
287		finalized: Block::Hash,
288	) -> Result<(), SubscriptionManagementError> {
289		let Some(block_num) = self.client.number(block)? else {
290			return Err(SubscriptionManagementError::BlockHashAbsent)
291		};
292		let Some(finalized_num) = self.client.number(finalized)? else {
293			return Err(SubscriptionManagementError::BlockHashAbsent)
294		};
295
296		let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
297		if distance > self.max_lagging_distance {
298			return Err(SubscriptionManagementError::BlockDistanceTooLarge);
299		}
300
301		Ok(())
302	}
303
304	/// Get the in-memory blocks of the client, starting from the provided finalized hash.
305	///
306	/// The reported blocks are pinned by this function.
307	fn get_init_blocks_with_forks(
308		&self,
309		finalized: Block::Hash,
310	) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
311		let blockchain = self.backend.blockchain();
312		let leaves = blockchain.leaves()?;
313		let mut pruned_forks = HashSet::new();
314		let mut finalized_block_descendants = Vec::new();
315		let mut unique_descendants = HashSet::new();
316
317		// Ensure all leaves are within a reasonable distance from the finalized block,
318		// before traversing the tree.
319		for leaf in &leaves {
320			self.distance_within_reason(*leaf, finalized)?;
321		}
322
323		for leaf in leaves {
324			let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
325
326			let blocks = tree_route.enacted().iter().map(|block| block.hash);
327			if !tree_route.retracted().is_empty() {
328				pruned_forks.extend(blocks);
329			} else {
330				// Ensure a `NewBlock` event is generated for all children of the
331				// finalized block. Describe the tree route as (child_node, parent_node)
332				// Note: the order of elements matters here.
333				let mut parent = finalized;
334				for child in blocks {
335					let pair = (child, parent);
336
337					if unique_descendants.insert(pair) {
338						// The finalized block is pinned below.
339						self.sub_handle.pin_block(&self.sub_id, child)?;
340						finalized_block_descendants.push(pair);
341					}
342
343					parent = child;
344				}
345			}
346		}
347
348		let mut current_block = finalized;
349		// The header of the finalized block must not be pruned.
350		let Some(header) = blockchain.header(current_block)? else {
351			return Err(SubscriptionManagementError::BlockHeaderAbsent);
352		};
353
354		// Report at most `MAX_FINALIZED_BLOCKS`. Note: The node might not have that many blocks.
355		let mut finalized_block_hashes = VecDeque::with_capacity(MAX_FINALIZED_BLOCKS);
356
357		// Pin the finalized block.
358		self.sub_handle.pin_block(&self.sub_id, current_block)?;
359		finalized_block_hashes.push_front(current_block);
360		current_block = *header.parent_hash();
361
362		for _ in 0..MAX_FINALIZED_BLOCKS - 1 {
363			let Ok(Some(header)) = blockchain.header(current_block) else { break };
364			// Block cannot be reported if pinning fails.
365			if self.sub_handle.pin_block(&self.sub_id, current_block).is_err() {
366				break
367			};
368
369			finalized_block_hashes.push_front(current_block);
370			current_block = *header.parent_hash();
371		}
372
373		Ok(InitialBlocks { finalized_block_descendants, finalized_block_hashes, pruned_forks })
374	}
375
376	/// Generate the initial events reported by the RPC `follow` method.
377	///
378	/// Returns the initial events that should be reported directly.
379	fn generate_init_events(
380		&mut self,
381		startup_point: &StartupPoint<Block>,
382	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
383		let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
384
385		// The initialized event is the first one sent.
386		let initial_blocks = init.finalized_block_descendants;
387		let finalized_block_hashes = init.finalized_block_hashes;
388		// These are the pruned blocks that we should not report again.
389		for pruned in init.pruned_forks {
390			self.pruned_blocks.insert(pruned, ());
391		}
392
393		let finalized_block_hash = startup_point.finalized_hash;
394		let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
395
396		for finalized in &finalized_block_hashes {
397			self.announced_blocks.insert(*finalized, true);
398		}
399
400		let initialized_event = FollowEvent::Initialized(Initialized {
401			finalized_block_hashes: finalized_block_hashes.into(),
402			finalized_block_runtime,
403			with_runtime: self.with_runtime,
404		});
405
406		let mut finalized_block_descendants = Vec::with_capacity(initial_blocks.len() + 1);
407
408		finalized_block_descendants.push(initialized_event);
409		for (child, parent) in initial_blocks.into_iter() {
410			// If the parent was not announced we have a gap currently.
411			// This can happen during a WarpSync.
412			if !self.announced_blocks.was_announced(&parent) {
413				return Err(SubscriptionManagementError::BlockHeaderAbsent);
414			}
415			self.announced_blocks.insert(child, false);
416
417			let new_runtime = self.generate_runtime_event(child, Some(parent));
418
419			let event = FollowEvent::NewBlock(NewBlock {
420				block_hash: child,
421				parent_block_hash: parent,
422				new_runtime,
423				with_runtime: self.with_runtime,
424			});
425
426			finalized_block_descendants.push(event);
427		}
428
429		// Generate a new best block event.
430		let best_block_hash = startup_point.best_hash;
431		if best_block_hash != finalized_block_hash {
432			if !self.announced_blocks.was_announced(&best_block_hash) {
433				return Err(SubscriptionManagementError::BlockHeaderAbsent);
434			}
435			self.announced_blocks.insert(best_block_hash, true);
436
437			let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
438			self.current_best_block = Some(best_block_hash);
439			finalized_block_descendants.push(best_block);
440		};
441
442		Ok(finalized_block_descendants)
443	}
444
445	/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for the
446	/// given block hash.
447	fn generate_import_events(
448		&mut self,
449		block_hash: Block::Hash,
450		parent_block_hash: Block::Hash,
451		is_best_block: bool,
452	) -> Vec<FollowEvent<Block::Hash>> {
453		let new_runtime = self.generate_runtime_event(block_hash, Some(parent_block_hash));
454
455		let new_block = FollowEvent::NewBlock(NewBlock {
456			block_hash,
457			parent_block_hash,
458			new_runtime,
459			with_runtime: self.with_runtime,
460		});
461
462		if !is_best_block {
463			return vec![new_block]
464		}
465
466		// If this is the new best block, then we need to generate two events.
467		let best_block_event =
468			FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
469
470		match self.current_best_block {
471			Some(block_cache) => {
472				// The RPC layer has not reported this block as best before.
473				// Note: This handles the race with the finalized branch.
474				if block_cache != block_hash {
475					self.current_best_block = Some(block_hash);
476					vec![new_block, best_block_event]
477				} else {
478					vec![new_block]
479				}
480			},
481			None => {
482				self.current_best_block = Some(block_hash);
483				vec![new_block, best_block_event]
484			},
485		}
486	}
487
488	/// Handle the import of new blocks by generating the appropriate events.
489	fn handle_import_blocks(
490		&mut self,
491		notification: BlockImportNotification<Block>,
492		startup_point: &StartupPoint<Block>,
493	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
494		let block_hash = notification.hash;
495
496		// Ensure we are only reporting blocks after the starting point.
497		if *notification.header.number() < startup_point.finalized_number {
498			return Ok(Default::default())
499		}
500
501		// Ensure the block can be pinned before generating the events.
502		if !self.sub_handle.pin_block(&self.sub_id, block_hash)? {
503			// The block is already pinned, this is similar to the check above.
504			//
505			// The `SubscriptionManagement` ensures the block is tracked until (short lived):
506			// - 2 calls to `pin_block` are made (from `Finalized` and `NewBlock` branches).
507			// - the block is unpinned by the user
508			//
509			// This is rather a sanity checks for edge-cases (in theory), where
510			// [`MAX_FINALIZED_BLOCKS` + 1] finalized events are triggered before the `NewBlock`
511			// event of the first `Finalized` event.
512			return Ok(Default::default())
513		}
514
515		if self.announced_blocks.was_announced(&block_hash) {
516			// Block was already reported by the finalized branch.
517			return Ok(Default::default())
518		}
519
520		// Double check the parent hash. If the parent hash is not reported, we have a gap.
521		let parent_block_hash = *notification.header.parent_hash();
522		if !self.announced_blocks.was_announced(&parent_block_hash) {
523			// The parent block was not reported, we have a gap.
524			return Err(SubscriptionManagementError::Custom("Parent block was not reported".into()))
525		}
526
527		self.announced_blocks.insert(block_hash, false);
528		Ok(self.generate_import_events(block_hash, parent_block_hash, notification.is_new_best))
529	}
530
531	/// Generates new block events from the given finalized hashes.
532	///
533	/// It may be possible that the `Finalized` event fired before the `NewBlock`
534	/// event. Only in that case we generate:
535	/// - `NewBlock` event for all finalized hashes.
536	/// - `BestBlock` event for the last finalized hash.
537	///
538	/// This function returns an empty list if all finalized hashes were already reported
539	/// and are pinned.
540	fn generate_finalized_events(
541		&mut self,
542		finalized_block_hashes: &[Block::Hash],
543	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
544		let mut events = Vec::new();
545
546		// Nothing to be done if no finalized hashes are provided.
547		let Some(first_hash) = finalized_block_hashes.get(0) else { return Ok(Default::default()) };
548
549		// Find the parent header.
550		let Some(first_header) = self.client.header(*first_hash)? else {
551			return Err(SubscriptionManagementError::BlockHeaderAbsent)
552		};
553
554		if !self.announced_blocks.was_announced(first_header.parent_hash()) {
555			return Err(SubscriptionManagementError::Custom(
556				"Parent block was not reported for a finalized block".into(),
557			));
558		}
559
560		let parents =
561			std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
562		for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
563			// Ensure the block is pinned before generating the events.
564			self.sub_handle.pin_block(&self.sub_id, *hash)?;
565
566			// Check if the block was already reported.
567			if self.announced_blocks.was_announced(hash) {
568				continue;
569			}
570
571			// Generate `NewBlock` events for all blocks beside the last block in the list
572			let is_last = i + 1 == finalized_block_hashes.len();
573			if !is_last {
574				// Generate only the `NewBlock` event for this block.
575				events.extend(self.generate_import_events(*hash, *parent, false));
576				self.announced_blocks.insert(*hash, true);
577				continue;
578			}
579
580			if let Some(best_block_hash) = self.current_best_block {
581				let ancestor =
582					sp_blockchain::lowest_common_ancestor(&*self.client, *hash, best_block_hash)?;
583
584				// If we end up here and the `best_block` is a descendent of the finalized block
585				// (last block in the list), it means that there were skipped notifications.
586				// Otherwise `pin_block` would had returned `false`.
587				//
588				// When the node falls out of sync and then syncs up to the tip of the chain, it can
589				// happen that we skip notifications. Then it is better to terminate the connection
590				// instead of trying to send notifications for all missed blocks.
591				if ancestor.hash == *hash {
592					return Err(SubscriptionManagementError::Custom(
593						"A descendent of the finalized block was already reported".into(),
594					))
595				}
596			}
597
598			// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
599			events.extend(self.generate_import_events(*hash, *parent, true));
600			self.announced_blocks.insert(*hash, true);
601		}
602
603		Ok(events)
604	}
605
606	/// Get all pruned block hashes from the provided stale heads.
607	fn get_pruned_hashes(
608		&mut self,
609		stale_heads: &[Block::Hash],
610		last_finalized: Block::Hash,
611	) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
612		let blockchain = self.backend.blockchain();
613		let mut pruned = Vec::new();
614
615		for stale_head in stale_heads {
616			let tree_route = sp_blockchain::tree_route(blockchain, last_finalized, *stale_head)?;
617
618			// Collect only blocks that are not part of the canonical chain.
619			pruned.extend(tree_route.enacted().iter().filter_map(|block| {
620				if self.pruned_blocks.get(&block.hash).is_some() {
621					// The block was already reported as pruned.
622					return None
623				}
624
625				self.pruned_blocks.insert(block.hash, ());
626				Some(block.hash)
627			}))
628		}
629
630		Ok(pruned)
631	}
632
633	/// Handle the finalization notification by generating the `Finalized` event.
634	///
635	/// If the block of the notification was not reported yet, this method also
636	/// generates the events similar to `handle_import_blocks`.
637	fn handle_finalized_blocks(
638		&mut self,
639		notification: FinalityNotification<Block>,
640		startup_point: &StartupPoint<Block>,
641	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
642		let last_finalized = notification.hash;
643
644		// Ensure we are only reporting blocks after the starting point.
645		if *notification.header.number() < startup_point.finalized_number {
646			return Ok(Default::default())
647		}
648
649		// The tree route contains the exclusive path from the last finalized block to the block
650		// reported by the notification. Ensure the finalized block is also reported.
651		let mut finalized_block_hashes = notification.tree_route.to_vec();
652		finalized_block_hashes.push(last_finalized);
653
654		// If the finalized hashes were not reported yet, generate the `NewBlock` events.
655		let mut events = self.generate_finalized_events(&finalized_block_hashes)?;
656
657		// Report all pruned blocks from the notification that are not
658		// part of the fork we need to ignore.
659		let pruned_block_hashes =
660			self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;
661
662		for finalized in &finalized_block_hashes {
663			self.announced_blocks.insert(*finalized, true);
664		}
665
666		let finalized_event = FollowEvent::Finalized(Finalized {
667			finalized_block_hashes,
668			pruned_block_hashes: pruned_block_hashes.clone(),
669		});
670
671		if let Some(current_best_block) = self.current_best_block {
672			// We need to generate a new best block if the best block is in the pruned list.
673			let is_in_pruned_list =
674				pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
675			if is_in_pruned_list {
676				self.current_best_block = Some(last_finalized);
677				events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
678					best_block_hash: last_finalized,
679				}));
680			} else {
681				// The pruning logic ensures that when the finalized block is announced,
682				// all blocks on forks that have the common ancestor lower or equal
683				// to the finalized block are reported.
684				//
685				// However, we double check if the best block is a descendant of the last finalized
686				// block to ensure we don't miss any events.
687				let ancestor = sp_blockchain::lowest_common_ancestor(
688					&*self.client,
689					last_finalized,
690					current_best_block,
691				)?;
692				let is_descendant = ancestor.hash == last_finalized;
693				if !is_descendant {
694					self.current_best_block = Some(last_finalized);
695					events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
696						best_block_hash: last_finalized,
697					}));
698				}
699			}
700		}
701
702		events.push(finalized_event);
703		Ok(events)
704	}
705
706	/// Submit the events from the provided stream to the RPC client
707	/// for as long as the `rx_stop` event was not called.
708	async fn submit_events<EventStream>(
709		&mut self,
710		startup_point: &StartupPoint<Block>,
711		stream: EventStream,
712		sink: Subscription,
713		rx_stop: oneshot::Receiver<()>,
714	) -> Result<(), SubscriptionManagementError>
715	where
716		EventStream: Stream<Item = NotificationType<Block>> + Unpin + Send,
717	{
718		let buffer_cap = self.subscription_buffer_cap;
719		// create a channel to propagate error messages
720		let mut handle_events = |event| match event {
721			NotificationType::InitialEvents(events) => Ok(events),
722			NotificationType::NewBlock(notification) =>
723				self.handle_import_blocks(notification, &startup_point),
724			NotificationType::Finalized(notification) =>
725				self.handle_finalized_blocks(notification, &startup_point),
726			NotificationType::MethodResponse(notification) => Ok(vec![notification]),
727		};
728
729		let stream = stream
730			.map(|event| handle_events(event))
731			.map_ok(|items| stream::iter(items).map(Ok))
732			.try_flatten();
733
734		tokio::pin!(stream);
735
736		let sink_future =
737			sink.pipe_from_try_stream(stream, sc_rpc::utils::BoundedVecDeque::new(buffer_cap));
738
739		let result = tokio::select! {
740			_ = rx_stop => Ok(()),
741			result = sink_future => {
742				if let Err(ref e) = result {
743					debug!(
744						target: LOG_TARGET,
745						"[follow][id={:?}] Failed to handle stream notification {:?}",
746						&self.sub_id,
747						e
748					);
749				};
750				result
751			}
752		};
753		let _ = sink.send(&FollowEvent::<String>::Stop).await;
754		result
755	}
756
757	/// Generate the block events for the `chainHead_follow` method.
758	pub async fn generate_events(
759		&mut self,
760		sink: Subscription,
761		sub_data: InsertedSubscriptionData<Block>,
762	) -> Result<(), SubscriptionManagementError> {
763		// Register for the new block and finalized notifications.
764		let stream_import = self
765			.client
766			.import_notification_stream()
767			.map(|notification| NotificationType::NewBlock(notification));
768
769		let stream_finalized = self
770			.client
771			.finality_notification_stream()
772			.map(|notification| NotificationType::Finalized(notification));
773
774		let stream_responses = sub_data
775			.response_receiver
776			.map(|response| NotificationType::MethodResponse(response));
777
778		let startup_point = StartupPoint::from(self.client.info());
779		let initial_events = match self.generate_init_events(&startup_point) {
780			Ok(blocks) => blocks,
781			Err(err) => {
782				debug!(
783					target: LOG_TARGET,
784					"[follow][id={:?}] Failed to generate the initial events {:?}",
785					self.sub_id,
786					err
787				);
788				let _ = sink.send(&FollowEvent::<String>::Stop).await;
789				return Err(err)
790			},
791		};
792
793		let initial = NotificationType::InitialEvents(initial_events);
794		let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
795		let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
796		let stream = stream::once(futures::future::ready(initial)).chain(merged);
797
798		self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
799	}
800}