referrerpolicy=no-referrer-when-downgrade
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use std::collections::{hash_map::Entry, HashMap, VecDeque};

use futures::future::pending;
use futures_timer::Delay;
use polkadot_node_network_protocol::request_response::{v1::DisputeRequest, IncomingRequest};
use polkadot_primitives::AuthorityDiscoveryId;

use crate::RECEIVE_RATE_LIMIT;

/// How many messages we are willing to queue per peer (validator).
///
/// The larger this value is, the larger bursts are allowed to be without us dropping messages. On
/// the flip side this gets allocated per validator, so for a size of 10 this will result
/// in `10_000 * size_of(IncomingRequest)` in the worst case.
///
/// `PEER_QUEUE_CAPACITY` must not be 0 for obvious reasons.
#[cfg(not(test))]
pub const PEER_QUEUE_CAPACITY: usize = 10;
#[cfg(test)]
pub const PEER_QUEUE_CAPACITY: usize = 2;

/// Queues for messages from authority peers for rate limiting.
///
/// Invariants ensured:
///
/// 1. No queue will ever have more than `PEER_QUEUE_CAPACITY` elements.
/// 2. There are no empty queues. Whenever a queue gets empty, it is removed. This way checking
///    whether there are any messages queued is cheap.
/// 3. As long as not empty, `pop_reqs` will, if called in sequence, not return `Ready` more often
///    than once for every `RECEIVE_RATE_LIMIT`, but it will always return Ready eventually.
/// 4. If empty `pop_reqs` will never return `Ready`, but will always be `Pending`.
pub struct PeerQueues {
	/// Actual queues.
	queues: HashMap<AuthorityDiscoveryId, VecDeque<IncomingRequest<DisputeRequest>>>,

	/// Delay timer for establishing the rate limit.
	rate_limit_timer: Option<Delay>,
}

impl PeerQueues {
	/// New empty `PeerQueues`.
	pub fn new() -> Self {
		Self { queues: HashMap::new(), rate_limit_timer: None }
	}

	/// Push an incoming request for a given authority.
	///
	/// Returns: `Ok(())` if succeeded, `Err((args))` if capacity is reached.
	pub fn push_req(
		&mut self,
		peer: AuthorityDiscoveryId,
		req: IncomingRequest<DisputeRequest>,
	) -> Result<(), (AuthorityDiscoveryId, IncomingRequest<DisputeRequest>)> {
		let queue = match self.queues.entry(peer) {
			Entry::Vacant(vacant) => vacant.insert(VecDeque::new()),
			Entry::Occupied(occupied) => {
				if occupied.get().len() >= PEER_QUEUE_CAPACITY {
					return Err((occupied.key().clone(), req))
				}
				occupied.into_mut()
			},
		};
		queue.push_back(req);

		// We have at least one element to process - rate limit `timer` needs to exist now:
		self.ensure_timer();
		Ok(())
	}

	/// Pop all heads and return them for processing.
	///
	/// This gets one message from each peer that has sent at least one.
	///
	/// This function is rate limited, if called in sequence it will not return more often than
	/// every `RECEIVE_RATE_LIMIT`.
	///
	/// NOTE: If empty this function will not return `Ready` at all, but will always be `Pending`.
	pub async fn pop_reqs(&mut self) -> Vec<IncomingRequest<DisputeRequest>> {
		self.wait_for_timer().await;

		let mut heads = Vec::with_capacity(self.queues.len());
		let old_queues = std::mem::replace(&mut self.queues, HashMap::new());
		for (k, mut queue) in old_queues.into_iter() {
			let front = queue.pop_front();
			debug_assert!(front.is_some(), "Invariant that queues are never empty is broken.");

			if let Some(front) = front {
				heads.push(front);
			}
			if !queue.is_empty() {
				self.queues.insert(k, queue);
			}
		}

		if !self.is_empty() {
			// Still not empty - we should get woken at some point.
			self.ensure_timer();
		}

		heads
	}

	/// Whether or not all queues are empty.
	pub fn is_empty(&self) -> bool {
		self.queues.is_empty()
	}

	/// Ensure there is an active `timer`.
	///
	/// Checks whether one exists and if not creates one.
	fn ensure_timer(&mut self) -> &mut Delay {
		self.rate_limit_timer.get_or_insert(Delay::new(RECEIVE_RATE_LIMIT))
	}

	/// Wait for `timer` if it exists, or be `Pending` forever.
	///
	/// Afterwards it gets set back to `None`.
	async fn wait_for_timer(&mut self) {
		match self.rate_limit_timer.as_mut() {
			None => pending().await,
			Some(timer) => timer.await,
		}
		self.rate_limit_timer = None;
	}
}