referrerpolicy=no-referrer-when-downgrade

cumulus_client_consensus_common/
parachain_consensus.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use cumulus_relay_chain_streams::{finalized_heads, new_best_heads};
19use sc_client_api::{
20	Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
21};
22use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
23use schnellru::{ByLength, LruMap};
24use sp_blockchain::Error as ClientError;
25use sp_consensus::{BlockOrigin, BlockStatus};
26use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
27
28use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
29use cumulus_relay_chain_interface::RelayChainInterface;
30
31use polkadot_primitives::Id as ParaId;
32
33use codec::Decode;
34use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, StreamExt};
35
36use std::sync::Arc;
37
/// Log target attached to every `tracing` event emitted by this module.
const LOG_TARGET: &str = "cumulus-consensus";
/// Capacity of the LRU cache that remembers recently seen finalized parachain block hashes.
const FINALIZATION_CACHE_SIZE: u32 = 40;
40
41fn handle_new_finalized_head<P, Block, B>(
42	parachain: &Arc<P>,
43	finalized_head: Vec<u8>,
44	last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
45) where
46	Block: BlockT,
47	B: Backend<Block>,
48	P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
49{
50	let header = match Block::Header::decode(&mut &finalized_head[..]) {
51		Ok(header) => header,
52		Err(err) => {
53			tracing::debug!(
54				target: LOG_TARGET,
55				error = ?err,
56				"Could not decode parachain header while following finalized heads.",
57			);
58			return
59		},
60	};
61
62	let hash = header.hash();
63
64	last_seen_finalized_hashes.insert(hash, ());
65
66	// Only finalize if we are below the incoming finalized parachain head
67	if parachain.usage_info().chain.finalized_number < *header.number() {
68		tracing::debug!(
69			target: LOG_TARGET,
70			block_hash = ?hash,
71			"Attempting to finalize header.",
72		);
73		if let Err(e) = parachain.finalize_block(hash, None, true) {
74			match e {
75				ClientError::UnknownBlock(_) => tracing::debug!(
76					target: LOG_TARGET,
77					block_hash = ?hash,
78					"Could not finalize block because it is unknown.",
79				),
80				_ => tracing::warn!(
81					target: LOG_TARGET,
82					error = ?e,
83					block_hash = ?hash,
84					"Failed to finalize block",
85				),
86			}
87		}
88	}
89}
90
91/// Follow the finalized head of the given parachain.
92///
93/// For every finalized block of the relay chain, it will get the included parachain header
94/// corresponding to `para_id` and will finalize it in the parachain.
95async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P>, relay_chain: R)
96where
97	Block: BlockT,
98	P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
99	R: RelayChainInterface + Clone,
100	B: Backend<Block>,
101{
102	let finalized_heads = match finalized_heads(relay_chain, para_id).await {
103		Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
104		Err(err) => {
105			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
106			return
107		},
108	};
109
110	let mut imported_blocks = parachain.import_notification_stream().fuse();
111
112	pin_mut!(finalized_heads);
113
114	// We use this cache to finalize blocks that are imported late.
115	// For example, a block that has been recovered via PoV-Recovery
116	// on a full node can have several minutes delay. With this cache
117	// we have some "memory" of recently finalized blocks.
118	let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE));
119
120	loop {
121		select! {
122			fin = finalized_heads.next() => {
123				match fin {
124					Some((finalized_head, _)) =>
125						handle_new_finalized_head(&parachain, finalized_head, &mut last_seen_finalized_hashes),
126					None => {
127						tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
128						return
129					}
130				}
131			},
132			imported = imported_blocks.next() => {
133				match imported {
134					Some(imported_block) => {
135						// When we see a block import that is already finalized, we immediately finalize it.
136						if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
137							tracing::debug!(
138								target: LOG_TARGET,
139								block_hash = ?imported_block.hash,
140								"Setting newly imported block as finalized.",
141							);
142
143							if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
144								match e {
145									ClientError::UnknownBlock(_) => tracing::debug!(
146										target: LOG_TARGET,
147										block_hash = ?imported_block.hash,
148										"Could not finalize block because it is unknown.",
149									),
150									_ => tracing::warn!(
151										target: LOG_TARGET,
152										error = ?e,
153										block_hash = ?imported_block.hash,
154										"Failed to finalize block",
155									),
156								}
157							}
158						}
159					},
160					None => {
161						tracing::debug!(
162							target: LOG_TARGET,
163							"Stopping following imported blocks.",
164						);
165						return
166					}
167				}
168			}
169		}
170	}
171}
172
173/// Run the parachain consensus.
174///
175/// This will follow the given `relay_chain` to act as consensus for the parachain that corresponds
176/// to the given `para_id`. It will set the new best block of the parachain as it gets aware of it.
177/// The same happens for the finalized block.
178///
179/// # Note
180///
181/// This will access the backend of the parachain and thus, this future should be spawned as
182/// blocking task.
183pub async fn run_parachain_consensus<P, R, Block, B>(
184	para_id: ParaId,
185	parachain: Arc<P>,
186	relay_chain: R,
187	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
188	recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
189) where
190	Block: BlockT,
191	P: Finalizer<Block, B>
192		+ UsageProvider<Block>
193		+ Send
194		+ Sync
195		+ BlockBackend<Block>
196		+ BlockchainEvents<Block>,
197	for<'a> &'a P: BlockImport<Block>,
198	R: RelayChainInterface + Clone,
199	B: Backend<Block>,
200{
201	let follow_new_best = follow_new_best(
202		para_id,
203		parachain.clone(),
204		relay_chain.clone(),
205		announce_block,
206		recovery_chan_tx,
207	);
208	let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
209	select! {
210		_ = follow_new_best.fuse() => {},
211		_ = follow_finalized_head.fuse() => {},
212	}
213}
214
215/// Follow the relay chain new best head, to update the Parachain new best head.
216async fn follow_new_best<P, R, Block, B>(
217	para_id: ParaId,
218	parachain: Arc<P>,
219	relay_chain: R,
220	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
221	mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
222) where
223	Block: BlockT,
224	P: Finalizer<Block, B>
225		+ UsageProvider<Block>
226		+ Send
227		+ Sync
228		+ BlockBackend<Block>
229		+ BlockchainEvents<Block>,
230	for<'a> &'a P: BlockImport<Block>,
231	R: RelayChainInterface + Clone,
232	B: Backend<Block>,
233{
234	let new_best_heads = match new_best_heads(relay_chain, para_id).await {
235		Ok(best_heads_stream) => best_heads_stream.fuse(),
236		Err(err) => {
237			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
238			return
239		},
240	};
241
242	pin_mut!(new_best_heads);
243
244	let mut imported_blocks = parachain.import_notification_stream().fuse();
245	// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
246	// block before the associated parachain block. In this case we need to wait for this block to
247	// be imported to set it as new best.
248	let mut unset_best_header = None;
249
250	loop {
251		select! {
252			h = new_best_heads.next() => {
253				match h {
254					Some(h) => handle_new_best_parachain_head(
255						h,
256						&*parachain,
257						&mut unset_best_header,
258						recovery_chan_tx.as_mut(),
259					).await,
260					None => {
261						tracing::debug!(
262							target: LOG_TARGET,
263							"Stopping following new best.",
264						);
265						return
266					}
267				}
268			},
269			i = imported_blocks.next() => {
270				match i {
271					Some(i) => handle_new_block_imported(
272						i,
273						&mut unset_best_header,
274						&*parachain,
275						&*announce_block,
276					).await,
277					None => {
278						tracing::debug!(
279							target: LOG_TARGET,
280							"Stopping following imported blocks.",
281						);
282						return
283					}
284				}
285			},
286		}
287	}
288}
289
290/// Handle a new import block of the parachain.
291async fn handle_new_block_imported<Block, P>(
292	notification: BlockImportNotification<Block>,
293	unset_best_header_opt: &mut Option<Block::Header>,
294	parachain: &P,
295	announce_block: &(dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync),
296) where
297	Block: BlockT,
298	P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
299	for<'a> &'a P: BlockImport<Block>,
300{
301	// HACK
302	//
303	// Remove after https://github.com/paritytech/substrate/pull/8052 or similar is merged
304	if notification.origin != BlockOrigin::Own {
305		announce_block(notification.hash, None);
306	}
307
308	let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) {
309		// If this is the new best block or we don't have any unset block, we can end it here.
310		(true, _) | (_, None) => return,
311		(false, Some(ref u)) => u,
312	};
313
314	let unset_hash = if notification.header.number() < unset_best_header.number() {
315		return
316	} else if notification.header.number() == unset_best_header.number() {
317		let unset_hash = unset_best_header.hash();
318
319		if unset_hash != notification.hash {
320			return
321		} else {
322			unset_hash
323		}
324	} else {
325		unset_best_header.hash()
326	};
327
328	match parachain.block_status(unset_hash) {
329		Ok(BlockStatus::InChainWithState) => {
330			let unset_best_header = unset_best_header_opt
331				.take()
332				.expect("We checked above that the value is set; qed");
333			tracing::debug!(
334				target: LOG_TARGET,
335				?unset_hash,
336				"Importing block as new best for parachain.",
337			);
338			import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
339		},
340		state => tracing::debug!(
341			target: LOG_TARGET,
342			?unset_best_header,
343			?notification.header,
344			?state,
345			"Unexpected state for unset best header.",
346		),
347	}
348}
349
350/// Handle the new best parachain head as extracted from the new best relay chain.
351async fn handle_new_best_parachain_head<Block, P>(
352	head: Vec<u8>,
353	parachain: &P,
354	unset_best_header: &mut Option<Block::Header>,
355	mut recovery_chan_tx: Option<&mut Sender<RecoveryRequest<Block>>>,
356) where
357	Block: BlockT,
358	P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
359	for<'a> &'a P: BlockImport<Block>,
360{
361	let parachain_head = match <<Block as BlockT>::Header>::decode(&mut &head[..]) {
362		Ok(header) => header,
363		Err(err) => {
364			tracing::debug!(
365				target: LOG_TARGET,
366				error = ?err,
367				"Could not decode Parachain header while following best heads.",
368			);
369			return
370		},
371	};
372
373	let hash = parachain_head.hash();
374
375	if parachain.usage_info().chain.best_hash == hash {
376		tracing::debug!(
377			target: LOG_TARGET,
378			block_hash = ?hash,
379			"Skipping set new best block, because block is already the best.",
380		);
381		return;
382	}
383
384	// Make sure the block is already known or otherwise we skip setting new best.
385	match parachain.block_status(hash) {
386		Ok(BlockStatus::InChainWithState) => {
387			unset_best_header.take();
388			tracing::debug!(
389				target: LOG_TARGET,
390				included = ?hash,
391				"Importing block as new best for parachain.",
392			);
393			import_block_as_new_best(hash, parachain_head, parachain).await;
394		},
395		Ok(BlockStatus::InChainPruned) => {
396			tracing::error!(
397				target: LOG_TARGET,
398				block_hash = ?hash,
399				"Trying to set pruned block as new best!",
400			);
401		},
402		Ok(BlockStatus::Unknown) => {
403			*unset_best_header = Some(parachain_head);
404
405			tracing::debug!(
406				target: LOG_TARGET,
407				block_hash = ?hash,
408				"Parachain block not yet imported, waiting for import to enact as best block.",
409			);
410
411			if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
412				// Best effort channel to actively encourage block recovery.
413				// An error here is not fatal; the relay chain continuously re-announces
414				// the best block, thus we will have other opportunities to retry.
415				let req = RecoveryRequest { hash, kind: RecoveryKind::Full };
416				if let Err(err) = recovery_chan_tx.try_send(req) {
417					tracing::warn!(
418						target: LOG_TARGET,
419						block_hash = ?hash,
420						error = ?err,
421						"Unable to notify block recovery subsystem"
422					)
423				}
424			}
425		},
426		Err(e) => {
427			tracing::error!(
428				target: LOG_TARGET,
429				block_hash = ?hash,
430				error = ?e,
431				"Failed to get block status of block.",
432			);
433		},
434		_ => {},
435	}
436}
437
438async fn import_block_as_new_best<Block, P>(hash: Block::Hash, header: Block::Header, parachain: &P)
439where
440	Block: BlockT,
441	P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
442	for<'a> &'a P: BlockImport<Block>,
443{
444	let best_number = parachain.usage_info().chain.best_number;
445	if *header.number() < best_number {
446		tracing::debug!(
447			target: LOG_TARGET,
448			%best_number,
449			block_number = %header.number(),
450			"Skipping importing block as new best block, because there already exists a \
451			 best block with an higher number",
452		);
453		return
454	}
455
456	// Make it the new best block
457	let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
458	block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
459	block_import_params.import_existing = true;
460
461	if let Err(err) = parachain.import_block(block_import_params).await {
462		tracing::warn!(
463			target: LOG_TARGET,
464			block_hash = ?hash,
465			error = ?err,
466			"Failed to set new best block.",
467		);
468	}
469}