cumulus_client_consensus_aura/collators/mod.rs
1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Stock, pure Aura collators.
19//!
20//! This includes the [`basic`] collator, which only builds on top of the most recently
21//! included parachain block, as well as the [`lookahead`] collator, which prospectively
22//! builds on parachain blocks which have not yet been included in the relay chain.
23
24use crate::collator::SlotClaim;
25use codec::Codec;
26use cumulus_client_consensus_common::{self as consensus_common, ParentSearchParams};
27use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
28use cumulus_primitives_core::{relay_chain::Header as RelayHeader, BlockT};
29use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
30use polkadot_node_subsystem::messages::{CollatorProtocolMessage, RuntimeApiRequest};
31use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot;
32use polkadot_primitives::{
33	Hash as RelayHash, Id as ParaId, OccupiedCoreAssumption, ValidationCodeHash,
34	DEFAULT_SCHEDULING_LOOKAHEAD,
35};
36use sc_consensus_aura::{standalone as aura_internal, AuraApi};
37use sp_api::{ApiExt, ProvideRuntimeApi, RuntimeApiInfo};
38use sp_core::Pair;
39use sp_keystore::KeystorePtr;
40use sp_timestamp::Timestamp;
41
/// The basic collator, which only builds on top of the most recently included parachain block.
pub mod basic;
/// The lookahead collator, which prospectively builds on parachain blocks that have not yet
/// been included in the relay chain.
pub mod lookahead;
/// The slot-based collator implementation.
pub mod slot_based;
45
// This is an arbitrary value which is guaranteed to exceed the required depth for 500ms blocks
// built with a relay parent offset of 1. It must be larger than the unincluded segment capacity.
//
// The formula we use to compute the capacity of the unincluded segment in the parachain runtime
// is:
// UNINCLUDED_SEGMENT_CAPACITY = (2 + RELAY_PARENT_OFFSET) * BLOCK_PROCESSING_VELOCITY + 1.
//
// Since we only search for parent blocks which have already been imported,
// we can guarantee that all imported blocks respect the unincluded segment
// rules specified by the parachain's runtime and thus will never be too deep. This is just an
// extra sanity check.
const PARENT_SEARCH_DEPTH: usize = 40;
58
59// Helper to pre-connect to the backing group we got assigned to and keep the connection
60// open until backing group changes or own slot ends.
61struct BackingGroupConnectionHelper {
62	keystore: sp_keystore::KeystorePtr,
63	overseer_handle: OverseerHandle,
64	our_slot: Option<Slot>,
65}
66
67impl BackingGroupConnectionHelper {
68	pub fn new(keystore: sp_keystore::KeystorePtr, overseer_handle: OverseerHandle) -> Self {
69		Self { keystore, overseer_handle, our_slot: None }
70	}
71
72	async fn send_subsystem_message(&mut self, message: CollatorProtocolMessage) {
73		self.overseer_handle.send_msg(message, "BackingGroupConnectionHelper").await;
74	}
75
76	/// Update the current slot and initiate connections to backing groups if needed.
77	pub async fn update<P>(&mut self, current_slot: Slot, authorities: &[P::Public])
78	where
79		P: sp_core::Pair + Send + Sync,
80		P::Public: Codec,
81	{
82		if Some(current_slot) <= self.our_slot {
83			// Current slot or next slot is ours.
84			// We already sent pre-connect message, no need to proceed further.
85			return
86		}
87
88		let next_slot = current_slot + 1;
89		let next_slot_is_ours =
90			aura_internal::claim_slot::<P>(next_slot, authorities, &self.keystore)
91				.await
92				.is_some();
93
94		if next_slot_is_ours {
95			// Only send message if we were not connected. This avoids sending duplicate messages
96			// when running with a single collator.
97			if self.our_slot.is_none() {
98				// Next slot is ours, send connect message.
99				tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is next, connecting to backing groups", next_slot);
100				self.send_subsystem_message(CollatorProtocolMessage::ConnectToBackingGroups)
101					.await;
102			}
103			self.our_slot = Some(next_slot);
104		} else if self.our_slot.take().is_some() {
105			// Next slot is not ours, send disconnect only if we had a slot before.
106			tracing::debug!(target: crate::LOG_TARGET, "Current slot = {}, disconnecting from backing groups", current_slot);
107			self.send_subsystem_message(CollatorProtocolMessage::DisconnectFromBackingGroups)
108				.await;
109		}
110	}
111}
112
113/// Check the `local_validation_code_hash` against the validation code hash in the relay chain
114/// state.
115///
116/// If the code hashes do not match, it prints a warning.
117async fn check_validation_code_or_log(
118	local_validation_code_hash: &ValidationCodeHash,
119	para_id: ParaId,
120	relay_client: &impl RelayChainInterface,
121	relay_parent: RelayHash,
122) {
123	let state_validation_code_hash = match relay_client
124		.validation_code_hash(relay_parent, para_id, OccupiedCoreAssumption::Included)
125		.await
126	{
127		Ok(hash) => hash,
128		Err(error) => {
129			tracing::debug!(
130				target: super::LOG_TARGET,
131				%error,
132				?relay_parent,
133				%para_id,
134				"Failed to fetch validation code hash",
135			);
136			return
137		},
138	};
139
140	match state_validation_code_hash {
141		Some(state) =>
142			if state != *local_validation_code_hash {
143				tracing::warn!(
144					target: super::LOG_TARGET,
145					%para_id,
146					?relay_parent,
147					?local_validation_code_hash,
148					relay_validation_code_hash = ?state,
149					"Parachain code doesn't match validation code stored in the relay chain state.",
150				);
151			},
152		None => {
153			tracing::warn!(
154				target: super::LOG_TARGET,
155				%para_id,
156				?relay_parent,
157				"Could not find validation code for parachain in the relay chain state.",
158			);
159		},
160	}
161}
162
163/// Fetch scheduling lookahead at given relay parent.
164async fn scheduling_lookahead(
165	relay_parent: RelayHash,
166	relay_client: &impl RelayChainInterface,
167) -> Option<u32> {
168	let runtime_api_version = relay_client
169		.version(relay_parent)
170		.await
171		.map_err(|e| {
172			tracing::error!(
173				target: super::LOG_TARGET,
174				error = ?e,
175				"Failed to fetch relay chain runtime version.",
176			)
177		})
178		.ok()?;
179
180	let parachain_host_runtime_api_version = runtime_api_version
181		.api_version(
182			&<dyn polkadot_primitives::runtime_api::ParachainHost<polkadot_primitives::Block>>::ID,
183		)
184		.unwrap_or_default();
185
186	if parachain_host_runtime_api_version <
187		RuntimeApiRequest::SCHEDULING_LOOKAHEAD_RUNTIME_REQUIREMENT
188	{
189		return None
190	}
191
192	match relay_client.scheduling_lookahead(relay_parent).await {
193		Ok(scheduling_lookahead) => Some(scheduling_lookahead),
194		Err(err) => {
195			tracing::error!(
196				target: crate::LOG_TARGET,
197				?err,
198				?relay_parent,
199				"Failed to fetch scheduling lookahead from relay chain",
200			);
201			None
202		},
203	}
204}
205
206// Returns the claim queue at the given relay parent.
207async fn claim_queue_at(
208	relay_parent: RelayHash,
209	relay_client: &impl RelayChainInterface,
210) -> ClaimQueueSnapshot {
211	// Get `ClaimQueue` from runtime
212	match relay_client.claim_queue(relay_parent).await {
213		Ok(claim_queue) => claim_queue.into(),
214		Err(error) => {
215			tracing::error!(
216				target: crate::LOG_TARGET,
217				?error,
218				?relay_parent,
219				"Failed to query claim queue runtime API",
220			);
221			Default::default()
222		},
223	}
224}
225
226// Checks if we own the slot at the given block and whether there
227// is space in the unincluded segment.
228async fn can_build_upon<Block: BlockT, Client, P>(
229	para_slot: Slot,
230	relay_slot: Slot,
231	timestamp: Timestamp,
232	parent_hash: Block::Hash,
233	included_block: Block::Hash,
234	client: &Client,
235	keystore: &KeystorePtr,
236) -> Option<SlotClaim<P::Public>>
237where
238	Client: ProvideRuntimeApi<Block>,
239	Client::Api: AuraApi<Block, P::Public> + AuraUnincludedSegmentApi<Block> + ApiExt<Block>,
240	P: Pair,
241	P::Public: Codec,
242	P::Signature: Codec,
243{
244	let runtime_api = client.runtime_api();
245	let authorities = runtime_api.authorities(parent_hash).ok()?;
246	let author_pub = aura_internal::claim_slot::<P>(para_slot, &authorities, keystore).await?;
247
248	// This function is typically called when we want to build block N. At that point, the
249	// unincluded segment in the runtime is unaware of the hash of block N-1. If the unincluded
250	// segment in the runtime is full, but block N-1 is the included block, the unincluded segment
251	// should have length 0 and we can build. Since the hash is not available to the runtime
252	// however, we need this extra check here.
253	if parent_hash == included_block {
254		return Some(SlotClaim::unchecked::<P>(author_pub, para_slot, timestamp));
255	}
256
257	let api_version = runtime_api
258		.api_version::<dyn AuraUnincludedSegmentApi<Block>>(parent_hash)
259		.ok()
260		.flatten()?;
261
262	let slot = if api_version > 1 { relay_slot } else { para_slot };
263
264	runtime_api
265		.can_build_upon(parent_hash, included_block, slot)
266		.ok()?
267		.then(|| SlotClaim::unchecked::<P>(author_pub, para_slot, timestamp))
268}
269
270/// Use [`cumulus_client_consensus_common::find_potential_parents`] to find parachain blocks that
271/// we can build on. Once a list of potential parents is retrieved, return the last one of the
272/// longest chain.
273async fn find_parent<Block>(
274	relay_parent: RelayHash,
275	para_id: ParaId,
276	para_backend: &impl sc_client_api::Backend<Block>,
277	relay_client: &impl RelayChainInterface,
278) -> Option<(<Block as BlockT>::Header, consensus_common::PotentialParent<Block>)>
279where
280	Block: BlockT,
281{
282	let parent_search_params = ParentSearchParams {
283		relay_parent,
284		para_id,
285		ancestry_lookback: scheduling_lookahead(relay_parent, relay_client)
286			.await
287			.unwrap_or(DEFAULT_SCHEDULING_LOOKAHEAD)
288			.saturating_sub(1) as usize,
289		max_depth: PARENT_SEARCH_DEPTH,
290		ignore_alternative_branches: true,
291	};
292
293	let potential_parents = cumulus_client_consensus_common::find_potential_parents::<Block>(
294		parent_search_params,
295		para_backend,
296		relay_client,
297	)
298	.await;
299
300	let potential_parents = match potential_parents {
301		Err(e) => {
302			tracing::error!(
303				target: crate::LOG_TARGET,
304				?relay_parent,
305				err = ?e,
306				"Could not fetch potential parents to build upon"
307			);
308
309			return None
310		},
311		Ok(x) => x,
312	};
313
314	let included_block = potential_parents.iter().find(|x| x.depth == 0)?.header.clone();
315	potential_parents
316		.into_iter()
317		.max_by_key(|a| a.depth)
318		.map(|parent| (included_block, parent))
319}
320
#[cfg(test)]
mod tests {
	use super::*;
	use crate::collators::{can_build_upon, BackingGroupConnectionHelper};
	use codec::Encode;
	use cumulus_primitives_aura::Slot;
	use cumulus_primitives_core::BlockT;
	use cumulus_relay_chain_interface::PHash;
	use cumulus_test_client::{
		runtime::{Block, Hash},
		Client, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder,
		TestClientBuilderExt,
	};
	use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
	use futures::StreamExt;
	use polkadot_overseer::{Event, Handle};
	use polkadot_primitives::HeadData;
	use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
	use sp_consensus::BlockOrigin;
	use sp_keystore::{Keystore, KeystorePtr};
	use sp_timestamp::Timestamp;
	use std::sync::{Arc, Mutex};

	/// Import `block` into `importer` with the given `origin`, marking it as best according
	/// to `import_as_best`.
	async fn import_block<I: BlockImport<Block>>(
		importer: &I,
		block: Block,
		origin: BlockOrigin,
		import_as_best: bool,
	) {
		let (header, body) = block.deconstruct();

		let mut block_import_params = BlockImportParams::new(origin, header);
		block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
		block_import_params.body = Some(body);
		importer.import_block(block_import_params).await.unwrap();
	}

	/// Build a relay chain storage proof in which the block identified by `hash` is reported
	/// as the included parachain head.
	fn sproof_with_parent_by_hash(client: &Client, hash: PHash) -> RelayStateSproofBuilder {
		let header = client.header(hash).ok().flatten().expect("No header for parent block");
		let included = HeadData(header.encode());
		let mut builder = RelayStateSproofBuilder::default();
		builder.para_id = cumulus_test_client::runtime::PARACHAIN_ID.into();
		builder.included_para_head = Some(included);

		builder
	}

	/// Build a new block reporting `included` as the included head, then import it as best.
	async fn build_and_import_block(client: &Client, included: Hash) -> Block {
		let sproof = sproof_with_parent_by_hash(client, included);

		let block_builder = client.init_block_builder(None, sproof).block_builder;

		let block = block_builder.build().unwrap().block;

		let origin = BlockOrigin::NetworkInitialSync;
		import_block(client, block.clone(), origin, true).await;
		block
	}

	/// Create a test client plus a keystore holding the Aura keys of the first
	/// `num_authorities` well-known keyring accounts.
	fn set_up_components(num_authorities: usize) -> (Arc<Client>, KeystorePtr) {
		let keystore = Arc::new(sp_keystore::testing::MemoryKeystore::new()) as Arc<_>;
		for key in sp_keyring::Sr25519Keyring::iter().take(num_authorities) {
			Keystore::sr25519_generate_new(
				&*keystore,
				sp_application_crypto::key_types::AURA,
				Some(&key.to_seed()),
			)
			.expect("Can insert key into MemoryKeyStore");
		}
		(Arc::new(TestClientBuilder::new().build()), keystore)
	}

	/// This tests a special scenario where the unincluded segment in the runtime
	/// is full. We are calling `can_build_upon`, passing the last built block as the
	/// included one. In the runtime we will not find the hash of the included block in the
	/// unincluded segment. The `can_build_upon` runtime API would therefore return `false`, but
	/// we are ensuring on the node side that we are always able to build on the included block.
	#[tokio::test]
	async fn test_can_build_upon() {
		let (client, keystore) = set_up_components(6);

		let genesis_hash = client.chain_info().genesis_hash;
		let mut last_hash = genesis_hash;

		// Fill up the unincluded segment tracker in the runtime.
		while can_build_upon::<_, _, sp_consensus_aura::sr25519::AuthorityPair>(
			Slot::from(u64::MAX),
			Slot::from(u64::MAX),
			Timestamp::default(),
			last_hash,
			genesis_hash,
			&*client,
			&keystore,
		)
		.await
		.is_some()
		{
			let block = build_and_import_block(&client, genesis_hash).await;
			last_hash = block.header().hash();
		}

		// Blocks were built with the genesis hash set as included block.
		// We call `can_build_upon` with the last built block as the included block.
		let result = can_build_upon::<_, _, sp_consensus_aura::sr25519::AuthorityPair>(
			Slot::from(u64::MAX),
			Slot::from(u64::MAX),
			Timestamp::default(),
			last_hash,
			last_hash,
			&*client,
			&keystore,
		)
		.await;
		assert!(result.is_some());
	}

	/// Helper to create a mock overseer handle and message recorder.
	///
	/// The recorder collects every `CollatorProtocolMessage` sent through the handle so tests
	/// can assert on the exact sequence of connect/disconnect messages.
	fn create_overseer_handle() -> (OverseerHandle, Arc<Mutex<Vec<CollatorProtocolMessage>>>) {
		let messages = Arc::new(Mutex::new(Vec::new()));
		let messages_clone = messages.clone();

		let (tx, mut rx) = polkadot_node_subsystem_util::metered::channel(100);

		// Spawn a task to receive and record overseer messages
		tokio::spawn(async move {
			while let Some(event) = rx.next().await {
				if let Event::MsgToSubsystem { msg, .. } = event {
					if let polkadot_node_subsystem::AllMessages::CollatorProtocol(cp_msg) = msg {
						messages_clone.lock().unwrap().push(cp_msg);
					}
				}
			}
		});

		(Handle::new(tx), messages)
	}

	#[tokio::test]
	async fn preconnect_when_next_slot_is_ours() {
		let (client, keystore) = set_up_components(1);
		let genesis_hash = client.chain_info().genesis_hash;
		let (overseer_handle, messages_recorder) = create_overseer_handle();

		let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);

		// Fetch authorities for the update call
		let authorities = client.runtime_api().authorities(genesis_hash).unwrap();

		// Update with slot 5, next slot (6) should be ours
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
			.await;

		// Give time for message to be processed
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

		let messages = messages_recorder.lock().unwrap();
		assert_eq!(messages.len(), 1);
		assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
		assert_eq!(helper.our_slot, Some(Slot::from(6)));
	}

	#[tokio::test]
	async fn preconnect_no_duplicate_connect_message() {
		let (client, keystore) = set_up_components(1);
		let genesis_hash = client.chain_info().genesis_hash;
		let (overseer_handle, messages_recorder) = create_overseer_handle();

		let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);

		// Fetch authorities for the update calls
		let authorities = client.runtime_api().authorities(genesis_hash).unwrap();

		// Update with slot 5, next slot (6) is ours
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
			.await;

		// Give time for message to be processed
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		assert_eq!(messages_recorder.lock().unwrap().len(), 1);
		messages_recorder.lock().unwrap().clear();

		// Update with slot 5 again - should not send another message
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		assert_eq!(messages_recorder.lock().unwrap().len(), 0);

		// Update with slot 6 (our slot) - should not send another message
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(6), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		assert_eq!(messages_recorder.lock().unwrap().len(), 0);
	}

	#[tokio::test]
	async fn preconnect_disconnect_when_slot_passes() {
		let (client, keystore) = set_up_components(1);
		let genesis_hash = client.chain_info().genesis_hash;
		let (overseer_handle, messages_recorder) = create_overseer_handle();

		let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);

		// Fetch authorities for the update calls
		let authorities = client.runtime_api().authorities(genesis_hash).unwrap();

		// Slot 0 -> Alice, Slot 1 -> Bob, Slot 2 -> Charlie, Slot 3 -> Dave, Slot 4 -> Eve,
		// Slot 5 -> Ferdie, Slot 6 -> Alice

		// Update with slot 5, next slot (6) is ours -> should connect
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		assert_eq!(helper.our_slot, Some(Slot::from(6)));
		messages_recorder.lock().unwrap().clear();

		// Update with slot 8, next slot (9) is Dave's -> should disconnect
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(8), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

		{
			let messages = messages_recorder.lock().unwrap();
			assert_eq!(messages.len(), 1, "Expected exactly one disconnect message");
			assert!(matches!(messages[0], CollatorProtocolMessage::DisconnectFromBackingGroups));
			assert_eq!(helper.our_slot, None);
		}

		messages_recorder.lock().unwrap().clear();

		// Update again with slot 8, next slot (9) is Dave's -> should not send another
		// disconnect message
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(8), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

		let messages = messages_recorder.lock().unwrap();
		assert_eq!(messages.len(), 0, "Expected no messages");
		assert_eq!(helper.our_slot, None);
	}

	#[tokio::test]
	async fn preconnect_no_disconnect_without_previous_connection() {
		let (client, keystore) = set_up_components(1);
		let genesis_hash = client.chain_info().genesis_hash;
		let (overseer_handle, messages_recorder) = create_overseer_handle();

		let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);

		// Fetch authorities for the update call
		let authorities = client.runtime_api().authorities(genesis_hash).unwrap();

		// Slot 0 -> Alice, Slot 1 -> Bob, Slot 2 -> Charlie, Slot 3 -> Dave, Slot 4 -> Eve,
		// Slot 5 -> Ferdie

		// Update with slot 1 (Bob's slot), next slot (2) is Charlie's
		// Since we never connected before (our_slot is None), we should not send disconnect
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(1), &authorities)
			.await;

		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		// Should not send any message since we never connected
		assert_eq!(messages_recorder.lock().unwrap().len(), 0);
		assert_eq!(helper.our_slot, None);
	}

	#[tokio::test]
	async fn preconnect_multiple_cycles() {
		let (client, keystore) = set_up_components(1);
		let genesis_hash = client.chain_info().genesis_hash;
		let (overseer_handle, messages_recorder) = create_overseer_handle();

		let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);

		// Fetch authorities for the update calls
		let authorities = client.runtime_api().authorities(genesis_hash).unwrap();

		// Slot 0 -> Alice, Slot 1 -> Bob, Slot 2 -> Charlie, Slot 3 -> Dave, Slot 4 -> Eve,
		// Slot 5 -> Ferdie, Slot 6 -> Alice, Slot 7 -> Bob, ...

		// Cycle 1: Connect at slot 5, next slot (6) is ours
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		{
			let messages = messages_recorder.lock().unwrap();
			assert_eq!(messages.len(), 1);
			assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
		}
		assert_eq!(helper.our_slot, Some(Slot::from(6)));
		messages_recorder.lock().unwrap().clear();

		// Cycle 1: Disconnect at slot 7, next slot (8) is Charlie's
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(7), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		{
			let messages = messages_recorder.lock().unwrap();
			assert_eq!(messages.len(), 1);
			assert!(matches!(messages[0], CollatorProtocolMessage::DisconnectFromBackingGroups));
		}
		assert_eq!(helper.our_slot, None);
		messages_recorder.lock().unwrap().clear();

		// Cycle 2: Connect again at slot 11, next slot (12) is ours
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(11), &authorities)
			.await;
		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		{
			let messages = messages_recorder.lock().unwrap();
			assert_eq!(messages.len(), 1);
			assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
		}
		assert_eq!(helper.our_slot, Some(Slot::from(12)));
	}

	#[tokio::test]
	async fn preconnect_handles_empty_authorities() {
		let keystore = Arc::new(sp_keystore::testing::MemoryKeystore::new()) as Arc<_>;
		let (overseer_handle, messages_recorder) = create_overseer_handle();

		let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);

		// Pass empty authorities list
		let authorities = vec![];
		helper
			.update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(0), &authorities)
			.await;

		tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
		// Should not send any message if authorities list is empty
		assert_eq!(messages_recorder.lock().unwrap().len(), 0);
	}
}
664
/// Holds a relay parent and its descendants.
pub struct RelayParentData {
	/// The relay parent block header
	relay_parent: RelayHeader,
	/// Ordered collection of descendant block headers, from oldest to newest
	descendants: Vec<RelayHeader>,
}
672
673impl RelayParentData {
674	/// Creates a new instance with the given relay parent and no descendants.
675	pub fn new(relay_parent: RelayHeader) -> Self {
676		Self { relay_parent, descendants: Default::default() }
677	}
678
679	/// Creates a new instance with the given relay parent and descendants.
680	pub fn new_with_descendants(relay_parent: RelayHeader, descendants: Vec<RelayHeader>) -> Self {
681		Self { relay_parent, descendants }
682	}
683
684	/// Returns a reference to the relay parent header.
685	pub fn relay_parent(&self) -> &RelayHeader {
686		&self.relay_parent
687	}
688
689	/// Returns the number of descendants.
690	#[cfg(test)]
691	pub fn descendants_len(&self) -> usize {
692		self.descendants.len()
693	}
694
695	/// Consumes the structure and returns a vector containing the relay parent followed by its
696	/// descendants in chronological order. The resulting list should be provided to the parachain
697	/// inherent data.
698	pub fn into_inherent_descendant_list(self) -> Vec<RelayHeader> {
699		let Self { relay_parent, mut descendants } = self;
700
701		if descendants.is_empty() {
702			return Default::default()
703		}
704
705		let mut result = vec![relay_parent];
706		result.append(&mut descendants);
707		result
708	}
709}