1use crate::collator::SlotClaim;
25use codec::Codec;
26use cumulus_client_consensus_common::{self as consensus_common, ParentSearchParams};
27use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
28use cumulus_primitives_core::{relay_chain::Header as RelayHeader, BlockT};
29use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
30use polkadot_node_subsystem::messages::{CollatorProtocolMessage, RuntimeApiRequest};
31use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot;
32use polkadot_primitives::{
33 Hash as RelayHash, Id as ParaId, OccupiedCoreAssumption, ValidationCodeHash,
34 DEFAULT_SCHEDULING_LOOKAHEAD,
35};
36use sc_consensus_aura::{standalone as aura_internal, AuraApi};
37use sp_api::{ApiExt, ProvideRuntimeApi, RuntimeApiInfo};
38use sp_core::Pair;
39use sp_keystore::KeystorePtr;
40use sp_timestamp::Timestamp;
41
42pub mod basic;
43pub mod lookahead;
44pub mod slot_based;
45
46const PARENT_SEARCH_DEPTH: usize = 40;
58
59struct BackingGroupConnectionHelper {
62 keystore: sp_keystore::KeystorePtr,
63 overseer_handle: OverseerHandle,
64 our_slot: Option<Slot>,
65}
66
67impl BackingGroupConnectionHelper {
68 pub fn new(keystore: sp_keystore::KeystorePtr, overseer_handle: OverseerHandle) -> Self {
69 Self { keystore, overseer_handle, our_slot: None }
70 }
71
72 async fn send_subsystem_message(&mut self, message: CollatorProtocolMessage) {
73 self.overseer_handle.send_msg(message, "BackingGroupConnectionHelper").await;
74 }
75
76 pub async fn update<P>(&mut self, current_slot: Slot, authorities: &[P::Public])
78 where
79 P: sp_core::Pair + Send + Sync,
80 P::Public: Codec,
81 {
82 if Some(current_slot) <= self.our_slot {
83 return
86 }
87
88 let next_slot = current_slot + 1;
89 let next_slot_is_ours =
90 aura_internal::claim_slot::<P>(next_slot, authorities, &self.keystore)
91 .await
92 .is_some();
93
94 if next_slot_is_ours {
95 if self.our_slot.is_none() {
98 tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is next, connecting to backing groups", next_slot);
100 self.send_subsystem_message(CollatorProtocolMessage::ConnectToBackingGroups)
101 .await;
102 }
103 self.our_slot = Some(next_slot);
104 } else if self.our_slot.take().is_some() {
105 tracing::debug!(target: crate::LOG_TARGET, "Current slot = {}, disconnecting from backing groups", current_slot);
107 self.send_subsystem_message(CollatorProtocolMessage::DisconnectFromBackingGroups)
108 .await;
109 }
110 }
111}
112
113async fn check_validation_code_or_log(
118 local_validation_code_hash: &ValidationCodeHash,
119 para_id: ParaId,
120 relay_client: &impl RelayChainInterface,
121 relay_parent: RelayHash,
122) {
123 let state_validation_code_hash = match relay_client
124 .validation_code_hash(relay_parent, para_id, OccupiedCoreAssumption::Included)
125 .await
126 {
127 Ok(hash) => hash,
128 Err(error) => {
129 tracing::debug!(
130 target: super::LOG_TARGET,
131 %error,
132 ?relay_parent,
133 %para_id,
134 "Failed to fetch validation code hash",
135 );
136 return
137 },
138 };
139
140 match state_validation_code_hash {
141 Some(state) =>
142 if state != *local_validation_code_hash {
143 tracing::warn!(
144 target: super::LOG_TARGET,
145 %para_id,
146 ?relay_parent,
147 ?local_validation_code_hash,
148 relay_validation_code_hash = ?state,
149 "Parachain code doesn't match validation code stored in the relay chain state.",
150 );
151 },
152 None => {
153 tracing::warn!(
154 target: super::LOG_TARGET,
155 %para_id,
156 ?relay_parent,
157 "Could not find validation code for parachain in the relay chain state.",
158 );
159 },
160 }
161}
162
163async fn scheduling_lookahead(
165 relay_parent: RelayHash,
166 relay_client: &impl RelayChainInterface,
167) -> Option<u32> {
168 let runtime_api_version = relay_client
169 .version(relay_parent)
170 .await
171 .map_err(|e| {
172 tracing::error!(
173 target: super::LOG_TARGET,
174 error = ?e,
175 "Failed to fetch relay chain runtime version.",
176 )
177 })
178 .ok()?;
179
180 let parachain_host_runtime_api_version = runtime_api_version
181 .api_version(
182 &<dyn polkadot_primitives::runtime_api::ParachainHost<polkadot_primitives::Block>>::ID,
183 )
184 .unwrap_or_default();
185
186 if parachain_host_runtime_api_version <
187 RuntimeApiRequest::SCHEDULING_LOOKAHEAD_RUNTIME_REQUIREMENT
188 {
189 return None
190 }
191
192 match relay_client.scheduling_lookahead(relay_parent).await {
193 Ok(scheduling_lookahead) => Some(scheduling_lookahead),
194 Err(err) => {
195 tracing::error!(
196 target: crate::LOG_TARGET,
197 ?err,
198 ?relay_parent,
199 "Failed to fetch scheduling lookahead from relay chain",
200 );
201 None
202 },
203 }
204}
205
206async fn claim_queue_at(
208 relay_parent: RelayHash,
209 relay_client: &impl RelayChainInterface,
210) -> ClaimQueueSnapshot {
211 match relay_client.claim_queue(relay_parent).await {
213 Ok(claim_queue) => claim_queue.into(),
214 Err(error) => {
215 tracing::error!(
216 target: crate::LOG_TARGET,
217 ?error,
218 ?relay_parent,
219 "Failed to query claim queue runtime API",
220 );
221 Default::default()
222 },
223 }
224}
225
226async fn can_build_upon<Block: BlockT, Client, P>(
229 para_slot: Slot,
230 relay_slot: Slot,
231 timestamp: Timestamp,
232 parent_hash: Block::Hash,
233 included_block: Block::Hash,
234 client: &Client,
235 keystore: &KeystorePtr,
236) -> Option<SlotClaim<P::Public>>
237where
238 Client: ProvideRuntimeApi<Block>,
239 Client::Api: AuraApi<Block, P::Public> + AuraUnincludedSegmentApi<Block> + ApiExt<Block>,
240 P: Pair,
241 P::Public: Codec,
242 P::Signature: Codec,
243{
244 let runtime_api = client.runtime_api();
245 let authorities = runtime_api.authorities(parent_hash).ok()?;
246 let author_pub = aura_internal::claim_slot::<P>(para_slot, &authorities, keystore).await?;
247
248 if parent_hash == included_block {
254 return Some(SlotClaim::unchecked::<P>(author_pub, para_slot, timestamp));
255 }
256
257 let api_version = runtime_api
258 .api_version::<dyn AuraUnincludedSegmentApi<Block>>(parent_hash)
259 .ok()
260 .flatten()?;
261
262 let slot = if api_version > 1 { relay_slot } else { para_slot };
263
264 runtime_api
265 .can_build_upon(parent_hash, included_block, slot)
266 .ok()?
267 .then(|| SlotClaim::unchecked::<P>(author_pub, para_slot, timestamp))
268}
269
270async fn find_parent<Block>(
274 relay_parent: RelayHash,
275 para_id: ParaId,
276 para_backend: &impl sc_client_api::Backend<Block>,
277 relay_client: &impl RelayChainInterface,
278) -> Option<(<Block as BlockT>::Header, consensus_common::PotentialParent<Block>)>
279where
280 Block: BlockT,
281{
282 let parent_search_params = ParentSearchParams {
283 relay_parent,
284 para_id,
285 ancestry_lookback: scheduling_lookahead(relay_parent, relay_client)
286 .await
287 .unwrap_or(DEFAULT_SCHEDULING_LOOKAHEAD)
288 .saturating_sub(1) as usize,
289 max_depth: PARENT_SEARCH_DEPTH,
290 ignore_alternative_branches: true,
291 };
292
293 let potential_parents = cumulus_client_consensus_common::find_potential_parents::<Block>(
294 parent_search_params,
295 para_backend,
296 relay_client,
297 )
298 .await;
299
300 let potential_parents = match potential_parents {
301 Err(e) => {
302 tracing::error!(
303 target: crate::LOG_TARGET,
304 ?relay_parent,
305 err = ?e,
306 "Could not fetch potential parents to build upon"
307 );
308
309 return None
310 },
311 Ok(x) => x,
312 };
313
314 let included_block = potential_parents.iter().find(|x| x.depth == 0)?.header.clone();
315 potential_parents
316 .into_iter()
317 .max_by_key(|a| a.depth)
318 .map(|parent| (included_block, parent))
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324 use crate::collators::{can_build_upon, BackingGroupConnectionHelper};
325 use codec::Encode;
326 use cumulus_primitives_aura::Slot;
327 use cumulus_primitives_core::BlockT;
328 use cumulus_relay_chain_interface::PHash;
329 use cumulus_test_client::{
330 runtime::{Block, Hash},
331 Client, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder,
332 TestClientBuilderExt,
333 };
334 use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
335 use futures::StreamExt;
336 use polkadot_overseer::{Event, Handle};
337 use polkadot_primitives::HeadData;
338 use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
339 use sp_consensus::BlockOrigin;
340 use sp_keystore::{Keystore, KeystorePtr};
341 use sp_timestamp::Timestamp;
342 use std::sync::{Arc, Mutex};
343
344 async fn import_block<I: BlockImport<Block>>(
345 importer: &I,
346 block: Block,
347 origin: BlockOrigin,
348 import_as_best: bool,
349 ) {
350 let (header, body) = block.deconstruct();
351
352 let mut block_import_params = BlockImportParams::new(origin, header);
353 block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
354 block_import_params.body = Some(body);
355 importer.import_block(block_import_params).await.unwrap();
356 }
357
358 fn sproof_with_parent_by_hash(client: &Client, hash: PHash) -> RelayStateSproofBuilder {
359 let header = client.header(hash).ok().flatten().expect("No header for parent block");
360 let included = HeadData(header.encode());
361 let mut builder = RelayStateSproofBuilder::default();
362 builder.para_id = cumulus_test_client::runtime::PARACHAIN_ID.into();
363 builder.included_para_head = Some(included);
364
365 builder
366 }
367 async fn build_and_import_block(client: &Client, included: Hash) -> Block {
368 let sproof = sproof_with_parent_by_hash(client, included);
369
370 let block_builder = client.init_block_builder(None, sproof).block_builder;
371
372 let block = block_builder.build().unwrap().block;
373
374 let origin = BlockOrigin::NetworkInitialSync;
375 import_block(client, block.clone(), origin, true).await;
376 block
377 }
378
379 fn set_up_components(num_authorities: usize) -> (Arc<Client>, KeystorePtr) {
380 let keystore = Arc::new(sp_keystore::testing::MemoryKeystore::new()) as Arc<_>;
381 for key in sp_keyring::Sr25519Keyring::iter().take(num_authorities) {
382 Keystore::sr25519_generate_new(
383 &*keystore,
384 sp_application_crypto::key_types::AURA,
385 Some(&key.to_seed()),
386 )
387 .expect("Can insert key into MemoryKeyStore");
388 }
389 (Arc::new(TestClientBuilder::new().build()), keystore)
390 }
391
392 #[tokio::test]
398 async fn test_can_build_upon() {
399 let (client, keystore) = set_up_components(6);
400
401 let genesis_hash = client.chain_info().genesis_hash;
402 let mut last_hash = genesis_hash;
403
404 while can_build_upon::<_, _, sp_consensus_aura::sr25519::AuthorityPair>(
406 Slot::from(u64::MAX),
407 Slot::from(u64::MAX),
408 Timestamp::default(),
409 last_hash,
410 genesis_hash,
411 &*client,
412 &keystore,
413 )
414 .await
415 .is_some()
416 {
417 let block = build_and_import_block(&client, genesis_hash).await;
418 last_hash = block.header().hash();
419 }
420
421 let result = can_build_upon::<_, _, sp_consensus_aura::sr25519::AuthorityPair>(
424 Slot::from(u64::MAX),
425 Slot::from(u64::MAX),
426 Timestamp::default(),
427 last_hash,
428 last_hash,
429 &*client,
430 &keystore,
431 )
432 .await;
433 assert!(result.is_some());
434 }
435
436 fn create_overseer_handle() -> (OverseerHandle, Arc<Mutex<Vec<CollatorProtocolMessage>>>) {
438 let messages = Arc::new(Mutex::new(Vec::new()));
439 let messages_clone = messages.clone();
440
441 let (tx, mut rx) = polkadot_node_subsystem_util::metered::channel(100);
442
443 tokio::spawn(async move {
445 while let Some(event) = rx.next().await {
446 if let Event::MsgToSubsystem { msg, .. } = event {
447 if let polkadot_node_subsystem::AllMessages::CollatorProtocol(cp_msg) = msg {
448 messages_clone.lock().unwrap().push(cp_msg);
449 }
450 }
451 }
452 });
453
454 (Handle::new(tx), messages)
455 }
456
457 #[tokio::test]
458 async fn preconnect_when_next_slot_is_ours() {
459 let (client, keystore) = set_up_components(1);
460 let genesis_hash = client.chain_info().genesis_hash;
461 let (overseer_handle, messages_recorder) = create_overseer_handle();
462
463 let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
464
465 let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
467
468 helper
470 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
471 .await;
472
473 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
475
476 let messages = messages_recorder.lock().unwrap();
477 assert_eq!(messages.len(), 1);
478 assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
479 assert_eq!(helper.our_slot, Some(Slot::from(6)));
480 }
481
482 #[tokio::test]
483 async fn preconnect_no_duplicate_connect_message() {
484 let (client, keystore) = set_up_components(1);
485 let genesis_hash = client.chain_info().genesis_hash;
486 let (overseer_handle, messages_recorder) = create_overseer_handle();
487
488 let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
489
490 let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
492
493 helper
495 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
496 .await;
497
498 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
500 assert_eq!(messages_recorder.lock().unwrap().len(), 1);
501 messages_recorder.lock().unwrap().clear();
502
503 helper
505 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
506 .await;
507 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
508 assert_eq!(messages_recorder.lock().unwrap().len(), 0);
509
510 helper
512 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(6), &authorities)
513 .await;
514 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
515 assert_eq!(messages_recorder.lock().unwrap().len(), 0);
516 }
517
518 #[tokio::test]
519 async fn preconnect_disconnect_when_slot_passes() {
520 let (client, keystore) = set_up_components(1);
521 let genesis_hash = client.chain_info().genesis_hash;
522 let (overseer_handle, messages_recorder) = create_overseer_handle();
523
524 let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
525
526 let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
528
529 helper
534 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
535 .await;
536 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
537 assert_eq!(helper.our_slot, Some(Slot::from(6)));
538 messages_recorder.lock().unwrap().clear();
539
540 helper
542 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(8), &authorities)
543 .await;
544 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
545
546 {
547 let messages = messages_recorder.lock().unwrap();
548 assert_eq!(messages.len(), 1, "Expected exactly one disconnect message");
549 assert!(matches!(messages[0], CollatorProtocolMessage::DisconnectFromBackingGroups));
550 assert_eq!(helper.our_slot, None);
551 }
552
553 messages_recorder.lock().unwrap().clear();
554
555 helper
558 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(8), &authorities)
559 .await;
560 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
561
562 let messages = messages_recorder.lock().unwrap();
563 assert_eq!(messages.len(), 0, "Expected no messages");
564 assert_eq!(helper.our_slot, None);
565 }
566
567 #[tokio::test]
568 async fn preconnect_no_disconnect_without_previous_connection() {
569 let (client, keystore) = set_up_components(1);
570 let genesis_hash = client.chain_info().genesis_hash;
571 let (overseer_handle, messages_recorder) = create_overseer_handle();
572
573 let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
574
575 let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
577
578 helper
584 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(1), &authorities)
585 .await;
586
587 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
588 assert_eq!(messages_recorder.lock().unwrap().len(), 0);
590 assert_eq!(helper.our_slot, None);
591 }
592
593 #[tokio::test]
594 async fn preconnect_multiple_cycles() {
595 let (client, keystore) = set_up_components(1);
596 let genesis_hash = client.chain_info().genesis_hash;
597 let (overseer_handle, messages_recorder) = create_overseer_handle();
598
599 let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
600
601 let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
603
604 helper
609 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
610 .await;
611 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
612 {
613 let messages = messages_recorder.lock().unwrap();
614 assert_eq!(messages.len(), 1);
615 assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
616 }
617 assert_eq!(helper.our_slot, Some(Slot::from(6)));
618 messages_recorder.lock().unwrap().clear();
619
620 helper
622 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(7), &authorities)
623 .await;
624 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
625 {
626 let messages = messages_recorder.lock().unwrap();
627 assert_eq!(messages.len(), 1);
628 assert!(matches!(messages[0], CollatorProtocolMessage::DisconnectFromBackingGroups));
629 }
630 assert_eq!(helper.our_slot, None);
631 messages_recorder.lock().unwrap().clear();
632
633 helper
635 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(11), &authorities)
636 .await;
637 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
638 {
639 let messages = messages_recorder.lock().unwrap();
640 assert_eq!(messages.len(), 1);
641 assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
642 }
643 assert_eq!(helper.our_slot, Some(Slot::from(12)));
644 }
645
646 #[tokio::test]
647 async fn preconnect_handles_empty_authorities() {
648 let keystore = Arc::new(sp_keystore::testing::MemoryKeystore::new()) as Arc<_>;
649 let (overseer_handle, messages_recorder) = create_overseer_handle();
650
651 let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
652
653 let authorities = vec![];
655 helper
656 .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(0), &authorities)
657 .await;
658
659 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
660 assert_eq!(messages_recorder.lock().unwrap().len(), 0);
662 }
663}
664
665pub struct RelayParentData {
667 relay_parent: RelayHeader,
669 descendants: Vec<RelayHeader>,
671}
672
673impl RelayParentData {
674 pub fn new(relay_parent: RelayHeader) -> Self {
676 Self { relay_parent, descendants: Default::default() }
677 }
678
679 pub fn new_with_descendants(relay_parent: RelayHeader, descendants: Vec<RelayHeader>) -> Self {
681 Self { relay_parent, descendants }
682 }
683
684 pub fn relay_parent(&self) -> &RelayHeader {
686 &self.relay_parent
687 }
688
689 #[cfg(test)]
691 pub fn descendants_len(&self) -> usize {
692 self.descendants.len()
693 }
694
695 pub fn into_inherent_descendant_list(self) -> Vec<RelayHeader> {
699 let Self { relay_parent, mut descendants } = self;
700
701 if descendants.is_empty() {
702 return Default::default()
703 }
704
705 let mut result = vec![relay_parent];
706 result.append(&mut descendants);
707 result
708 }
709}