// polkadot_dispute_distribution/lib.rs
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! # Sending and receiving of `DisputeRequest`s.
//!
//! This subsystem essentially consists of two parts:
//!
//! - a sender
//! - and a receiver
//!
//! The sender is responsible for getting our vote out, see `sender`. The receiver handles
//! incoming [`DisputeRequest`](v1::DisputeRequest)s and offers spam protection, see `receiver`.

27use std::time::Duration;
28
29use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};
30
31use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
32use polkadot_node_subsystem_util::nesting_sender::NestingSender;
33use sp_keystore::KeystorePtr;
34
35use polkadot_node_network_protocol::request_response::{incoming::IncomingRequestReceiver, v1};
36use polkadot_node_primitives::DISPUTE_WINDOW;
37use polkadot_node_subsystem::{
38 messages::DisputeDistributionMessage, overseer, FromOrchestra, OverseerSignal,
39 SpawnedSubsystem, SubsystemError,
40};
41use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};
42
43/// ## The sender [`DisputeSender`]
44///
45/// The sender (`DisputeSender`) keeps track of live disputes and makes sure our vote gets out for
46/// each one of those. The sender is responsible for sending our vote to each validator
47/// participating in the dispute and to each authority currently authoring blocks. The sending can
48/// be initiated by sending `DisputeDistributionMessage::SendDispute` message to this subsystem.
49///
50/// In addition the `DisputeSender` will query the coordinator for active disputes on each
51/// [`DisputeSender::update_leaves`] call and will initiate sending (start a `SendTask`) for every,
52/// to this subsystem, unknown dispute. This is to make sure, we get our vote out, even on
53/// restarts.
54///
55/// The actual work of sending and keeping track of transmission attempts to each validator for a
56/// particular dispute are done by [`SendTask`]. The purpose of the `DisputeSender` is to keep
57/// track of all ongoing disputes and start and clean up `SendTask`s accordingly.
58mod sender;
59use self::sender::{DisputeSender, DisputeSenderMessage};
60
61/// ## The receiver [`DisputesReceiver`]
62///
63/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running
64/// task within this subsystem ([`DisputesReceiver::run`]).
65///
66/// Conceptually all the receiver has to do, is waiting for incoming requests which are passed in
67/// via a dedicated channel and forwarding them to the dispute coordinator via
68/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
69/// nodes, the reality is not that simple of course. Before importing statements the receiver will
70/// batch up imports as well as possible for efficient imports while maintaining timely dispute
71/// resolution and handling of spamming validators:
72///
73/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
74/// service.
75/// - Drop messages from a node, if it sends at a too high rate.
76/// - Filter out duplicate messages (over some period of time).
77/// - Drop any obviously invalid votes (invalid signatures for example).
78/// - Ban peers whose votes were deemed invalid.
79///
80/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
81/// do, while at the same time making it aware of new disputes as fast as possible.
82///
83/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
84/// This way a received confirmation guarantees, that the vote has been stored to disk by the
85/// receiver.
86mod receiver;
87use self::receiver::DisputesReceiver;
88
89/// Error and [`Result`] type for this subsystem.
90mod error;
91use error::{log_error, Error, FatalError, FatalResult, Result};
92
93#[cfg(test)]
94mod tests;
95
96mod metrics;
97//// Prometheus `Metrics` for dispute distribution.
98pub use metrics::Metrics;
99
/// Log target for all tracing in this subsystem.
const LOG_TARGET: &str = "parachain::dispute-distribution";

/// Rate limit on the `receiver` side.
///
/// If messages from one peer come in at a higher rate than every `RECEIVE_RATE_LIMIT` on average,
/// we start dropping messages from that peer to enforce that limit.
pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);

/// Rate limit on the `sender` side.
///
/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit our sending rate
/// as well.
///
/// We add 50ms extra, just to have some safe margin to the `RECEIVE_RATE_LIMIT`.
pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));
115
116/// The dispute distribution subsystem.
117pub struct DisputeDistributionSubsystem<AD> {
118 /// Easy and efficient runtime access for this subsystem.
119 runtime: RuntimeInfo,
120
121 /// Sender for our dispute requests.
122 disputes_sender: DisputeSender<DisputeSenderMessage>,
123
124 /// Receive messages from `DisputeSender` background tasks.
125 sender_rx: mpsc::Receiver<DisputeSenderMessage>,
126
127 /// Receiver for incoming requests.
128 req_receiver: Option<IncomingRequestReceiver<v1::DisputeRequest>>,
129
130 /// Authority discovery service.
131 authority_discovery: AD,
132
133 /// Metrics for this subsystem.
134 metrics: Metrics,
135}
136
137#[overseer::subsystem(DisputeDistribution, error = SubsystemError, prefix = self::overseer)]
138impl<Context, AD> DisputeDistributionSubsystem<AD>
139where
140 <Context as overseer::DisputeDistributionContextTrait>::Sender:
141 overseer::DisputeDistributionSenderTrait + Sync + Send,
142 AD: AuthorityDiscovery + Clone,
143{
144 fn start(self, ctx: Context) -> SpawnedSubsystem {
145 let future = self
146 .run(ctx)
147 .map_err(|e| SubsystemError::with_origin("dispute-distribution", e))
148 .boxed();
149
150 SpawnedSubsystem { name: "dispute-distribution-subsystem", future }
151 }
152}
153
154#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
155impl<AD> DisputeDistributionSubsystem<AD>
156where
157 AD: AuthorityDiscovery + Clone,
158{
159 /// Create a new instance of the dispute distribution.
160 pub fn new(
161 keystore: KeystorePtr,
162 req_receiver: IncomingRequestReceiver<v1::DisputeRequest>,
163 authority_discovery: AD,
164 metrics: Metrics,
165 ) -> Self {
166 let runtime = RuntimeInfo::new_with_config(runtime::Config {
167 keystore: Some(keystore),
168 session_cache_lru_size: DISPUTE_WINDOW.get(),
169 });
170 let (tx, sender_rx) = NestingSender::new_root(1);
171 let disputes_sender = DisputeSender::new(tx, metrics.clone());
172 Self {
173 runtime,
174 disputes_sender,
175 sender_rx,
176 req_receiver: Some(req_receiver),
177 authority_discovery,
178 metrics,
179 }
180 }
181
182 /// Start processing work as passed on from the Overseer.
183 async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), FatalError> {
184 let receiver = DisputesReceiver::new(
185 ctx.sender().clone(),
186 self.req_receiver
187 .take()
188 .expect("Must be provided on `new` and we take ownership here. qed."),
189 self.authority_discovery.clone(),
190 self.metrics.clone(),
191 );
192 ctx.spawn("disputes-receiver", receiver.run().boxed())
193 .map_err(FatalError::SpawnTask)?;
194
195 // Process messages for sending side.
196 //
197 // Note: We want the sender to be rate limited and we are currently taking advantage of the
198 // fact that the root task of this subsystem is only concerned with sending: Functions of
199 // `DisputeSender` might back pressure if the rate limit is hit, which will slow down this
200 // loop. If this fact ever changes, we will likely need another task.
201 loop {
202 let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
203 match message {
204 MuxedMessage::Subsystem(result) => {
205 let result = match result? {
206 FromOrchestra::Signal(signal) => {
207 match self.handle_signals(&mut ctx, signal).await {
208 Ok(SignalResult::Conclude) => return Ok(()),
209 Ok(SignalResult::Continue) => Ok(()),
210 Err(f) => Err(f),
211 }
212 },
213 FromOrchestra::Communication { msg } =>
214 self.handle_subsystem_message(&mut ctx, msg).await,
215 };
216 log_error(result, "on FromOrchestra")?;
217 },
218 MuxedMessage::Sender(result) => {
219 let result = self
220 .disputes_sender
221 .on_message(
222 &mut ctx,
223 &mut self.runtime,
224 result.ok_or(FatalError::SenderExhausted)?,
225 )
226 .await
227 .map_err(Error::Sender);
228 log_error(result, "on_message")?;
229 },
230 }
231 }
232 }
233
234 /// Handle overseer signals.
235 async fn handle_signals<Context>(
236 &mut self,
237 ctx: &mut Context,
238 signal: OverseerSignal,
239 ) -> Result<SignalResult> {
240 match signal {
241 OverseerSignal::Conclude => return Ok(SignalResult::Conclude),
242 OverseerSignal::ActiveLeaves(update) => {
243 self.disputes_sender.update_leaves(ctx, &mut self.runtime, update).await?;
244 },
245 OverseerSignal::BlockFinalized(_, _) => {},
246 };
247 Ok(SignalResult::Continue)
248 }
249
250 /// Handle `DisputeDistributionMessage`s.
251 async fn handle_subsystem_message<Context>(
252 &mut self,
253 ctx: &mut Context,
254 msg: DisputeDistributionMessage,
255 ) -> Result<()> {
256 match msg {
257 DisputeDistributionMessage::SendDispute(dispute_msg) =>
258 self.disputes_sender.start_sender(ctx, &mut self.runtime, dispute_msg).await?,
259 }
260 Ok(())
261 }
262}
263
264/// Messages to be handled in this subsystem.
265#[derive(Debug)]
266enum MuxedMessage {
267 /// Messages from other subsystems.
268 Subsystem(FatalResult<FromOrchestra<DisputeDistributionMessage>>),
269 /// Messages from spawned sender background tasks.
270 Sender(Option<DisputeSenderMessage>),
271}
272
273#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
274impl MuxedMessage {
275 async fn receive<Context>(
276 ctx: &mut Context,
277 from_sender: &mut mpsc::Receiver<DisputeSenderMessage>,
278 ) -> Self {
279 // We are only fusing here to make `select` happy, in reality we will quit if the stream
280 // ends.
281 let from_overseer = ctx.recv().fuse();
282 futures::pin_mut!(from_overseer, from_sender);
283 // We select biased to make sure we finish up loose ends, before starting new work.
284 futures::select_biased!(
285 msg = from_sender.next() => MuxedMessage::Sender(msg),
286 msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
287 )
288 }
289}
290
/// Result of handling a signal from the overseer.
enum SignalResult {
    /// Overseer asked us to conclude.
    Conclude,
    /// We can continue processing events.
    Continue,
}