1use cumulus_relay_chain_streams::{finalized_heads, new_best_heads};
19use sc_client_api::{
20 Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
21};
22use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
23use schnellru::{ByLength, LruMap};
24use sp_blockchain::Error as ClientError;
25use sp_consensus::{BlockOrigin, BlockStatus};
26use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
27
28use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
29use cumulus_relay_chain_interface::RelayChainInterface;
30
31use polkadot_primitives::Id as ParaId;
32
33use codec::Decode;
34use futures::{
35 channel::mpsc::{Sender, UnboundedSender},
36 pin_mut, select, FutureExt, SinkExt, Stream, StreamExt,
37};
38use sp_core::traits::SpawnEssentialNamed;
39
40use std::sync::Arc;
41
/// Log target passed to the `tracing` macros below.
const LOG_TARGET: &str = "cumulus-consensus";
/// Capacity of the LRU cache that remembers the hashes of recently seen finalized heads
/// (see `follow_finalized_head`).
const FINALIZATION_CACHE_SIZE: u32 = 40;
44
45fn handle_new_finalized_head<P, Block, B>(
46 parachain: &Arc<P>,
47 header: Block::Header,
48 last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
49) where
50 Block: BlockT,
51 B: Backend<Block>,
52 P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
53{
54 let hash = header.hash();
55
56 last_seen_finalized_hashes.insert(hash, ());
57
58 if parachain.usage_info().chain.finalized_number < *header.number() {
60 tracing::debug!(
61 target: LOG_TARGET,
62 block_hash = ?hash,
63 "Attempting to finalize header.",
64 );
65 if let Err(e) = parachain.finalize_block(hash, None, true) {
66 match e {
67 ClientError::UnknownBlock(_) => tracing::debug!(
68 target: LOG_TARGET,
69 block_hash = ?hash,
70 "Could not finalize block because it is unknown.",
71 ),
72 _ => tracing::warn!(
73 target: LOG_TARGET,
74 error = ?e,
75 block_hash = ?hash,
76 "Failed to finalize block",
77 ),
78 }
79 }
80 }
81}
82
83pub async fn finalized_head_stream_worker<R: RelayChainInterface + Clone, Block: BlockT>(
93 mut tx: UnboundedSender<Block::Header>,
94 para_id: ParaId,
95 relay_chain: R,
96) {
97 let finalized_heads = match finalized_heads(relay_chain.clone(), para_id).await {
98 Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
99 Err(err) => {
100 tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
101 return
102 },
103 };
104
105 pin_mut!(finalized_heads);
106 loop {
107 if let Some((head_data, _)) = finalized_heads.next().await {
108 let header = match Block::Header::decode(&mut &head_data[..]) {
109 Ok(header) => header,
110 Err(err) => {
111 tracing::debug!(
112 target: LOG_TARGET,
113 error = ?err,
114 "Could not decode parachain header while following finalized heads.",
115 );
116 continue
117 },
118 };
119 if let Err(e) = tx.send(header).await {
120 tracing::error!(target: LOG_TARGET, ?e, "Error while sending finalized head.");
121 return;
122 };
123 }
124 }
125}
126
127async fn follow_finalized_head<P, Block, B>(
132 parachain: Arc<P>,
133 finalized_head_stream: Box<impl Stream<Item = Block::Header> + Unpin + Send>,
134) where
135 Block: BlockT,
136 P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
137 B: Backend<Block>,
138{
139 let mut imported_blocks = parachain.import_notification_stream().fuse();
140 let mut finalized_head_stream = finalized_head_stream.fuse();
141
142 let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE));
147
148 loop {
149 select! {
150 fin = finalized_head_stream.next() => {
151 match fin {
152 Some(finalized_head) =>
153 handle_new_finalized_head(¶chain, finalized_head, &mut last_seen_finalized_hashes),
154 None => {
155 tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
156 return
157 }
158 }
159 },
160 imported = imported_blocks.next() => {
161 match imported {
162 Some(imported_block) => {
163 if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
165 tracing::debug!(
166 target: LOG_TARGET,
167 block_hash = ?imported_block.hash,
168 "Setting newly imported block as finalized.",
169 );
170
171 if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
172 match e {
173 ClientError::UnknownBlock(_) => tracing::debug!(
174 target: LOG_TARGET,
175 block_hash = ?imported_block.hash,
176 "Could not finalize block because it is unknown.",
177 ),
178 _ => tracing::warn!(
179 target: LOG_TARGET,
180 error = ?e,
181 block_hash = ?imported_block.hash,
182 "Failed to finalize block",
183 ),
184 }
185 }
186 }
187 },
188 None => {
189 tracing::debug!(
190 target: LOG_TARGET,
191 "Stopping following imported blocks.",
192 );
193 return
194 }
195 }
196 }
197 }
198 }
199}
200
201pub fn spawn_parachain_consensus_tasks<P, R, Block, B, S>(
208 para_id: ParaId,
209 parachain: Arc<P>,
210 relay_chain: R,
211 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
212 recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
213 spawn_handle: S,
214) where
215 Block: BlockT,
216 P: Finalizer<Block, B>
217 + UsageProvider<Block>
218 + Send
219 + Sync
220 + BlockBackend<Block>
221 + BlockchainEvents<Block>
222 + 'static,
223 for<'a> &'a P: BlockImport<Block>,
224 R: RelayChainInterface + Clone + 'static,
225 S: SpawnEssentialNamed + 'static,
226 B: Backend<Block> + 'static,
227{
228 let (tx, rx) = futures::channel::mpsc::unbounded();
229 let worker = finalized_head_stream_worker::<_, Block>(tx, para_id, relay_chain.clone());
230 let consensus = run_parachain_consensus(
231 para_id,
232 parachain,
233 relay_chain,
234 announce_block,
235 Box::new(rx),
236 recovery_chan_tx,
237 );
238
239 spawn_handle.spawn_essential_blocking("cumulus-consensus", None, Box::pin(consensus));
240 spawn_handle.spawn_essential_blocking(
241 "cumulus-consensus-finality-stream",
242 None,
243 Box::pin(worker),
244 );
245}
246
247pub async fn run_parachain_consensus<P, R, Block, B>(
258 para_id: ParaId,
259 parachain: Arc<P>,
260 relay_chain: R,
261 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
262 finalized_head_stream: Box<impl Stream<Item = Block::Header> + Unpin + Send>,
263 recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
264) where
265 Block: BlockT,
266 P: Finalizer<Block, B>
267 + UsageProvider<Block>
268 + Send
269 + Sync
270 + BlockBackend<Block>
271 + BlockchainEvents<Block>,
272 for<'a> &'a P: BlockImport<Block>,
273 R: RelayChainInterface + Clone,
274 B: Backend<Block>,
275{
276 let follow_new_best = follow_new_best(
277 para_id,
278 parachain.clone(),
279 relay_chain.clone(),
280 announce_block,
281 recovery_chan_tx,
282 );
283 let follow_finalized_head = follow_finalized_head(parachain, finalized_head_stream);
284 select! {
285 _ = follow_new_best.fuse() => {},
286 _ = follow_finalized_head.fuse() => {},
287 }
288}
289
290async fn follow_new_best<P, R, Block, B>(
292 para_id: ParaId,
293 parachain: Arc<P>,
294 relay_chain: R,
295 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
296 mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
297) where
298 Block: BlockT,
299 P: Finalizer<Block, B>
300 + UsageProvider<Block>
301 + Send
302 + Sync
303 + BlockBackend<Block>
304 + BlockchainEvents<Block>,
305 for<'a> &'a P: BlockImport<Block>,
306 R: RelayChainInterface + Clone,
307 B: Backend<Block>,
308{
309 let new_best_heads = match new_best_heads(relay_chain, para_id).await {
310 Ok(best_heads_stream) => best_heads_stream.fuse(),
311 Err(err) => {
312 tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
313 return
314 },
315 };
316
317 pin_mut!(new_best_heads);
318
319 let mut imported_blocks = parachain.import_notification_stream().fuse();
320 let mut unset_best_header = None;
324
325 loop {
326 select! {
327 h = new_best_heads.next() => {
328 match h {
329 Some(h) => handle_new_best_parachain_head(
330 h,
331 &*parachain,
332 &mut unset_best_header,
333 recovery_chan_tx.as_mut(),
334 ).await,
335 None => {
336 tracing::debug!(
337 target: LOG_TARGET,
338 "Stopping following new best.",
339 );
340 return
341 }
342 }
343 },
344 i = imported_blocks.next() => {
345 match i {
346 Some(i) => handle_new_block_imported(
347 i,
348 &mut unset_best_header,
349 &*parachain,
350 &*announce_block,
351 ).await,
352 None => {
353 tracing::debug!(
354 target: LOG_TARGET,
355 "Stopping following imported blocks.",
356 );
357 return
358 }
359 }
360 },
361 }
362 }
363}
364
365async fn handle_new_block_imported<Block, P>(
367 notification: BlockImportNotification<Block>,
368 unset_best_header_opt: &mut Option<Block::Header>,
369 parachain: &P,
370 announce_block: &(dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync),
371) where
372 Block: BlockT,
373 P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
374 for<'a> &'a P: BlockImport<Block>,
375{
376 if notification.origin != BlockOrigin::Own {
380 announce_block(notification.hash, None);
381 }
382
383 let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) {
384 (true, _) | (_, None) => return,
386 (false, Some(ref u)) => u,
387 };
388
389 let unset_hash = if notification.header.number() < unset_best_header.number() {
390 return
391 } else if notification.header.number() == unset_best_header.number() {
392 let unset_hash = unset_best_header.hash();
393
394 if unset_hash != notification.hash {
395 return
396 } else {
397 unset_hash
398 }
399 } else {
400 unset_best_header.hash()
401 };
402
403 match parachain.block_status(unset_hash) {
404 Ok(BlockStatus::InChainWithState) => {
405 let unset_best_header = unset_best_header_opt
406 .take()
407 .expect("We checked above that the value is set; qed");
408 tracing::debug!(
409 target: LOG_TARGET,
410 ?unset_hash,
411 "Importing block as new best for parachain.",
412 );
413 import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
414 },
415 state => tracing::debug!(
416 target: LOG_TARGET,
417 ?unset_best_header,
418 ?notification.header,
419 ?state,
420 "Unexpected state for unset best header.",
421 ),
422 }
423}
424
425async fn handle_new_best_parachain_head<Block, P>(
427 head: Vec<u8>,
428 parachain: &P,
429 unset_best_header: &mut Option<Block::Header>,
430 mut recovery_chan_tx: Option<&mut Sender<RecoveryRequest<Block>>>,
431) where
432 Block: BlockT,
433 P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
434 for<'a> &'a P: BlockImport<Block>,
435{
436 let parachain_head = match <<Block as BlockT>::Header>::decode(&mut &head[..]) {
437 Ok(header) => header,
438 Err(err) => {
439 tracing::debug!(
440 target: LOG_TARGET,
441 error = ?err,
442 "Could not decode Parachain header while following best heads.",
443 );
444 return
445 },
446 };
447
448 let hash = parachain_head.hash();
449
450 if parachain.usage_info().chain.best_hash == hash {
451 tracing::debug!(
452 target: LOG_TARGET,
453 block_hash = ?hash,
454 "Skipping set new best block, because block is already the best.",
455 );
456 return;
457 }
458
459 match parachain.block_status(hash) {
461 Ok(BlockStatus::InChainWithState) => {
462 unset_best_header.take();
463 tracing::debug!(
464 target: LOG_TARGET,
465 included = ?hash,
466 "Importing block as new best for parachain.",
467 );
468 import_block_as_new_best(hash, parachain_head, parachain).await;
469 },
470 Ok(BlockStatus::InChainPruned) => {
471 tracing::error!(
472 target: LOG_TARGET,
473 block_hash = ?hash,
474 "Trying to set pruned block as new best!",
475 );
476 },
477 Ok(BlockStatus::Unknown) => {
478 *unset_best_header = Some(parachain_head);
479
480 tracing::debug!(
481 target: LOG_TARGET,
482 block_hash = ?hash,
483 "Parachain block not yet imported, waiting for import to enact as best block.",
484 );
485
486 if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
487 let req = RecoveryRequest { hash, kind: RecoveryKind::Full };
491 if let Err(err) = recovery_chan_tx.try_send(req) {
492 tracing::warn!(
493 target: LOG_TARGET,
494 block_hash = ?hash,
495 error = ?err,
496 "Unable to notify block recovery subsystem"
497 )
498 }
499 }
500 },
501 Err(e) => {
502 tracing::error!(
503 target: LOG_TARGET,
504 block_hash = ?hash,
505 error = ?e,
506 "Failed to get block status of block.",
507 );
508 },
509 _ => {},
510 }
511}
512
513async fn import_block_as_new_best<Block, P>(hash: Block::Hash, header: Block::Header, parachain: &P)
514where
515 Block: BlockT,
516 P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
517 for<'a> &'a P: BlockImport<Block>,
518{
519 let best_number = parachain.usage_info().chain.best_number;
520 if *header.number() < best_number {
521 tracing::debug!(
522 target: LOG_TARGET,
523 %best_number,
524 block_number = %header.number(),
525 "Skipping importing block as new best block, because there already exists a \
526 best block with an higher number",
527 );
528 return
529 }
530
531 let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
533 block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
534 block_import_params.import_existing = true;
535
536 if let Err(err) = parachain.import_block(block_import_params).await {
537 tracing::warn!(
538 target: LOG_TARGET,
539 block_hash = ?hash,
540 error = ?err,
541 "Failed to set new best block.",
542 );
543 }
544}