referrerpolicy=no-referrer-when-downgrade

polkadot_node_primitives/approval/
time.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Time utilities for approval voting subsystems.
18
19use futures::{
20	future::BoxFuture,
21	prelude::*,
22	stream::{FusedStream, FuturesUnordered},
23	Stream, StreamExt,
24};
25
26use crate::approval::v1::DelayTranche;
27use sp_consensus_slots::Slot;
28use std::{
29	collections::HashSet,
30	pin::Pin,
31	task::Poll,
32	time::{Duration, SystemTime},
33};
34
35use polkadot_primitives::{Hash, ValidatorIndex};
/// Length of one tick, expressed in milliseconds.
pub const TICK_DURATION_MILLIS: u64 = 500;

/// The base unit of time used by approval voting: the number of
/// `TICK_DURATION_MILLIS`-long (half-second) intervals since the Unix epoch.
pub type Tick = u64;

/// How far in the future, in ticks, a tick may lie and still be accepted.
///
/// Corresponds to 10 seconds, i.e. 20 ticks at the current tick duration.
pub const TICK_TOO_FAR_IN_FUTURE: Tick = 10_000 / TICK_DURATION_MILLIS;
44
/// A clock which allows querying of the current tick as well as
/// waiting for a tick to be reached.
///
/// Time is expressed in [`Tick`]s rather than in wall-clock units, which lets
/// implementations other than the system clock be plugged in.
pub trait Clock {
	/// Yields the current tick.
	fn tick_now(&self) -> Tick;

	/// Yields a future which concludes when the given tick is reached.
	///
	/// The returned future is boxed, `Send` and `'static`, so it does not borrow from the
	/// clock and can outlive the `&self` borrow used to create it.
	fn wait(&self, tick: Tick) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
}
54
/// Extension methods for clocks.
///
/// A blanket implementation in this module provides these methods for every [`Clock`],
/// including unsized ones such as `dyn Clock`.
pub trait ClockExt {
	/// Returns the current tranche.
	///
	/// The tranche is counted relative to `base_slot`, where `slot_duration_millis` is the
	/// length of a single slot in milliseconds.
	fn tranche_now(&self, slot_duration_millis: u64, base_slot: Slot) -> DelayTranche;
}
60
61impl<C: Clock + ?Sized> ClockExt for C {
62	fn tranche_now(&self, slot_duration_millis: u64, base_slot: Slot) -> DelayTranche {
63		self.tick_now()
64			.saturating_sub(slot_number_to_tick(slot_duration_millis, base_slot)) as u32
65	}
66}
67
/// A clock which uses the actual underlying system clock.
///
/// The type carries no state, so every value of it behaves identically.
#[derive(Clone, Debug, Default)]
pub struct SystemClock;
71
72impl Clock for SystemClock {
73	/// Yields the current tick.
74	fn tick_now(&self) -> Tick {
75		match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
76			Err(_) => 0,
77			Ok(d) => d.as_millis() as u64 / TICK_DURATION_MILLIS,
78		}
79	}
80
81	/// Yields a future which concludes when the given tick is reached.
82	fn wait(&self, tick: Tick) -> Pin<Box<dyn Future<Output = ()> + Send>> {
83		let fut = async move {
84			let now = SystemTime::now();
85			let tick_onset = tick_to_time(tick);
86			if now < tick_onset {
87				if let Some(until) = tick_onset.duration_since(now).ok() {
88					futures_timer::Delay::new(until).await;
89				}
90			}
91		};
92
93		Box::pin(fut)
94	}
95}
96
97fn tick_to_time(tick: Tick) -> SystemTime {
98	SystemTime::UNIX_EPOCH + Duration::from_millis(TICK_DURATION_MILLIS * tick)
99}
100
101/// assumes `slot_duration_millis` evenly divided by tick duration.
102pub fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick {
103	let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
104	u64::from(slot) * ticks_per_slot
105}
106
107/// Converts a tick to the slot number.
108pub fn tick_to_slot_number(slot_duration_millis: u64, tick: Tick) -> Slot {
109	let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
110	(tick / ticks_per_slot).into()
111}
112
113/// Converts a tranche from a slot to the tick number.
114pub fn tranche_to_tick(slot_duration_millis: u64, slot: Slot, tranche: u32) -> Tick {
115	slot_number_to_tick(slot_duration_millis, slot) + tranche as u64
116}
117
/// A list of delayed futures that gets triggered when the waiting time has expired and it is
/// time to sign the candidate.
///
/// We have a timer per relay-chain block.
#[derive(Default)]
pub struct DelayedApprovalTimer {
	/// Pending timers; each resolves to the block hash and validator index it was armed with.
	timers: FuturesUnordered<BoxFuture<'static, (Hash, ValidatorIndex)>>,
	/// Hashes of the blocks that currently have an armed timer, used to avoid arming a
	/// second timer for the same block.
	blocks: HashSet<Hash>,
}
126
127impl DelayedApprovalTimer {
128	/// Starts a single timer per block hash
129	///
130	/// Guarantees that if a timer already exits for the give block hash,
131	/// no additional timer is started.
132	pub fn maybe_arm_timer(
133		&mut self,
134		wait_until: Tick,
135		clock: &dyn Clock,
136		block_hash: Hash,
137		validator_index: ValidatorIndex,
138	) {
139		if self.blocks.insert(block_hash) {
140			let clock_wait = clock.wait(wait_until);
141			self.timers.push(Box::pin(async move {
142				clock_wait.await;
143				(block_hash, validator_index)
144			}));
145		}
146	}
147}
148
149impl Stream for DelayedApprovalTimer {
150	type Item = (Hash, ValidatorIndex);
151
152	fn poll_next(
153		mut self: std::pin::Pin<&mut Self>,
154		cx: &mut std::task::Context<'_>,
155	) -> std::task::Poll<Option<Self::Item>> {
156		let poll_result = self.timers.poll_next_unpin(cx);
157		match poll_result {
158			Poll::Ready(Some(result)) => {
159				self.blocks.remove(&result.0);
160				Poll::Ready(Some(result))
161			},
162			_ => poll_result,
163		}
164	}
165}
166
167impl FusedStream for DelayedApprovalTimer {
168	fn is_terminated(&self) -> bool {
169		self.timers.is_terminated()
170	}
171}
172
#[cfg(test)]
mod tests {
	use std::time::Duration;

	use futures::{executor::block_on, FutureExt, StreamExt};
	use futures_timer::Delay;
	use polkadot_primitives::{Hash, ValidatorIndex};

	use crate::approval::time::{Clock, SystemClock};

	use super::DelayedApprovalTimer;

	/// Tries to arm a timer twice for every hash; the hash at position `index` is scheduled
	/// `index` ticks after the current tick. Only the first attempt per hash should take
	/// effect.
	fn arm_each_twice(timer: &mut DelayedApprovalTimer, hashes: &[Hash]) {
		for (index, hash) in hashes.iter().enumerate() {
			for _ in 0..2 {
				timer.maybe_arm_timer(
					SystemClock.tick_now() + index as u64,
					&SystemClock,
					*hash,
					ValidatorIndex::from(2),
				);
			}
		}
	}

	/// Drains the timer for twice as many rounds as there are hashes.
	///
	/// The first half of the rounds must yield the hashes in arming order; the second half
	/// must run into the two second timeout, which is reported as `timeout_hash`.
	async fn assert_each_fires_once(
		timer: &mut DelayedApprovalTimer,
		hashes: &[Hash],
		timeout_hash: Hash,
	) {
		for round in 0..hashes.len() * 2 {
			let fired = futures::select!(
				(hash, _) = timer.select_next_some() => {
					hash
				}
				// Timers should fire only once, so for the rest of the iterations we should timeout through here.
				_ = Delay::new(Duration::from_secs(2)).fuse() => {
					timeout_hash
				}
			);
			let expected = hashes.get(round).cloned().unwrap_or(timeout_hash);
			assert_eq!(expected, fired);
		}
	}

	#[test]
	fn test_select_empty_timer() {
		block_on(async move {
			let mut timer = DelayedApprovalTimer::default();

			for _ in 1..10 {
				let fired_arm = futures::select!(
					_ = timer.select_next_some() => {
						0
					}
					// Only this arm should fire
					_ = Delay::new(Duration::from_millis(100)).fuse() => {
						1
					}
				);

				assert_eq!(fired_arm, 1);
			}
		});
	}

	#[test]
	fn test_timer_functionality() {
		block_on(async move {
			let mut timer = DelayedApprovalTimer::default();
			let test_hashes: Vec<Hash> = (0x01u8..=0x03).map(Hash::repeat_byte).collect();
			let timeout_hash = Hash::repeat_byte(0x02);

			arm_each_twice(&mut timer, &test_hashes);
			assert_each_fires_once(&mut timer, &test_hashes, timeout_hash).await;

			// Now check timer can be restarted if already fired
			arm_each_twice(&mut timer, &test_hashes);
			assert_each_fires_once(&mut timer, &test_hashes, timeout_hash).await;
		});
	}
}