polkadot_node_primitives/approval/
time.rs1use futures::{
20 future::BoxFuture,
21 prelude::*,
22 stream::{FusedStream, FuturesUnordered},
23 Stream, StreamExt,
24};
25
26use crate::approval::v1::DelayTranche;
27use sp_consensus_slots::Slot;
28use std::{
29 collections::HashSet,
30 pin::Pin,
31 task::Poll,
32 time::{Duration, SystemTime},
33};
34
35use polkadot_primitives::{Hash, ValidatorIndex};
/// Length of a single tick, expressed in milliseconds.
pub const TICK_DURATION_MILLIS: u64 = 500;

/// A moment in time measured as a count of ticks, each lasting
/// [`TICK_DURATION_MILLIS`] milliseconds. The clocks in this module count
/// ticks from the Unix epoch.
pub type Tick = u64;
41
42pub const TICK_TOO_FAR_IN_FUTURE: Tick = 20; pub trait Clock {
48 fn tick_now(&self) -> Tick;
50
51 fn wait(&self, tick: Tick) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
53}
54
55pub trait ClockExt {
57 fn tranche_now(&self, slot_duration_millis: u64, base_slot: Slot) -> DelayTranche;
59}
60
61impl<C: Clock + ?Sized> ClockExt for C {
62 fn tranche_now(&self, slot_duration_millis: u64, base_slot: Slot) -> DelayTranche {
63 self.tick_now()
64 .saturating_sub(slot_number_to_tick(slot_duration_millis, base_slot)) as u32
65 }
66}
67
/// A [`Clock`] implementation driven by the operating system's wall-clock
/// time.
#[derive(Clone)]
pub struct SystemClock;
71
72impl Clock for SystemClock {
73 fn tick_now(&self) -> Tick {
75 match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
76 Err(_) => 0,
77 Ok(d) => d.as_millis() as u64 / TICK_DURATION_MILLIS,
78 }
79 }
80
81 fn wait(&self, tick: Tick) -> Pin<Box<dyn Future<Output = ()> + Send>> {
83 let fut = async move {
84 let now = SystemTime::now();
85 let tick_onset = tick_to_time(tick);
86 if now < tick_onset {
87 if let Some(until) = tick_onset.duration_since(now).ok() {
88 futures_timer::Delay::new(until).await;
89 }
90 }
91 };
92
93 Box::pin(fut)
94 }
95}
96
97fn tick_to_time(tick: Tick) -> SystemTime {
98 SystemTime::UNIX_EPOCH + Duration::from_millis(TICK_DURATION_MILLIS * tick)
99}
100
101pub fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick {
103 let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
104 u64::from(slot) * ticks_per_slot
105}
106
107pub fn tick_to_slot_number(slot_duration_millis: u64, tick: Tick) -> Slot {
109 let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
110 (tick / ticks_per_slot).into()
111}
112
113pub fn tranche_to_tick(slot_duration_millis: u64, slot: Slot, tranche: u32) -> Tick {
115 slot_number_to_tick(slot_duration_millis, slot) + tranche as u64
116}
117
118#[derive(Default)]
122pub struct DelayedApprovalTimer {
123 timers: FuturesUnordered<BoxFuture<'static, (Hash, ValidatorIndex)>>,
124 blocks: HashSet<Hash>,
125}
126
127impl DelayedApprovalTimer {
128 pub fn maybe_arm_timer(
133 &mut self,
134 wait_until: Tick,
135 clock: &dyn Clock,
136 block_hash: Hash,
137 validator_index: ValidatorIndex,
138 ) {
139 if self.blocks.insert(block_hash) {
140 let clock_wait = clock.wait(wait_until);
141 self.timers.push(Box::pin(async move {
142 clock_wait.await;
143 (block_hash, validator_index)
144 }));
145 }
146 }
147}
148
149impl Stream for DelayedApprovalTimer {
150 type Item = (Hash, ValidatorIndex);
151
152 fn poll_next(
153 mut self: std::pin::Pin<&mut Self>,
154 cx: &mut std::task::Context<'_>,
155 ) -> std::task::Poll<Option<Self::Item>> {
156 let poll_result = self.timers.poll_next_unpin(cx);
157 match poll_result {
158 Poll::Ready(Some(result)) => {
159 self.blocks.remove(&result.0);
160 Poll::Ready(Some(result))
161 },
162 _ => poll_result,
163 }
164 }
165}
166
167impl FusedStream for DelayedApprovalTimer {
168 fn is_terminated(&self) -> bool {
169 self.timers.is_terminated()
170 }
171}
172
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::{executor::block_on, FutureExt, StreamExt};
    use futures_timer::Delay;
    use polkadot_primitives::{Hash, ValidatorIndex};

    use crate::approval::time::{Clock, SystemClock};

    use super::DelayedApprovalTimer;

    /// Arms a timer twice for every hash, with the hash at position `i` due
    /// `i` ticks from now. The second call per hash is expected to be ignored
    /// because a timer for that block is already pending.
    fn arm_each_twice(timer: &mut DelayedApprovalTimer, hashes: &[Hash]) {
        for (offset, hash) in hashes.iter().enumerate() {
            for _ in 0..2 {
                timer.maybe_arm_timer(
                    SystemClock.tick_now() + offset as u64,
                    &SystemClock,
                    *hash,
                    ValidatorIndex::from(2),
                );
            }
        }
    }

    /// Polls the timer `2 * hashes.len()` times and checks that every hash
    /// fires exactly once, in order, after which each poll runs into the
    /// two-second timeout (reported as `timeout_hash`).
    async fn expect_fired_then_timeouts(
        timer: &mut DelayedApprovalTimer,
        hashes: &[Hash],
        timeout_hash: Hash,
    ) {
        for i in 0..hashes.len() * 2 {
            let result = futures::select!(
                (hash, _) = timer.select_next_some() => {
                    hash
                }
                _ = Delay::new(Duration::from_secs(2)).fuse() => {
                    timeout_hash
                }
            );
            assert_eq!(hashes.get(i).cloned().unwrap_or(timeout_hash), result);
        }
    }

    #[test]
    fn test_select_empty_timer() {
        block_on(async move {
            let mut timer = DelayedApprovalTimer::default();

            // Selecting on an empty timer must never yield an item; the
            // 100 ms delay has to win every time.
            for _ in 1..10 {
                let timer_fired = futures::select!(
                    _ = timer.select_next_some() => {
                        true
                    }
                    _ = Delay::new(Duration::from_millis(100)).fuse() => {
                        false
                    }
                );

                assert!(!timer_fired);
            }
        });
    }

    #[test]
    fn test_timer_functionality() {
        block_on(async move {
            let mut timer = DelayedApprovalTimer::default();
            let test_hashes =
                vec![Hash::repeat_byte(0x01), Hash::repeat_byte(0x02), Hash::repeat_byte(0x03)];
            let timeout_hash = Hash::repeat_byte(0x02);

            arm_each_twice(&mut timer, &test_hashes);
            expect_fired_then_timeouts(&mut timer, &test_hashes, timeout_hash).await;

            // Once every timer has fired, the same blocks can be armed again.
            arm_each_twice(&mut timer, &test_hashes);
            expect_fired_then_timeouts(&mut timer, &test_hashes, timeout_hash).await;
        });
    }
}