polkadot_dispute_distribution/receiver/peer_queues.rs
1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
16
17use std::collections::{hash_map::Entry, HashMap, VecDeque};
18
19use futures::future::pending;
20use futures_timer::Delay;
21use polkadot_node_network_protocol::request_response::{v1::DisputeRequest, IncomingRequest};
22use polkadot_primitives::AuthorityDiscoveryId;
23
24use crate::RECEIVE_RATE_LIMIT;
25
26/// How many messages we are willing to queue per peer (validator).
27///
28/// The larger this value is, the larger bursts are allowed to be without us dropping messages. On
29/// the flip side this gets allocated per validator, so for a size of 10 this will result
30/// in `10_000 * size_of(IncomingRequest)` in the worst case.
31///
32/// `PEER_QUEUE_CAPACITY` must not be 0 for obvious reasons.
33#[cfg(not(test))]
34pub const PEER_QUEUE_CAPACITY: usize = 10;
35#[cfg(test)]
36pub const PEER_QUEUE_CAPACITY: usize = 2;
37
38/// Queues for messages from authority peers for rate limiting.
39///
40/// Invariants ensured:
41///
42/// 1. No queue will ever have more than `PEER_QUEUE_CAPACITY` elements.
43/// 2. There are no empty queues. Whenever a queue gets empty, it is removed. This way checking
44/// whether there are any messages queued is cheap.
45/// 3. As long as not empty, `pop_reqs` will, if called in sequence, not return `Ready` more often
46/// than once for every `RECEIVE_RATE_LIMIT`, but it will always return Ready eventually.
47/// 4. If empty `pop_reqs` will never return `Ready`, but will always be `Pending`.
48pub struct PeerQueues {
49 /// Actual queues.
50 queues: HashMap<AuthorityDiscoveryId, VecDeque<IncomingRequest<DisputeRequest>>>,
51
52 /// Delay timer for establishing the rate limit.
53 rate_limit_timer: Option<Delay>,
54}
55
56impl PeerQueues {
57 /// New empty `PeerQueues`.
58 pub fn new() -> Self {
59 Self { queues: HashMap::new(), rate_limit_timer: None }
60 }
61
62 /// Push an incoming request for a given authority.
63 ///
64 /// Returns: `Ok(())` if succeeded, `Err((args))` if capacity is reached.
65 pub fn push_req(
66 &mut self,
67 peer: AuthorityDiscoveryId,
68 req: IncomingRequest<DisputeRequest>,
69 ) -> Result<(), (AuthorityDiscoveryId, IncomingRequest<DisputeRequest>)> {
70 let queue = match self.queues.entry(peer) {
71 Entry::Vacant(vacant) => vacant.insert(VecDeque::new()),
72 Entry::Occupied(occupied) => {
73 if occupied.get().len() >= PEER_QUEUE_CAPACITY {
74 return Err((occupied.key().clone(), req))
75 }
76 occupied.into_mut()
77 },
78 };
79 queue.push_back(req);
80
81 // We have at least one element to process - rate limit `timer` needs to exist now:
82 self.ensure_timer();
83 Ok(())
84 }
85
86 /// Pop all heads and return them for processing.
87 ///
88 /// This gets one message from each peer that has sent at least one.
89 ///
90 /// This function is rate limited, if called in sequence it will not return more often than
91 /// every `RECEIVE_RATE_LIMIT`.
92 ///
93 /// NOTE: If empty this function will not return `Ready` at all, but will always be `Pending`.
94 pub async fn pop_reqs(&mut self) -> Vec<IncomingRequest<DisputeRequest>> {
95 self.wait_for_timer().await;
96
97 let mut heads = Vec::with_capacity(self.queues.len());
98 let old_queues = std::mem::replace(&mut self.queues, HashMap::new());
99 for (k, mut queue) in old_queues.into_iter() {
100 let front = queue.pop_front();
101 debug_assert!(front.is_some(), "Invariant that queues are never empty is broken.");
102
103 if let Some(front) = front {
104 heads.push(front);
105 }
106 if !queue.is_empty() {
107 self.queues.insert(k, queue);
108 }
109 }
110
111 if !self.is_empty() {
112 // Still not empty - we should get woken at some point.
113 self.ensure_timer();
114 }
115
116 heads
117 }
118
119 /// Whether or not all queues are empty.
120 pub fn is_empty(&self) -> bool {
121 self.queues.is_empty()
122 }
123
124 /// Ensure there is an active `timer`.
125 ///
126 /// Checks whether one exists and if not creates one.
127 fn ensure_timer(&mut self) -> &mut Delay {
128 self.rate_limit_timer.get_or_insert(Delay::new(RECEIVE_RATE_LIMIT))
129 }
130
131 /// Wait for `timer` if it exists, or be `Pending` forever.
132 ///
133 /// Afterwards it gets set back to `None`.
134 async fn wait_for_timer(&mut self) {
135 match self.rate_limit_timer.as_mut() {
136 None => pending().await,
137 Some(timer) => timer.await,
138 }
139 self.rate_limit_timer = None;
140 }
141}