sc_rpc_spec_v2/chain_head/
chain_head_follow.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Implementation of the `chainHead_follow` method.
20
21use crate::chain_head::{
22	chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
23	event::{
24		BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
25		RuntimeVersionEvent,
26	},
27	subscription::{SubscriptionManagement, SubscriptionManagementError},
28};
29use futures::{
30	channel::oneshot,
31	stream::{self, Stream, StreamExt},
32};
33use futures_util::future::Either;
34use log::debug;
35use sc_client_api::{
36	Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
37};
38use sc_rpc::utils::Subscription;
39use schnellru::{ByLength, LruMap};
40use sp_api::CallApiAt;
41use sp_blockchain::{
42	Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
43};
44use sp_runtime::{
45	traits::{Block as BlockT, Header as HeaderT, NumberFor},
46	SaturatedConversion, Saturating,
47};
48use std::{
49	collections::{HashSet, VecDeque},
50	sync::Arc,
51};
52/// The maximum number of finalized blocks provided by the
53/// `Initialized` event.
54const MAX_FINALIZED_BLOCKS: usize = 16;
55
56use super::subscription::InsertedSubscriptionData;
57
58/// Generates the events of the `chainHead_follow` method.
59pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
60	/// Substrate client.
61	client: Arc<Client>,
62	/// Backend of the chain.
63	backend: Arc<BE>,
64	/// Subscriptions handle.
65	sub_handle: SubscriptionManagement<Block, BE>,
66	/// Subscription was started with the runtime updates flag.
67	with_runtime: bool,
68	/// Subscription ID.
69	sub_id: String,
70	/// The best reported block by this subscription.
71	current_best_block: Option<Block::Hash>,
72	/// LRU cache of pruned blocks.
73	pruned_blocks: LruMap<Block::Hash, ()>,
74	/// Stop all subscriptions if the distance between the leaves and the current finalized
75	/// block is larger than this value.
76	max_lagging_distance: usize,
77}
78
79impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
80	/// Create a new [`ChainHeadFollower`].
81	pub fn new(
82		client: Arc<Client>,
83		backend: Arc<BE>,
84		sub_handle: SubscriptionManagement<Block, BE>,
85		with_runtime: bool,
86		sub_id: String,
87		max_lagging_distance: usize,
88	) -> Self {
89		Self {
90			client,
91			backend,
92			sub_handle,
93			with_runtime,
94			sub_id,
95			current_best_block: None,
96			pruned_blocks: LruMap::new(ByLength::new(
97				MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
98			)),
99			max_lagging_distance,
100		}
101	}
102}
103
104/// A block notification.
105enum NotificationType<Block: BlockT> {
106	/// The initial events generated from the node's memory.
107	InitialEvents(Vec<FollowEvent<Block::Hash>>),
108	/// The new block notification obtained from `import_notification_stream`.
109	NewBlock(BlockImportNotification<Block>),
110	/// The finalized block notification obtained from `finality_notification_stream`.
111	Finalized(FinalityNotification<Block>),
112	/// The response of `chainHead` method calls.
113	MethodResponse(FollowEvent<Block::Hash>),
114}
115
116/// The initial blocks that should be reported or ignored by the chainHead.
117#[derive(Clone, Debug)]
118struct InitialBlocks<Block: BlockT> {
119	/// Children of the latest finalized block, for which the `NewBlock`
120	/// event must be generated.
121	///
122	/// It is a tuple of (block hash, parent hash).
123	finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
124	/// Hashes of the last finalized blocks
125	finalized_block_hashes: VecDeque<Block::Hash>,
126	/// Blocks that should not be reported as pruned by the `Finalized` event.
127	///
128	/// Substrate database will perform the pruning of height N at
129	/// the finalization N + 1. We could have the following block tree
130	/// when the user subscribes to the `follow` method:
131	///   [A] - [A1] - [A2] - [A3]
132	///                 ^^ finalized
133	///       - [A1] - [B1]
134	///
135	/// When the A3 block is finalized, B1 is reported as pruned, however
136	/// B1 was never reported as `NewBlock` (and as such was never pinned).
137	/// This is because the `NewBlock` events are generated for children of
138	/// the finalized hash.
139	pruned_forks: HashSet<Block::Hash>,
140}
141
142/// The startup point from which chainHead started to generate events.
143struct StartupPoint<Block: BlockT> {
144	/// Best block hash.
145	pub best_hash: Block::Hash,
146	/// The head of the finalized chain.
147	pub finalized_hash: Block::Hash,
148	/// Last finalized block number.
149	pub finalized_number: NumberFor<Block>,
150}
151
152impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
153	fn from(info: Info<Block>) -> Self {
154		StartupPoint::<Block> {
155			best_hash: info.best_hash,
156			finalized_hash: info.finalized_hash,
157			finalized_number: info.finalized_number,
158		}
159	}
160}
161
162impl<BE, Block, Client> ChainHeadFollower<BE, Block, Client>
163where
164	Block: BlockT + 'static,
165	BE: Backend<Block> + 'static,
166	Client: BlockBackend<Block>
167		+ HeaderBackend<Block>
168		+ HeaderMetadata<Block, Error = BlockChainError>
169		+ BlockchainEvents<Block>
170		+ CallApiAt<Block>
171		+ 'static,
172{
173	/// Conditionally generate the runtime event of the given block.
174	fn generate_runtime_event(
175		&self,
176		block: Block::Hash,
177		parent: Option<Block::Hash>,
178	) -> Option<RuntimeEvent> {
179		// No runtime versions should be reported.
180		if !self.with_runtime {
181			return None
182		}
183
184		let block_rt = match self.client.runtime_version_at(block) {
185			Ok(rt) => rt,
186			Err(err) => return Some(err.into()),
187		};
188
189		let parent = match parent {
190			Some(parent) => parent,
191			// Nothing to compare against, always report.
192			None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() })),
193		};
194
195		let parent_rt = match self.client.runtime_version_at(parent) {
196			Ok(rt) => rt,
197			Err(err) => return Some(err.into()),
198		};
199
200		// Report the runtime version change.
201		if block_rt != parent_rt {
202			Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() }))
203		} else {
204			None
205		}
206	}
207
208	/// Check the distance between the provided blocks does not exceed a
209	/// a reasonable range.
210	///
211	/// When the blocks are too far apart (potentially millions of blocks):
212	///  - Tree route is expensive to calculate.
213	///  - The RPC layer will not be able to generate the `NewBlock` events for all blocks.
214	///
215	/// This edge-case can happen for parachains where the relay chain syncs slower to
216	/// the head of the chain than the parachain node that is synced already.
217	fn distace_within_reason(
218		&self,
219		block: Block::Hash,
220		finalized: Block::Hash,
221	) -> Result<(), SubscriptionManagementError> {
222		let Some(block_num) = self.client.number(block)? else {
223			return Err(SubscriptionManagementError::BlockHashAbsent)
224		};
225		let Some(finalized_num) = self.client.number(finalized)? else {
226			return Err(SubscriptionManagementError::BlockHashAbsent)
227		};
228
229		let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
230		if distance > self.max_lagging_distance {
231			return Err(SubscriptionManagementError::BlockDistanceTooLarge);
232		}
233
234		Ok(())
235	}
236
237	/// Get the in-memory blocks of the client, starting from the provided finalized hash.
238	///
239	/// The reported blocks are pinned by this function.
240	fn get_init_blocks_with_forks(
241		&self,
242		finalized: Block::Hash,
243	) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
244		let blockchain = self.backend.blockchain();
245		let leaves = blockchain.leaves()?;
246		let mut pruned_forks = HashSet::new();
247		let mut finalized_block_descendants = Vec::new();
248		let mut unique_descendants = HashSet::new();
249
250		// Ensure all leaves are within a reasonable distance from the finalized block,
251		// before traversing the tree.
252		for leaf in &leaves {
253			self.distace_within_reason(*leaf, finalized)?;
254		}
255
256		for leaf in leaves {
257			let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
258
259			let blocks = tree_route.enacted().iter().map(|block| block.hash);
260			if !tree_route.retracted().is_empty() {
261				pruned_forks.extend(blocks);
262			} else {
263				// Ensure a `NewBlock` event is generated for all children of the
264				// finalized block. Describe the tree route as (child_node, parent_node)
265				// Note: the order of elements matters here.
266				let mut parent = finalized;
267				for child in blocks {
268					let pair = (child, parent);
269
270					if unique_descendants.insert(pair) {
271						// The finalized block is pinned below.
272						self.sub_handle.pin_block(&self.sub_id, child)?;
273						finalized_block_descendants.push(pair);
274					}
275
276					parent = child;
277				}
278			}
279		}
280
281		let mut current_block = finalized;
282		// The header of the finalized block must not be pruned.
283		let Some(header) = blockchain.header(current_block)? else {
284			return Err(SubscriptionManagementError::BlockHeaderAbsent);
285		};
286
287		// Report at most `MAX_FINALIZED_BLOCKS`. Note: The node might not have that many blocks.
288		let mut finalized_block_hashes = VecDeque::with_capacity(MAX_FINALIZED_BLOCKS);
289
290		// Pin the finalized block.
291		self.sub_handle.pin_block(&self.sub_id, current_block)?;
292		finalized_block_hashes.push_front(current_block);
293		current_block = *header.parent_hash();
294
295		for _ in 0..MAX_FINALIZED_BLOCKS - 1 {
296			let Ok(Some(header)) = blockchain.header(current_block) else { break };
297			// Block cannot be reported if pinning fails.
298			if self.sub_handle.pin_block(&self.sub_id, current_block).is_err() {
299				break
300			};
301
302			finalized_block_hashes.push_front(current_block);
303			current_block = *header.parent_hash();
304		}
305
306		Ok(InitialBlocks { finalized_block_descendants, finalized_block_hashes, pruned_forks })
307	}
308
309	/// Generate the initial events reported by the RPC `follow` method.
310	///
311	/// Returns the initial events that should be reported directly.
312	fn generate_init_events(
313		&mut self,
314		startup_point: &StartupPoint<Block>,
315	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
316		let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
317
318		// The initialized event is the first one sent.
319		let initial_blocks = init.finalized_block_descendants;
320		let finalized_block_hashes = init.finalized_block_hashes;
321		// These are the pruned blocks that we should not report again.
322		for pruned in init.pruned_forks {
323			self.pruned_blocks.insert(pruned, ());
324		}
325
326		let finalized_block_hash = startup_point.finalized_hash;
327		let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
328
329		let initialized_event = FollowEvent::Initialized(Initialized {
330			finalized_block_hashes: finalized_block_hashes.into(),
331			finalized_block_runtime,
332			with_runtime: self.with_runtime,
333		});
334
335		let mut finalized_block_descendants = Vec::with_capacity(initial_blocks.len() + 1);
336
337		finalized_block_descendants.push(initialized_event);
338		for (child, parent) in initial_blocks.into_iter() {
339			let new_runtime = self.generate_runtime_event(child, Some(parent));
340
341			let event = FollowEvent::NewBlock(NewBlock {
342				block_hash: child,
343				parent_block_hash: parent,
344				new_runtime,
345				with_runtime: self.with_runtime,
346			});
347
348			finalized_block_descendants.push(event);
349		}
350
351		// Generate a new best block event.
352		let best_block_hash = startup_point.best_hash;
353		if best_block_hash != finalized_block_hash {
354			let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
355			self.current_best_block = Some(best_block_hash);
356			finalized_block_descendants.push(best_block);
357		};
358
359		Ok(finalized_block_descendants)
360	}
361
362	/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for the
363	/// given block hash.
364	fn generate_import_events(
365		&mut self,
366		block_hash: Block::Hash,
367		parent_block_hash: Block::Hash,
368		is_best_block: bool,
369	) -> Vec<FollowEvent<Block::Hash>> {
370		let new_runtime = self.generate_runtime_event(block_hash, Some(parent_block_hash));
371
372		let new_block = FollowEvent::NewBlock(NewBlock {
373			block_hash,
374			parent_block_hash,
375			new_runtime,
376			with_runtime: self.with_runtime,
377		});
378
379		if !is_best_block {
380			return vec![new_block]
381		}
382
383		// If this is the new best block, then we need to generate two events.
384		let best_block_event =
385			FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
386
387		match self.current_best_block {
388			Some(block_cache) => {
389				// The RPC layer has not reported this block as best before.
390				// Note: This handles the race with the finalized branch.
391				if block_cache != block_hash {
392					self.current_best_block = Some(block_hash);
393					vec![new_block, best_block_event]
394				} else {
395					vec![new_block]
396				}
397			},
398			None => {
399				self.current_best_block = Some(block_hash);
400				vec![new_block, best_block_event]
401			},
402		}
403	}
404
405	/// Handle the import of new blocks by generating the appropriate events.
406	fn handle_import_blocks(
407		&mut self,
408		notification: BlockImportNotification<Block>,
409		startup_point: &StartupPoint<Block>,
410	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
411		// The block was already pinned by the initial block events or by the finalized event.
412		if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? {
413			return Ok(Default::default())
414		}
415
416		// Ensure we are only reporting blocks after the starting point.
417		if *notification.header.number() < startup_point.finalized_number {
418			return Ok(Default::default())
419		}
420
421		Ok(self.generate_import_events(
422			notification.hash,
423			*notification.header.parent_hash(),
424			notification.is_new_best,
425		))
426	}
427
428	/// Generates new block events from the given finalized hashes.
429	///
430	/// It may be possible that the `Finalized` event fired before the `NewBlock`
431	/// event. Only in that case we generate:
432	/// - `NewBlock` event for all finalized hashes.
433	/// - `BestBlock` event for the last finalized hash.
434	///
435	/// This function returns an empty list if all finalized hashes were already reported
436	/// and are pinned.
437	fn generate_finalized_events(
438		&mut self,
439		finalized_block_hashes: &[Block::Hash],
440	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
441		let mut events = Vec::new();
442
443		// Nothing to be done if no finalized hashes are provided.
444		let Some(first_hash) = finalized_block_hashes.get(0) else { return Ok(Default::default()) };
445
446		// Find the parent header.
447		let Some(first_header) = self.client.header(*first_hash)? else {
448			return Err(SubscriptionManagementError::BlockHeaderAbsent)
449		};
450
451		let parents =
452			std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
453		for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
454			// Check if the block was already reported and thus, is already pinned.
455			if !self.sub_handle.pin_block(&self.sub_id, *hash)? {
456				continue
457			}
458
459			// Generate `NewBlock` events for all blocks beside the last block in the list
460			let is_last = i + 1 == finalized_block_hashes.len();
461			if !is_last {
462				// Generate only the `NewBlock` event for this block.
463				events.extend(self.generate_import_events(*hash, *parent, false));
464				continue;
465			}
466
467			if let Some(best_block_hash) = self.current_best_block {
468				let ancestor =
469					sp_blockchain::lowest_common_ancestor(&*self.client, *hash, best_block_hash)?;
470
471				// If we end up here and the `best_block` is a descendent of the finalized block
472				// (last block in the list), it means that there were skipped notifications.
473				// Otherwise `pin_block` would had returned `false`.
474				//
475				// When the node falls out of sync and then syncs up to the tip of the chain, it can
476				// happen that we skip notifications. Then it is better to terminate the connection
477				// instead of trying to send notifications for all missed blocks.
478				if ancestor.hash == *hash {
479					return Err(SubscriptionManagementError::Custom(
480						"A descendent of the finalized block was already reported".into(),
481					))
482				}
483			}
484
485			// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
486			events.extend(self.generate_import_events(*hash, *parent, true))
487		}
488
489		Ok(events)
490	}
491
492	/// Get all pruned block hashes from the provided stale heads.
493	fn get_pruned_hashes(
494		&mut self,
495		stale_heads: &[Block::Hash],
496		last_finalized: Block::Hash,
497	) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
498		let blockchain = self.backend.blockchain();
499		let mut pruned = Vec::new();
500
501		for stale_head in stale_heads {
502			let tree_route = sp_blockchain::tree_route(blockchain, last_finalized, *stale_head)?;
503
504			// Collect only blocks that are not part of the canonical chain.
505			pruned.extend(tree_route.enacted().iter().filter_map(|block| {
506				if self.pruned_blocks.get(&block.hash).is_some() {
507					// The block was already reported as pruned.
508					return None
509				}
510
511				self.pruned_blocks.insert(block.hash, ());
512				Some(block.hash)
513			}))
514		}
515
516		Ok(pruned)
517	}
518
519	/// Handle the finalization notification by generating the `Finalized` event.
520	///
521	/// If the block of the notification was not reported yet, this method also
522	/// generates the events similar to `handle_import_blocks`.
523	fn handle_finalized_blocks(
524		&mut self,
525		notification: FinalityNotification<Block>,
526		startup_point: &StartupPoint<Block>,
527	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
528		let last_finalized = notification.hash;
529
530		// Ensure we are only reporting blocks after the starting point.
531		if *notification.header.number() < startup_point.finalized_number {
532			return Ok(Default::default())
533		}
534
535		// The tree route contains the exclusive path from the last finalized block to the block
536		// reported by the notification. Ensure the finalized block is also reported.
537		let mut finalized_block_hashes = notification.tree_route.to_vec();
538		finalized_block_hashes.push(last_finalized);
539
540		// If the finalized hashes were not reported yet, generate the `NewBlock` events.
541		let mut events = self.generate_finalized_events(&finalized_block_hashes)?;
542
543		// Report all pruned blocks from the notification that are not
544		// part of the fork we need to ignore.
545		let pruned_block_hashes =
546			self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;
547
548		let finalized_event = FollowEvent::Finalized(Finalized {
549			finalized_block_hashes,
550			pruned_block_hashes: pruned_block_hashes.clone(),
551		});
552
553		if let Some(current_best_block) = self.current_best_block {
554			// We need to generate a new best block if the best block is in the pruned list.
555			let is_in_pruned_list =
556				pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
557			if is_in_pruned_list {
558				self.current_best_block = Some(last_finalized);
559				events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
560					best_block_hash: last_finalized,
561				}));
562			} else {
563				// The pruning logic ensures that when the finalized block is announced,
564				// all blocks on forks that have the common ancestor lower or equal
565				// to the finalized block are reported.
566				//
567				// However, we double check if the best block is a descendant of the last finalized
568				// block to ensure we don't miss any events.
569				let ancestor = sp_blockchain::lowest_common_ancestor(
570					&*self.client,
571					last_finalized,
572					current_best_block,
573				)?;
574				let is_descendant = ancestor.hash == last_finalized;
575				if !is_descendant {
576					self.current_best_block = Some(last_finalized);
577					events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
578						best_block_hash: last_finalized,
579					}));
580				}
581			}
582		}
583
584		events.push(finalized_event);
585		Ok(events)
586	}
587
588	/// Submit the events from the provided stream to the RPC client
589	/// for as long as the `rx_stop` event was not called.
590	async fn submit_events<EventStream>(
591		&mut self,
592		startup_point: &StartupPoint<Block>,
593		mut stream: EventStream,
594		sink: Subscription,
595		rx_stop: oneshot::Receiver<()>,
596	) -> Result<(), SubscriptionManagementError>
597	where
598		EventStream: Stream<Item = NotificationType<Block>> + Unpin,
599	{
600		let mut stream_item = stream.next();
601
602		// The stop event can be triggered by the chainHead logic when the pinned
603		// block guarantee cannot be hold. Or when the client is disconnected.
604		let connection_closed = sink.closed();
605		tokio::pin!(connection_closed);
606		let mut stop_event = futures_util::future::select(rx_stop, connection_closed);
607
608		while let Either::Left((Some(event), next_stop_event)) =
609			futures_util::future::select(stream_item, stop_event).await
610		{
611			let events = match event {
612				NotificationType::InitialEvents(events) => Ok(events),
613				NotificationType::NewBlock(notification) =>
614					self.handle_import_blocks(notification, &startup_point),
615				NotificationType::Finalized(notification) =>
616					self.handle_finalized_blocks(notification, &startup_point),
617				NotificationType::MethodResponse(notification) => Ok(vec![notification]),
618			};
619
620			let events = match events {
621				Ok(events) => events,
622				Err(err) => {
623					debug!(
624						target: LOG_TARGET,
625						"[follow][id={:?}] Failed to handle stream notification {:?}",
626						self.sub_id,
627						err
628					);
629					_ = sink.send(&FollowEvent::<String>::Stop).await;
630					return Err(err)
631				},
632			};
633
634			for event in events {
635				if let Err(err) = sink.send(&event).await {
636					// Failed to submit event.
637					debug!(
638						target: LOG_TARGET,
639						"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
640					);
641
642					let _ = sink.send(&FollowEvent::<String>::Stop).await;
643					// No need to propagate this error further, the client disconnected.
644					return Ok(())
645				}
646			}
647
648			stream_item = stream.next();
649			stop_event = next_stop_event;
650		}
651
652		// If we got here either:
653		// - the substrate streams have closed
654		// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
655		// - the client disconnected.
656		let _ = sink.send(&FollowEvent::<String>::Stop).await;
657		Ok(())
658	}
659
660	/// Generate the block events for the `chainHead_follow` method.
661	pub async fn generate_events(
662		&mut self,
663		sink: Subscription,
664		sub_data: InsertedSubscriptionData<Block>,
665	) -> Result<(), SubscriptionManagementError> {
666		// Register for the new block and finalized notifications.
667		let stream_import = self
668			.client
669			.import_notification_stream()
670			.map(|notification| NotificationType::NewBlock(notification));
671
672		let stream_finalized = self
673			.client
674			.finality_notification_stream()
675			.map(|notification| NotificationType::Finalized(notification));
676
677		let stream_responses = sub_data
678			.response_receiver
679			.map(|response| NotificationType::MethodResponse(response));
680
681		let startup_point = StartupPoint::from(self.client.info());
682		let initial_events = match self.generate_init_events(&startup_point) {
683			Ok(blocks) => blocks,
684			Err(err) => {
685				debug!(
686					target: LOG_TARGET,
687					"[follow][id={:?}] Failed to generate the initial events {:?}",
688					self.sub_id,
689					err
690				);
691				let _ = sink.send(&FollowEvent::<String>::Stop).await;
692				return Err(err)
693			},
694		};
695
696		let initial = NotificationType::InitialEvents(initial_events);
697		let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
698		let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
699		let stream = stream::once(futures::future::ready(initial)).chain(merged);
700
701		self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
702	}
703}