1use cumulus_relay_chain_streams::{finalized_heads, new_best_heads};
19use sc_client_api::{
20 Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
21};
22use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
23use schnellru::{ByLength, LruMap};
24use sp_blockchain::Error as ClientError;
25use sp_consensus::{BlockOrigin, BlockStatus};
26use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
27
28use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
29use cumulus_relay_chain_interface::RelayChainInterface;
30
31use polkadot_primitives::Id as ParaId;
32
33use codec::Decode;
34use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, StreamExt};
35
36use std::sync::Arc;
37
38const LOG_TARGET: &str = "cumulus-consensus";
39const FINALIZATION_CACHE_SIZE: u32 = 40;
40
41fn handle_new_finalized_head<P, Block, B>(
42 parachain: &Arc<P>,
43 finalized_head: Vec<u8>,
44 last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
45) where
46 Block: BlockT,
47 B: Backend<Block>,
48 P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
49{
50 let header = match Block::Header::decode(&mut &finalized_head[..]) {
51 Ok(header) => header,
52 Err(err) => {
53 tracing::debug!(
54 target: LOG_TARGET,
55 error = ?err,
56 "Could not decode parachain header while following finalized heads.",
57 );
58 return
59 },
60 };
61
62 let hash = header.hash();
63
64 last_seen_finalized_hashes.insert(hash, ());
65
66 if parachain.usage_info().chain.finalized_number < *header.number() {
68 tracing::debug!(
69 target: LOG_TARGET,
70 block_hash = ?hash,
71 "Attempting to finalize header.",
72 );
73 if let Err(e) = parachain.finalize_block(hash, None, true) {
74 match e {
75 ClientError::UnknownBlock(_) => tracing::debug!(
76 target: LOG_TARGET,
77 block_hash = ?hash,
78 "Could not finalize block because it is unknown.",
79 ),
80 _ => tracing::warn!(
81 target: LOG_TARGET,
82 error = ?e,
83 block_hash = ?hash,
84 "Failed to finalize block",
85 ),
86 }
87 }
88 }
89}
90
91async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P>, relay_chain: R)
96where
97 Block: BlockT,
98 P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
99 R: RelayChainInterface + Clone,
100 B: Backend<Block>,
101{
102 let finalized_heads = match finalized_heads(relay_chain, para_id).await {
103 Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
104 Err(err) => {
105 tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
106 return
107 },
108 };
109
110 let mut imported_blocks = parachain.import_notification_stream().fuse();
111
112 pin_mut!(finalized_heads);
113
114 let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE));
119
120 loop {
121 select! {
122 fin = finalized_heads.next() => {
123 match fin {
124 Some((finalized_head, _)) =>
125 handle_new_finalized_head(¶chain, finalized_head, &mut last_seen_finalized_hashes),
126 None => {
127 tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
128 return
129 }
130 }
131 },
132 imported = imported_blocks.next() => {
133 match imported {
134 Some(imported_block) => {
135 if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
137 tracing::debug!(
138 target: LOG_TARGET,
139 block_hash = ?imported_block.hash,
140 "Setting newly imported block as finalized.",
141 );
142
143 if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
144 match e {
145 ClientError::UnknownBlock(_) => tracing::debug!(
146 target: LOG_TARGET,
147 block_hash = ?imported_block.hash,
148 "Could not finalize block because it is unknown.",
149 ),
150 _ => tracing::warn!(
151 target: LOG_TARGET,
152 error = ?e,
153 block_hash = ?imported_block.hash,
154 "Failed to finalize block",
155 ),
156 }
157 }
158 }
159 },
160 None => {
161 tracing::debug!(
162 target: LOG_TARGET,
163 "Stopping following imported blocks.",
164 );
165 return
166 }
167 }
168 }
169 }
170 }
171}
172
173pub async fn run_parachain_consensus<P, R, Block, B>(
184 para_id: ParaId,
185 parachain: Arc<P>,
186 relay_chain: R,
187 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
188 recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
189) where
190 Block: BlockT,
191 P: Finalizer<Block, B>
192 + UsageProvider<Block>
193 + Send
194 + Sync
195 + BlockBackend<Block>
196 + BlockchainEvents<Block>,
197 for<'a> &'a P: BlockImport<Block>,
198 R: RelayChainInterface + Clone,
199 B: Backend<Block>,
200{
201 let follow_new_best = follow_new_best(
202 para_id,
203 parachain.clone(),
204 relay_chain.clone(),
205 announce_block,
206 recovery_chan_tx,
207 );
208 let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
209 select! {
210 _ = follow_new_best.fuse() => {},
211 _ = follow_finalized_head.fuse() => {},
212 }
213}
214
215async fn follow_new_best<P, R, Block, B>(
217 para_id: ParaId,
218 parachain: Arc<P>,
219 relay_chain: R,
220 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
221 mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
222) where
223 Block: BlockT,
224 P: Finalizer<Block, B>
225 + UsageProvider<Block>
226 + Send
227 + Sync
228 + BlockBackend<Block>
229 + BlockchainEvents<Block>,
230 for<'a> &'a P: BlockImport<Block>,
231 R: RelayChainInterface + Clone,
232 B: Backend<Block>,
233{
234 let new_best_heads = match new_best_heads(relay_chain, para_id).await {
235 Ok(best_heads_stream) => best_heads_stream.fuse(),
236 Err(err) => {
237 tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
238 return
239 },
240 };
241
242 pin_mut!(new_best_heads);
243
244 let mut imported_blocks = parachain.import_notification_stream().fuse();
245 let mut unset_best_header = None;
249
250 loop {
251 select! {
252 h = new_best_heads.next() => {
253 match h {
254 Some(h) => handle_new_best_parachain_head(
255 h,
256 &*parachain,
257 &mut unset_best_header,
258 recovery_chan_tx.as_mut(),
259 ).await,
260 None => {
261 tracing::debug!(
262 target: LOG_TARGET,
263 "Stopping following new best.",
264 );
265 return
266 }
267 }
268 },
269 i = imported_blocks.next() => {
270 match i {
271 Some(i) => handle_new_block_imported(
272 i,
273 &mut unset_best_header,
274 &*parachain,
275 &*announce_block,
276 ).await,
277 None => {
278 tracing::debug!(
279 target: LOG_TARGET,
280 "Stopping following imported blocks.",
281 );
282 return
283 }
284 }
285 },
286 }
287 }
288}
289
290async fn handle_new_block_imported<Block, P>(
292 notification: BlockImportNotification<Block>,
293 unset_best_header_opt: &mut Option<Block::Header>,
294 parachain: &P,
295 announce_block: &(dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync),
296) where
297 Block: BlockT,
298 P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
299 for<'a> &'a P: BlockImport<Block>,
300{
301 if notification.origin != BlockOrigin::Own {
305 announce_block(notification.hash, None);
306 }
307
308 let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) {
309 (true, _) | (_, None) => return,
311 (false, Some(ref u)) => u,
312 };
313
314 let unset_hash = if notification.header.number() < unset_best_header.number() {
315 return
316 } else if notification.header.number() == unset_best_header.number() {
317 let unset_hash = unset_best_header.hash();
318
319 if unset_hash != notification.hash {
320 return
321 } else {
322 unset_hash
323 }
324 } else {
325 unset_best_header.hash()
326 };
327
328 match parachain.block_status(unset_hash) {
329 Ok(BlockStatus::InChainWithState) => {
330 let unset_best_header = unset_best_header_opt
331 .take()
332 .expect("We checked above that the value is set; qed");
333 tracing::debug!(
334 target: LOG_TARGET,
335 ?unset_hash,
336 "Importing block as new best for parachain.",
337 );
338 import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
339 },
340 state => tracing::debug!(
341 target: LOG_TARGET,
342 ?unset_best_header,
343 ?notification.header,
344 ?state,
345 "Unexpected state for unset best header.",
346 ),
347 }
348}
349
350async fn handle_new_best_parachain_head<Block, P>(
352 head: Vec<u8>,
353 parachain: &P,
354 unset_best_header: &mut Option<Block::Header>,
355 mut recovery_chan_tx: Option<&mut Sender<RecoveryRequest<Block>>>,
356) where
357 Block: BlockT,
358 P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
359 for<'a> &'a P: BlockImport<Block>,
360{
361 let parachain_head = match <<Block as BlockT>::Header>::decode(&mut &head[..]) {
362 Ok(header) => header,
363 Err(err) => {
364 tracing::debug!(
365 target: LOG_TARGET,
366 error = ?err,
367 "Could not decode Parachain header while following best heads.",
368 );
369 return
370 },
371 };
372
373 let hash = parachain_head.hash();
374
375 if parachain.usage_info().chain.best_hash == hash {
376 tracing::debug!(
377 target: LOG_TARGET,
378 block_hash = ?hash,
379 "Skipping set new best block, because block is already the best.",
380 );
381 return;
382 }
383
384 match parachain.block_status(hash) {
386 Ok(BlockStatus::InChainWithState) => {
387 unset_best_header.take();
388 tracing::debug!(
389 target: LOG_TARGET,
390 included = ?hash,
391 "Importing block as new best for parachain.",
392 );
393 import_block_as_new_best(hash, parachain_head, parachain).await;
394 },
395 Ok(BlockStatus::InChainPruned) => {
396 tracing::error!(
397 target: LOG_TARGET,
398 block_hash = ?hash,
399 "Trying to set pruned block as new best!",
400 );
401 },
402 Ok(BlockStatus::Unknown) => {
403 *unset_best_header = Some(parachain_head);
404
405 tracing::debug!(
406 target: LOG_TARGET,
407 block_hash = ?hash,
408 "Parachain block not yet imported, waiting for import to enact as best block.",
409 );
410
411 if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
412 let req = RecoveryRequest { hash, kind: RecoveryKind::Full };
416 if let Err(err) = recovery_chan_tx.try_send(req) {
417 tracing::warn!(
418 target: LOG_TARGET,
419 block_hash = ?hash,
420 error = ?err,
421 "Unable to notify block recovery subsystem"
422 )
423 }
424 }
425 },
426 Err(e) => {
427 tracing::error!(
428 target: LOG_TARGET,
429 block_hash = ?hash,
430 error = ?e,
431 "Failed to get block status of block.",
432 );
433 },
434 _ => {},
435 }
436}
437
438async fn import_block_as_new_best<Block, P>(hash: Block::Hash, header: Block::Header, parachain: &P)
439where
440 Block: BlockT,
441 P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
442 for<'a> &'a P: BlockImport<Block>,
443{
444 let best_number = parachain.usage_info().chain.best_number;
445 if *header.number() < best_number {
446 tracing::debug!(
447 target: LOG_TARGET,
448 %best_number,
449 block_number = %header.number(),
450 "Skipping importing block as new best block, because there already exists a \
451 best block with an higher number",
452 );
453 return
454 }
455
456 let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
458 block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
459 block_import_params.import_existing = true;
460
461 if let Err(err) = parachain.import_block(block_import_params).await {
462 tracing::warn!(
463 target: LOG_TARGET,
464 block_hash = ?hash,
465 error = ?err,
466 "Failed to set new best block.",
467 );
468 }
469}