referrerpolicy=no-referrer-when-downgrade

cumulus_client_consensus_aura/collators/slot_based/
slot_timer.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use crate::LOG_TARGET;
19use codec::Codec;
20use cumulus_primitives_aura::Slot;
21use cumulus_primitives_core::BlockT;
22use sc_client_api::UsageProvider;
23use sc_consensus_aura::SlotDuration;
24use sp_api::ProvideRuntimeApi;
25use sp_application_crypto::AppPublic;
26use sp_consensus_aura::AuraApi;
27use sp_core::Pair;
28use sp_runtime::traits::Member;
29use sp_timestamp::Timestamp;
30use std::{
31	cmp::{max, min},
32	sync::Arc,
33	time::Duration,
34};
35
/// Lower limit of the allowed block production interval.
///
/// Defensive mechanism: 500 ms corresponds to 12 cores at 6 second block time (6000 ms / 12).
/// `compute_next_wake_up_time` never schedules per-core attempts closer together than this,
/// no matter how many cores are reported.
///
/// Note: despite the `_MS` suffix the value is a [`Duration`], not a raw millisecond count.
const BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS: Duration = Duration::from_millis(500);
39
/// A slot paired with a timestamp.
///
/// NOTE(review): nothing in this file constructs or reads this type; confirm the exact relation
/// between `timestamp` and `slot` against the users of this struct.
#[derive(Debug)]
pub(crate) struct SlotInfo {
	/// Timestamp associated with `slot`.
	pub timestamp: Timestamp,
	/// The slot this info refers to.
	pub slot: Slot,
}
45
/// Manages block-production timings based on chain parameters and assigned cores.
#[derive(Debug)]
pub(crate) struct SlotTimer<Block, Client, P> {
	/// Client that is used for runtime calls
	client: Arc<Client>,
	/// Duration that is subtracted (saturating at zero) from the current time before the next
	/// wake-up is computed.
	time_offset: Duration,
	/// Last reported core count. `None` until [`Self::update_scheduling`] is called for the
	/// first time, in which case a single core is assumed.
	last_reported_core_num: Option<u32>,
	/// Slot duration of the relay chain. This is used to compute how many block-production
	/// attempts we should trigger per relay chain block.
	relay_slot_duration: Duration,
	/// Stores the latest slot that was reported by [`Self::wait_until_next_slot`].
	last_reported_slot: Option<Slot>,
	/// Ties the otherwise unused `Block` and `P` type parameters to the struct.
	/// `P` only appears as an argument of a boxed `Fn(P) + Send + Sync`; presumably so that the
	/// auto traits of the timer do not depend on `P` itself (TODO confirm).
	_marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
}
62
63/// Compute when to try block-authoring next.
64/// The exact time point is determined by the slot duration of relay- and parachain as
65/// well as the last observed core count. If more cores are available, we attempt to author blocks
66/// for them.
67///
68/// Returns a tuple with:
69/// - `Duration`: How long to wait until the next slot.
70/// - `Slot`: The AURA slot used for authoring
71fn compute_next_wake_up_time(
72	para_slot_duration: SlotDuration,
73	relay_slot_duration: Duration,
74	core_count: Option<u32>,
75	time_now: Duration,
76	time_offset: Duration,
77) -> (Duration, Slot) {
78	let para_slots_per_relay_block =
79		(relay_slot_duration.as_millis() / para_slot_duration.as_millis() as u128) as u32;
80	let assigned_core_num = core_count.unwrap_or(1);
81
82	// Trigger at least once per relay block, if we have for example 12 second slot duration,
83	// we should still produce two blocks if we are scheduled on every relay block.
84	let mut block_production_interval = min(para_slot_duration.as_duration(), relay_slot_duration);
85
86	if assigned_core_num > para_slots_per_relay_block &&
87		para_slot_duration.as_duration() >= relay_slot_duration
88	{
89		block_production_interval =
90			max(relay_slot_duration / assigned_core_num, BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS);
91		tracing::debug!(
92			target: LOG_TARGET,
93			?block_production_interval,
94			"Expected to produce for {assigned_core_num} cores but only have {para_slots_per_relay_block} slots. Attempting to produce multiple blocks per slot."
95		);
96	}
97
98	let (duration, timestamp) =
99		time_until_next_attempt(time_now, block_production_interval, time_offset);
100	let aura_slot = Slot::from_timestamp(timestamp, para_slot_duration);
101	(duration, aura_slot)
102}
103
/// Time elapsed since the Unix epoch according to the system clock.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn duration_now() -> Duration {
	use std::time::{SystemTime, UNIX_EPOCH};

	let now = SystemTime::now();
	match now.duration_since(UNIX_EPOCH) {
		Ok(since_epoch) => since_epoch,
		Err(e) =>
			panic!("Current time {:?} is before Unix epoch. Something is wrong: {:?}", now, e),
	}
}
112
113/// Returns the duration until the next block production should be attempted.
114/// Returns:
115/// - Duration: The duration until the next attempt.
116fn time_until_next_attempt(
117	now: Duration,
118	block_production_interval: Duration,
119	offset: Duration,
120) -> (Duration, Timestamp) {
121	let now = now.as_millis().saturating_sub(offset.as_millis());
122
123	let next_slot_time = ((now + block_production_interval.as_millis()) /
124		block_production_interval.as_millis()) *
125		block_production_interval.as_millis();
126	let remaining_millis = next_slot_time - now;
127	(Duration::from_millis(remaining_millis as u64), Timestamp::from(next_slot_time as u64))
128}
129
130impl<Block, Client, P> SlotTimer<Block, Client, P>
131where
132	Block: BlockT,
133	Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + UsageProvider<Block>,
134	Client::Api: AuraApi<Block, P::Public>,
135	P: Pair,
136	P::Public: AppPublic + Member + Codec,
137	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
138{
139	/// Create a new slot timer.
140	pub fn new_with_offset(
141		client: Arc<Client>,
142		time_offset: Duration,
143		relay_slot_duration: Duration,
144	) -> Self {
145		Self {
146			client,
147			time_offset,
148			last_reported_core_num: None,
149			relay_slot_duration,
150			last_reported_slot: None,
151			_marker: Default::default(),
152		}
153	}
154
155	/// Inform the slot timer about the last seen number of cores.
156	pub fn update_scheduling(&mut self, num_cores_next_block: u32) {
157		self.last_reported_core_num = Some(num_cores_next_block);
158	}
159
160	/// Returns the next slot and how much time left until then.
161	pub fn time_until_next_slot(&mut self) -> Result<(Duration, Slot), ()> {
162		let Ok(slot_duration) = crate::slot_duration(&*self.client) else {
163			tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
164			return Err(())
165		};
166
167		Ok(compute_next_wake_up_time(
168			slot_duration,
169			self.relay_slot_duration,
170			self.last_reported_core_num,
171			duration_now(),
172			self.time_offset,
173		))
174	}
175
176	/// Returns a future that resolves when the next block production should be attempted.
177	pub async fn wait_until_next_slot(&mut self) -> Result<(), ()> {
178		let Ok(slot_duration) = crate::slot_duration(&*self.client) else {
179			tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
180			return Err(())
181		};
182
183		let (time_until_next_attempt, mut next_aura_slot) = self.time_until_next_slot()?;
184
185		match self.last_reported_slot {
186			// If we already reported a slot, we don't want to skip a slot. But we also don't want
187			// to go through all the slots if a node was halted for some reason.
188			Some(ls) if ls + 1 < next_aura_slot && next_aura_slot <= ls + 3 => {
189				next_aura_slot = ls + 1u64;
190			},
191			None | Some(_) => {
192				tokio::time::sleep(time_until_next_attempt).await;
193			},
194		}
195
196		tracing::debug!(
197			target: LOG_TARGET,
198			?slot_duration,
199			aura_slot = ?next_aura_slot,
200			"New block production opportunity."
201		);
202
203		self.last_reported_slot = Some(next_aura_slot);
204		Ok(())
205	}
206}
207
#[cfg(test)]
mod tests {
	use super::*;
	use rstest::rstest;
	use sc_consensus_aura::SlotDuration;
	// Relay chain slot duration in milliseconds, shared by all cases below.
	const RELAY_CHAIN_SLOT_DURATION: u64 = 6000;

	// Table-driven check of `compute_next_wake_up_time`.
	// Each case lists, in order: `para_slot_millis`, `core_count`, `time_now`, `offset_millis`
	// and `expected_wait_duration`. All times are in milliseconds.
	#[rstest]
	// Test that different now timestamps have correct impact
	//                    ||||
	#[case(6000, Some(1), 1000, 0, 5000)]
	#[case(6000, Some(1), 0, 0, 6000)]
	#[case(6000, Some(1), 6000, 0, 6000)]
	#[case(6000, Some(0), 6000, 0, 6000)]
	// Test that `None` core defaults to 1
	//           ||||
	#[case(6000, None, 1000, 0, 5000)]
	#[case(6000, None, 0, 0, 6000)]
	#[case(6000, None, 6000, 0, 6000)]
	// Test that offset affects the current time correctly
	//                          ||||
	#[case(6000, Some(1), 1000, 1000, 6000)]
	#[case(6000, Some(1), 12000, 2000, 2000)]
	#[case(6000, Some(1), 12000, 6000, 6000)]
	#[case(6000, Some(1), 12000, 7000, 1000)]
	// Test that number of cores affects the block production interval
	//           |||||||
	#[case(6000, Some(3), 12000, 0, 2000)]
	#[case(6000, Some(2), 12000, 0, 3000)]
	#[case(6000, Some(3), 11999, 0, 1)]
	// High core count
	//           ||||||||
	#[case(6000, Some(12), 0, 0, 500)]
	/// Test that the minimum block interval is respected
	/// at high core counts.
	///          |||||||||
	#[case(6000, Some(100), 0, 0, 500)]
	// Test that slot_duration works correctly
	//     ||||
	#[case(2000, Some(1), 1000, 0, 1000)]
	#[case(2000, Some(1), 3000, 0, 1000)]
	#[case(2000, Some(1), 10000, 0, 2000)]
	#[case(2000, Some(2), 1000, 0, 1000)]
	// Cores are ignored if para_slot_duration < relay_slot_duration
	//           |||||||
	#[case(2000, Some(3), 3000, 0, 1000)]
	// For long slot durations, we should still check
	// every relay chain block for the slot.
	//     |||||
	#[case(12000, None, 0, 0, 6000)]
	#[case(12000, None, 6100, 0, 5900)]
	#[case(12000, None, 6000, 2000, 2000)]
	#[case(12000, Some(2), 6000, 0, 3000)]
	#[case(12000, Some(3), 6000, 0, 2000)]
	#[case(12000, Some(3), 8100, 0, 1900)]
	fn test_get_next_slot(
		#[case] para_slot_millis: u64,
		#[case] core_count: Option<u32>,
		#[case] time_now: u64,
		#[case] offset_millis: u64,
		#[case] expected_wait_duration: u128,
	) {
		// Convert the raw millisecond inputs of the case into the types the function expects.
		let para_slot_duration = SlotDuration::from_millis(para_slot_millis);
		let relay_slot_duration = Duration::from_millis(RELAY_CHAIN_SLOT_DURATION);
		let time_now = Duration::from_millis(time_now);
		let offset = Duration::from_millis(offset_millis);

		// Only the wait duration is checked here; the returned slot is ignored.
		let (wait_duration, _) = compute_next_wake_up_time(
			para_slot_duration,
			relay_slot_duration,
			core_count,
			time_now,
			offset,
		);

		assert_eq!(wait_duration.as_millis(), expected_wait_duration, "Wait time mismatch.");
	}
}
285}