referrerpolicy=no-referrer-when-downgrade

cumulus_client_consensus_common/
parachain_consensus.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use cumulus_relay_chain_streams::{finalized_heads, new_best_heads};
19use sc_client_api::{
20	Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
21};
22use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
23use schnellru::{ByLength, LruMap};
24use sp_blockchain::Error as ClientError;
25use sp_consensus::{BlockOrigin, BlockStatus};
26use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
27
28use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
29use cumulus_relay_chain_interface::RelayChainInterface;
30
31use polkadot_primitives::Id as ParaId;
32
33use codec::Decode;
34use futures::{
35	channel::mpsc::{Sender, UnboundedSender},
36	pin_mut, select, FutureExt, SinkExt, Stream, StreamExt,
37};
38use sp_core::traits::SpawnEssentialNamed;
39
40use std::sync::Arc;
41
/// Log target passed as `target:` to every `tracing` call in this file.
const LOG_TARGET: &str = "cumulus-consensus";
/// Capacity (number of entries) of the LRU cache of recently seen finalized parachain block
/// hashes that `follow_finalized_head` keeps to finalize blocks which are imported late.
const FINALIZATION_CACHE_SIZE: u32 = 40;
44
45fn handle_new_finalized_head<P, Block, B>(
46	parachain: &Arc<P>,
47	header: Block::Header,
48	last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
49) where
50	Block: BlockT,
51	B: Backend<Block>,
52	P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
53{
54	let hash = header.hash();
55
56	last_seen_finalized_hashes.insert(hash, ());
57
58	// Only finalize if we are below the incoming finalized parachain head
59	if parachain.usage_info().chain.finalized_number < *header.number() {
60		tracing::debug!(
61			target: LOG_TARGET,
62			block_hash = ?hash,
63			"Attempting to finalize header.",
64		);
65		if let Err(e) = parachain.finalize_block(hash, None, true) {
66			match e {
67				ClientError::UnknownBlock(_) => tracing::debug!(
68					target: LOG_TARGET,
69					block_hash = ?hash,
70					"Could not finalize block because it is unknown.",
71				),
72				_ => tracing::warn!(
73					target: LOG_TARGET,
74					error = ?e,
75					block_hash = ?hash,
76					"Failed to finalize block",
77				),
78			}
79		}
80	}
81}
82
83/// Streams finalized parachain heads from the relay chain.
84///
85/// This worker continuously monitors the relay chain for finalized blocks and extracts
86/// the corresponding parachain head data for the given `para_id`. The extracted head
87/// data is sent through the provided channel for consumption by the consensus system.
88///
89/// This is necessary because finalization of blocks can take a long
90/// time. During this blocking operation, we should not keep references to finality notifications,
91/// because that prevents the corresponding blocks from getting pruned.
92pub async fn finalized_head_stream_worker<R: RelayChainInterface + Clone, Block: BlockT>(
93	mut tx: UnboundedSender<Block::Header>,
94	para_id: ParaId,
95	relay_chain: R,
96) {
97	let finalized_heads = match finalized_heads(relay_chain.clone(), para_id).await {
98		Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
99		Err(err) => {
100			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
101			return
102		},
103	};
104
105	pin_mut!(finalized_heads);
106	loop {
107		if let Some((head_data, _)) = finalized_heads.next().await {
108			let header = match Block::Header::decode(&mut &head_data[..]) {
109				Ok(header) => header,
110				Err(err) => {
111					tracing::debug!(
112						target: LOG_TARGET,
113						error = ?err,
114						"Could not decode parachain header while following finalized heads.",
115					);
116					continue
117				},
118			};
119			if let Err(e) = tx.send(header).await {
120				tracing::error!(target: LOG_TARGET, ?e, "Error while sending finalized head.");
121				return;
122			};
123		}
124	}
125}
126
127/// Follow the finalized head of the given parachain.
128///
129/// For every finalized block of the relay chain, it will get the included parachain header
130/// corresponding to `para_id` and will finalize it in the parachain.
131async fn follow_finalized_head<P, Block, B>(
132	parachain: Arc<P>,
133	finalized_head_stream: Box<impl Stream<Item = Block::Header> + Unpin + Send>,
134) where
135	Block: BlockT,
136	P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
137	B: Backend<Block>,
138{
139	let mut imported_blocks = parachain.import_notification_stream().fuse();
140	let mut finalized_head_stream = finalized_head_stream.fuse();
141
142	// We use this cache to finalize blocks that are imported late.
143	// For example, a block that has been recovered via PoV-Recovery
144	// on a full node can have several minutes delay. With this cache
145	// we have some "memory" of recently finalized blocks.
146	let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE));
147
148	loop {
149		select! {
150			fin = finalized_head_stream.next() => {
151				match fin {
152					Some(finalized_head) =>
153						handle_new_finalized_head(&parachain, finalized_head, &mut last_seen_finalized_hashes),
154					None => {
155						tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
156						return
157					}
158				}
159			},
160			imported = imported_blocks.next() => {
161				match imported {
162					Some(imported_block) => {
163						// When we see a block import that is already finalized, we immediately finalize it.
164						if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
165							tracing::debug!(
166								target: LOG_TARGET,
167								block_hash = ?imported_block.hash,
168								"Setting newly imported block as finalized.",
169							);
170
171							if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
172								match e {
173									ClientError::UnknownBlock(_) => tracing::debug!(
174										target: LOG_TARGET,
175										block_hash = ?imported_block.hash,
176										"Could not finalize block because it is unknown.",
177									),
178									_ => tracing::warn!(
179										target: LOG_TARGET,
180										error = ?e,
181										block_hash = ?imported_block.hash,
182										"Failed to finalize block",
183									),
184								}
185							}
186						}
187					},
188					None => {
189						tracing::debug!(
190							target: LOG_TARGET,
191							"Stopping following imported blocks.",
192						);
193						return
194					}
195				}
196			}
197		}
198	}
199}
200
201/// Spawns the essential finalization tasks for parachain consensus.
202///
203/// This function creates and spawns two critical background tasks:
204/// 1. A finalized head stream worker that monitors relay chain finality and extracts included
205///    headers
206/// 2. The main parachain consensus task that handles finalization and best block updates
207pub fn spawn_parachain_consensus_tasks<P, R, Block, B, S>(
208	para_id: ParaId,
209	parachain: Arc<P>,
210	relay_chain: R,
211	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
212	recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
213	spawn_handle: S,
214) where
215	Block: BlockT,
216	P: Finalizer<Block, B>
217		+ UsageProvider<Block>
218		+ Send
219		+ Sync
220		+ BlockBackend<Block>
221		+ BlockchainEvents<Block>
222		+ 'static,
223	for<'a> &'a P: BlockImport<Block>,
224	R: RelayChainInterface + Clone + 'static,
225	S: SpawnEssentialNamed + 'static,
226	B: Backend<Block> + 'static,
227{
228	let (tx, rx) = futures::channel::mpsc::unbounded();
229	let worker = finalized_head_stream_worker::<_, Block>(tx, para_id, relay_chain.clone());
230	let consensus = run_parachain_consensus(
231		para_id,
232		parachain,
233		relay_chain,
234		announce_block,
235		Box::new(rx),
236		recovery_chan_tx,
237	);
238
239	spawn_handle.spawn_essential_blocking("cumulus-consensus", None, Box::pin(consensus));
240	spawn_handle.spawn_essential_blocking(
241		"cumulus-consensus-finality-stream",
242		None,
243		Box::pin(worker),
244	);
245}
246
247/// Run the parachain consensus.
248///
249/// This will follow the given `relay_chain` to act as consensus for the parachain that corresponds
250/// to the given `para_id`. It will set the new best block of the parachain as it gets aware of it.
251/// The same happens for the finalized block.
252///
253/// # Note
254///
255/// This will access the backend of the parachain and thus, this future should be spawned as
256/// blocking task.
257pub async fn run_parachain_consensus<P, R, Block, B>(
258	para_id: ParaId,
259	parachain: Arc<P>,
260	relay_chain: R,
261	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
262	finalized_head_stream: Box<impl Stream<Item = Block::Header> + Unpin + Send>,
263	recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
264) where
265	Block: BlockT,
266	P: Finalizer<Block, B>
267		+ UsageProvider<Block>
268		+ Send
269		+ Sync
270		+ BlockBackend<Block>
271		+ BlockchainEvents<Block>,
272	for<'a> &'a P: BlockImport<Block>,
273	R: RelayChainInterface + Clone,
274	B: Backend<Block>,
275{
276	let follow_new_best = follow_new_best(
277		para_id,
278		parachain.clone(),
279		relay_chain.clone(),
280		announce_block,
281		recovery_chan_tx,
282	);
283	let follow_finalized_head = follow_finalized_head(parachain, finalized_head_stream);
284	select! {
285		_ = follow_new_best.fuse() => {},
286		_ = follow_finalized_head.fuse() => {},
287	}
288}
289
290/// Follow the relay chain new best head, to update the Parachain new best head.
291async fn follow_new_best<P, R, Block, B>(
292	para_id: ParaId,
293	parachain: Arc<P>,
294	relay_chain: R,
295	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
296	mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
297) where
298	Block: BlockT,
299	P: Finalizer<Block, B>
300		+ UsageProvider<Block>
301		+ Send
302		+ Sync
303		+ BlockBackend<Block>
304		+ BlockchainEvents<Block>,
305	for<'a> &'a P: BlockImport<Block>,
306	R: RelayChainInterface + Clone,
307	B: Backend<Block>,
308{
309	let new_best_heads = match new_best_heads(relay_chain, para_id).await {
310		Ok(best_heads_stream) => best_heads_stream.fuse(),
311		Err(err) => {
312			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
313			return
314		},
315	};
316
317	pin_mut!(new_best_heads);
318
319	let mut imported_blocks = parachain.import_notification_stream().fuse();
320	// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
321	// block before the associated parachain block. In this case we need to wait for this block to
322	// be imported to set it as new best.
323	let mut unset_best_header = None;
324
325	loop {
326		select! {
327			h = new_best_heads.next() => {
328				match h {
329					Some(h) => handle_new_best_parachain_head(
330						h,
331						&*parachain,
332						&mut unset_best_header,
333						recovery_chan_tx.as_mut(),
334					).await,
335					None => {
336						tracing::debug!(
337							target: LOG_TARGET,
338							"Stopping following new best.",
339						);
340						return
341					}
342				}
343			},
344			i = imported_blocks.next() => {
345				match i {
346					Some(i) => handle_new_block_imported(
347						i,
348						&mut unset_best_header,
349						&*parachain,
350						&*announce_block,
351					).await,
352					None => {
353						tracing::debug!(
354							target: LOG_TARGET,
355							"Stopping following imported blocks.",
356						);
357						return
358					}
359				}
360			},
361		}
362	}
363}
364
365/// Handle a new import block of the parachain.
366async fn handle_new_block_imported<Block, P>(
367	notification: BlockImportNotification<Block>,
368	unset_best_header_opt: &mut Option<Block::Header>,
369	parachain: &P,
370	announce_block: &(dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync),
371) where
372	Block: BlockT,
373	P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
374	for<'a> &'a P: BlockImport<Block>,
375{
376	// HACK
377	//
378	// Remove after https://github.com/paritytech/substrate/pull/8052 or similar is merged
379	if notification.origin != BlockOrigin::Own {
380		announce_block(notification.hash, None);
381	}
382
383	let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) {
384		// If this is the new best block or we don't have any unset block, we can end it here.
385		(true, _) | (_, None) => return,
386		(false, Some(ref u)) => u,
387	};
388
389	let unset_hash = if notification.header.number() < unset_best_header.number() {
390		return
391	} else if notification.header.number() == unset_best_header.number() {
392		let unset_hash = unset_best_header.hash();
393
394		if unset_hash != notification.hash {
395			return
396		} else {
397			unset_hash
398		}
399	} else {
400		unset_best_header.hash()
401	};
402
403	match parachain.block_status(unset_hash) {
404		Ok(BlockStatus::InChainWithState) => {
405			let unset_best_header = unset_best_header_opt
406				.take()
407				.expect("We checked above that the value is set; qed");
408			tracing::debug!(
409				target: LOG_TARGET,
410				?unset_hash,
411				"Importing block as new best for parachain.",
412			);
413			import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
414		},
415		state => tracing::debug!(
416			target: LOG_TARGET,
417			?unset_best_header,
418			?notification.header,
419			?state,
420			"Unexpected state for unset best header.",
421		),
422	}
423}
424
425/// Handle the new best parachain head as extracted from the new best relay chain.
426async fn handle_new_best_parachain_head<Block, P>(
427	head: Vec<u8>,
428	parachain: &P,
429	unset_best_header: &mut Option<Block::Header>,
430	mut recovery_chan_tx: Option<&mut Sender<RecoveryRequest<Block>>>,
431) where
432	Block: BlockT,
433	P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
434	for<'a> &'a P: BlockImport<Block>,
435{
436	let parachain_head = match <<Block as BlockT>::Header>::decode(&mut &head[..]) {
437		Ok(header) => header,
438		Err(err) => {
439			tracing::debug!(
440				target: LOG_TARGET,
441				error = ?err,
442				"Could not decode Parachain header while following best heads.",
443			);
444			return
445		},
446	};
447
448	let hash = parachain_head.hash();
449
450	if parachain.usage_info().chain.best_hash == hash {
451		tracing::debug!(
452			target: LOG_TARGET,
453			block_hash = ?hash,
454			"Skipping set new best block, because block is already the best.",
455		);
456		return;
457	}
458
459	// Make sure the block is already known or otherwise we skip setting new best.
460	match parachain.block_status(hash) {
461		Ok(BlockStatus::InChainWithState) => {
462			unset_best_header.take();
463			tracing::debug!(
464				target: LOG_TARGET,
465				included = ?hash,
466				"Importing block as new best for parachain.",
467			);
468			import_block_as_new_best(hash, parachain_head, parachain).await;
469		},
470		Ok(BlockStatus::InChainPruned) => {
471			tracing::error!(
472				target: LOG_TARGET,
473				block_hash = ?hash,
474				"Trying to set pruned block as new best!",
475			);
476		},
477		Ok(BlockStatus::Unknown) => {
478			*unset_best_header = Some(parachain_head);
479
480			tracing::debug!(
481				target: LOG_TARGET,
482				block_hash = ?hash,
483				"Parachain block not yet imported, waiting for import to enact as best block.",
484			);
485
486			if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
487				// Best effort channel to actively encourage block recovery.
488				// An error here is not fatal; the relay chain continuously re-announces
489				// the best block, thus we will have other opportunities to retry.
490				let req = RecoveryRequest { hash, kind: RecoveryKind::Full };
491				if let Err(err) = recovery_chan_tx.try_send(req) {
492					tracing::warn!(
493						target: LOG_TARGET,
494						block_hash = ?hash,
495						error = ?err,
496						"Unable to notify block recovery subsystem"
497					)
498				}
499			}
500		},
501		Err(e) => {
502			tracing::error!(
503				target: LOG_TARGET,
504				block_hash = ?hash,
505				error = ?e,
506				"Failed to get block status of block.",
507			);
508		},
509		_ => {},
510	}
511}
512
513async fn import_block_as_new_best<Block, P>(hash: Block::Hash, header: Block::Header, parachain: &P)
514where
515	Block: BlockT,
516	P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
517	for<'a> &'a P: BlockImport<Block>,
518{
519	let best_number = parachain.usage_info().chain.best_number;
520	if *header.number() < best_number {
521		tracing::debug!(
522			target: LOG_TARGET,
523			%best_number,
524			block_number = %header.number(),
525			"Skipping importing block as new best block, because there already exists a \
526			 best block with an higher number",
527		);
528		return
529	}
530
531	// Make it the new best block
532	let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
533	block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
534	block_import_params.import_existing = true;
535
536	if let Err(err) = parachain.import_block(block_import_params).await {
537		tracing::warn!(
538			target: LOG_TARGET,
539			block_hash = ?hash,
540			error = ?err,
541			"Failed to set new best block.",
542		);
543	}
544}