referrerpolicy=no-referrer-when-downgrade

polkadot_dispute_distribution/receiver/
peer_queues.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::collections::{hash_map::Entry, HashMap, VecDeque};
18
19use futures::future::pending;
20use futures_timer::Delay;
21use polkadot_node_network_protocol::request_response::{v1::DisputeRequest, IncomingRequest};
22use polkadot_primitives::AuthorityDiscoveryId;
23
24use crate::RECEIVE_RATE_LIMIT;
25
26/// How many messages we are willing to queue per peer (validator).
27///
28/// The larger this value is, the larger bursts are allowed to be without us dropping messages. On
29/// the flip side this gets allocated per validator, so for a size of 10 this will result
30/// in `10_000 * size_of(IncomingRequest)` in the worst case.
31///
32/// `PEER_QUEUE_CAPACITY` must not be 0 for obvious reasons.
33#[cfg(not(test))]
34pub const PEER_QUEUE_CAPACITY: usize = 10;
35#[cfg(test)]
36pub const PEER_QUEUE_CAPACITY: usize = 2;
37
38/// Queues for messages from authority peers for rate limiting.
39///
40/// Invariants ensured:
41///
42/// 1. No queue will ever have more than `PEER_QUEUE_CAPACITY` elements.
43/// 2. There are no empty queues. Whenever a queue gets empty, it is removed. This way checking
44///    whether there are any messages queued is cheap.
45/// 3. As long as not empty, `pop_reqs` will, if called in sequence, not return `Ready` more often
46///    than once for every `RECEIVE_RATE_LIMIT`, but it will always return Ready eventually.
47/// 4. If empty `pop_reqs` will never return `Ready`, but will always be `Pending`.
48pub struct PeerQueues {
49	/// Actual queues.
50	queues: HashMap<AuthorityDiscoveryId, VecDeque<IncomingRequest<DisputeRequest>>>,
51
52	/// Delay timer for establishing the rate limit.
53	rate_limit_timer: Option<Delay>,
54}
55
56impl PeerQueues {
57	/// New empty `PeerQueues`.
58	pub fn new() -> Self {
59		Self { queues: HashMap::new(), rate_limit_timer: None }
60	}
61
62	/// Push an incoming request for a given authority.
63	///
64	/// Returns: `Ok(())` if succeeded, `Err((args))` if capacity is reached.
65	pub fn push_req(
66		&mut self,
67		peer: AuthorityDiscoveryId,
68		req: IncomingRequest<DisputeRequest>,
69	) -> Result<(), (AuthorityDiscoveryId, IncomingRequest<DisputeRequest>)> {
70		let queue = match self.queues.entry(peer) {
71			Entry::Vacant(vacant) => vacant.insert(VecDeque::new()),
72			Entry::Occupied(occupied) => {
73				if occupied.get().len() >= PEER_QUEUE_CAPACITY {
74					return Err((occupied.key().clone(), req))
75				}
76				occupied.into_mut()
77			},
78		};
79		queue.push_back(req);
80
81		// We have at least one element to process - rate limit `timer` needs to exist now:
82		self.ensure_timer();
83		Ok(())
84	}
85
86	/// Pop all heads and return them for processing.
87	///
88	/// This gets one message from each peer that has sent at least one.
89	///
90	/// This function is rate limited, if called in sequence it will not return more often than
91	/// every `RECEIVE_RATE_LIMIT`.
92	///
93	/// NOTE: If empty this function will not return `Ready` at all, but will always be `Pending`.
94	pub async fn pop_reqs(&mut self) -> Vec<IncomingRequest<DisputeRequest>> {
95		self.wait_for_timer().await;
96
97		let mut heads = Vec::with_capacity(self.queues.len());
98		let old_queues = std::mem::replace(&mut self.queues, HashMap::new());
99		for (k, mut queue) in old_queues.into_iter() {
100			let front = queue.pop_front();
101			debug_assert!(front.is_some(), "Invariant that queues are never empty is broken.");
102
103			if let Some(front) = front {
104				heads.push(front);
105			}
106			if !queue.is_empty() {
107				self.queues.insert(k, queue);
108			}
109		}
110
111		if !self.is_empty() {
112			// Still not empty - we should get woken at some point.
113			self.ensure_timer();
114		}
115
116		heads
117	}
118
119	/// Whether or not all queues are empty.
120	pub fn is_empty(&self) -> bool {
121		self.queues.is_empty()
122	}
123
124	/// Ensure there is an active `timer`.
125	///
126	/// Checks whether one exists and if not creates one.
127	fn ensure_timer(&mut self) -> &mut Delay {
128		self.rate_limit_timer.get_or_insert(Delay::new(RECEIVE_RATE_LIMIT))
129	}
130
131	/// Wait for `timer` if it exists, or be `Pending` forever.
132	///
133	/// Afterwards it gets set back to `None`.
134	async fn wait_for_timer(&mut self) {
135		match self.rate_limit_timer.as_mut() {
136			None => pending().await,
137			Some(timer) => timer.await,
138		}
139		self.rate_limit_timer = None;
140	}
141}