1use crate::LOG_TARGET;
19use codec::Codec;
20use cumulus_primitives_aura::Slot;
21use cumulus_primitives_core::BlockT;
22use sc_client_api::UsageProvider;
23use sc_consensus_aura::SlotDuration;
24use sp_api::ProvideRuntimeApi;
25use sp_application_crypto::AppPublic;
26use sp_consensus_aura::AuraApi;
27use sp_core::Pair;
28use sp_runtime::traits::Member;
29use sp_timestamp::Timestamp;
30use std::{
31 cmp::{max, min},
32 sync::Arc,
33 time::Duration,
34};
35
/// Lower bound applied to the block production interval when a relay chain slot is divided
/// between several cores (see `compute_next_wake_up_time`).
///
/// Despite the `_MS` suffix this is a [`Duration`] of 500 milliseconds, not a raw millisecond
/// count.
const BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS: Duration = Duration::from_millis(500);
39
/// A timestamp paired with a slot.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this type;
/// presumably it describes a block production opportunity handed to a caller — confirm
/// against the users of this struct.
#[derive(Debug)]
pub(crate) struct SlotInfo {
	/// Timestamp associated with the slot.
	pub timestamp: Timestamp,
	/// The slot itself.
	pub slot: Slot,
}
45
/// Timer that decides when the next block production attempt should happen and which slot
/// that attempt belongs to.
#[derive(Debug)]
pub(crate) struct SlotTimer<Block, Client, P> {
	/// Client the slot duration is fetched from (via `crate::slot_duration`).
	client: Arc<Client>,
	/// Amount of time subtracted from the current time before the next wake-up is computed.
	time_offset: Duration,
	/// Core count most recently passed to `update_scheduling`; `None` until it is called.
	last_reported_core_num: Option<u32>,
	/// Slot duration of the relay chain, supplied at construction.
	relay_slot_duration: Duration,
	/// Slot recorded by the last successful `wait_until_next_slot`; `None` before the first
	/// one.
	last_reported_slot: Option<Slot>,
	/// Ties the otherwise unused `Block` and `P` type parameters to the struct. `P` only
	/// appears as the argument of a boxed `dyn Fn(P) + Send + Sync`, so the marker does not
	/// require `P` itself to be `Send` or `Sync`.
	_marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
}
62
63fn compute_next_wake_up_time(
72 para_slot_duration: SlotDuration,
73 relay_slot_duration: Duration,
74 core_count: Option<u32>,
75 time_now: Duration,
76 time_offset: Duration,
77) -> (Duration, Slot) {
78 let para_slots_per_relay_block =
79 (relay_slot_duration.as_millis() / para_slot_duration.as_millis() as u128) as u32;
80 let assigned_core_num = core_count.unwrap_or(1);
81
82 let mut block_production_interval = min(para_slot_duration.as_duration(), relay_slot_duration);
85
86 if assigned_core_num > para_slots_per_relay_block &&
87 para_slot_duration.as_duration() >= relay_slot_duration
88 {
89 block_production_interval =
90 max(relay_slot_duration / assigned_core_num, BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS);
91 tracing::debug!(
92 target: LOG_TARGET,
93 ?block_production_interval,
94 "Expected to produce for {assigned_core_num} cores but only have {para_slots_per_relay_block} slots. Attempting to produce multiple blocks per slot."
95 );
96 }
97
98 let (duration, timestamp) =
99 time_until_next_attempt(time_now, block_production_interval, time_offset);
100 let aura_slot = Slot::from_timestamp(timestamp, para_slot_duration);
101 (duration, aura_slot)
102}
103
/// Returns the current system time as a duration since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn duration_now() -> Duration {
	use std::time::SystemTime;
	let now = SystemTime::now();
	match now.duration_since(SystemTime::UNIX_EPOCH) {
		Ok(since_epoch) => since_epoch,
		Err(e) =>
			panic!("Current time {:?} is before Unix epoch. Something is wrong: {:?}", now, e),
	}
}
112
113fn time_until_next_attempt(
117 now: Duration,
118 block_production_interval: Duration,
119 offset: Duration,
120) -> (Duration, Timestamp) {
121 let now = now.as_millis().saturating_sub(offset.as_millis());
122
123 let next_slot_time = ((now + block_production_interval.as_millis()) /
124 block_production_interval.as_millis()) *
125 block_production_interval.as_millis();
126 let remaining_millis = next_slot_time - now;
127 (Duration::from_millis(remaining_millis as u64), Timestamp::from(next_slot_time as u64))
128}
129
130impl<Block, Client, P> SlotTimer<Block, Client, P>
131where
132 Block: BlockT,
133 Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + UsageProvider<Block>,
134 Client::Api: AuraApi<Block, P::Public>,
135 P: Pair,
136 P::Public: AppPublic + Member + Codec,
137 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
138{
139 pub fn new_with_offset(
141 client: Arc<Client>,
142 time_offset: Duration,
143 relay_slot_duration: Duration,
144 ) -> Self {
145 Self {
146 client,
147 time_offset,
148 last_reported_core_num: None,
149 relay_slot_duration,
150 last_reported_slot: None,
151 _marker: Default::default(),
152 }
153 }
154
155 pub fn update_scheduling(&mut self, num_cores_next_block: u32) {
157 self.last_reported_core_num = Some(num_cores_next_block);
158 }
159
160 pub fn time_until_next_slot(&mut self) -> Result<(Duration, Slot), ()> {
162 let Ok(slot_duration) = crate::slot_duration(&*self.client) else {
163 tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
164 return Err(())
165 };
166
167 Ok(compute_next_wake_up_time(
168 slot_duration,
169 self.relay_slot_duration,
170 self.last_reported_core_num,
171 duration_now(),
172 self.time_offset,
173 ))
174 }
175
176 pub async fn wait_until_next_slot(&mut self) -> Result<(), ()> {
178 let Ok(slot_duration) = crate::slot_duration(&*self.client) else {
179 tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
180 return Err(())
181 };
182
183 let (time_until_next_attempt, mut next_aura_slot) = self.time_until_next_slot()?;
184
185 match self.last_reported_slot {
186 Some(ls) if ls + 1 < next_aura_slot && next_aura_slot <= ls + 3 => {
189 next_aura_slot = ls + 1u64;
190 },
191 None | Some(_) => {
192 tokio::time::sleep(time_until_next_attempt).await;
193 },
194 }
195
196 tracing::debug!(
197 target: LOG_TARGET,
198 ?slot_duration,
199 aura_slot = ?next_aura_slot,
200 "New block production opportunity."
201 );
202
203 self.last_reported_slot = Some(next_aura_slot);
204 Ok(())
205 }
206}
207
#[cfg(test)]
mod tests {
	use super::*;
	use rstest::rstest;
	use sc_consensus_aura::SlotDuration;
	/// Relay chain slot duration, in milliseconds, used by every case below.
	const RELAY_CHAIN_SLOT_DURATION: u64 = 6000;

	/// Checks the waiting time returned by `compute_next_wake_up_time`.
	///
	/// Case columns: parachain slot duration (ms), core count, current time (ms), time
	/// offset (ms), expected waiting time (ms).
	#[rstest]
	// 6s parachain slot, at most one core, no offset: wake up on the next 6s boundary.
	#[case(6000, Some(1), 1000, 0, 5000)]
	#[case(6000, Some(1), 0, 0, 6000)]
	#[case(6000, Some(1), 6000, 0, 6000)]
	#[case(6000, Some(0), 6000, 0, 6000)]
	// Same as above, but without any core count reported.
	#[case(6000, None, 1000, 0, 5000)]
	#[case(6000, None, 0, 0, 6000)]
	#[case(6000, None, 6000, 0, 6000)]
	// A time offset is subtracted from the current time (saturating at zero).
	#[case(6000, Some(1), 1000, 1000, 6000)]
	#[case(6000, Some(1), 12000, 2000, 2000)]
	#[case(6000, Some(1), 12000, 6000, 6000)]
	#[case(6000, Some(1), 12000, 7000, 1000)]
	// Several cores with a 6s parachain slot: the relay slot is divided between the cores.
	#[case(6000, Some(3), 12000, 0, 2000)]
	#[case(6000, Some(2), 12000, 0, 3000)]
	#[case(6000, Some(3), 11999, 0, 1)]
	// The divided interval never drops below the 500ms minimum.
	#[case(6000, Some(12), 0, 0, 500)]
	#[case(6000, Some(100), 0, 0, 500)]
	// 2s parachain slot: the parachain slot duration is used whatever the core count.
	#[case(2000, Some(1), 1000, 0, 1000)]
	#[case(2000, Some(1), 3000, 0, 1000)]
	#[case(2000, Some(1), 10000, 0, 2000)]
	#[case(2000, Some(2), 1000, 0, 1000)]
	#[case(2000, Some(3), 3000, 0, 1000)]
	// 12s parachain slot: the relay slot (divided by the core count) sets the interval.
	#[case(12000, None, 0, 0, 6000)]
	#[case(12000, None, 6100, 0, 5900)]
	#[case(12000, None, 6000, 2000, 2000)]
	#[case(12000, Some(2), 6000, 0, 3000)]
	#[case(12000, Some(3), 6000, 0, 2000)]
	#[case(12000, Some(3), 8100, 0, 1900)]
	fn test_get_next_slot(
		#[case] para_slot_millis: u64,
		#[case] core_count: Option<u32>,
		#[case] time_now: u64,
		#[case] offset_millis: u64,
		#[case] expected_wait_duration: u128,
	) {
		// Only the waiting time is checked; the returned slot is ignored.
		let (wait_duration, _) = compute_next_wake_up_time(
			SlotDuration::from_millis(para_slot_millis),
			Duration::from_millis(RELAY_CHAIN_SLOT_DURATION),
			core_count,
			Duration::from_millis(time_now),
			Duration::from_millis(offset_millis),
		);

		assert_eq!(wait_duration.as_millis(), expected_wait_duration, "Wait time mismatch.");
	}
}