polkadot_dispute_distribution/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! # Sending and receiving of `DisputeRequest`s.
//!
//! This subsystem essentially consists of two parts:
//!
//! - a sender
//! - and a receiver
//!
//! The sender is responsible for getting our vote out, see `sender`. The receiver handles
//! incoming [`DisputeRequest`](v1::DisputeRequest)s and offers spam protection, see `receiver`.

use std::time::Duration;

use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};

use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
use polkadot_node_subsystem_util::nesting_sender::NestingSender;
use sp_keystore::KeystorePtr;

use polkadot_node_network_protocol::request_response::{incoming::IncomingRequestReceiver, v1};
use polkadot_node_primitives::DISPUTE_WINDOW;
use polkadot_node_subsystem::{
	messages::DisputeDistributionMessage, overseer, FromOrchestra, OverseerSignal,
	SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};

/// ## The sender [`DisputeSender`]
///
/// The sender (`DisputeSender`) keeps track of live disputes and makes sure our vote gets out for
/// each one of those. The sender is responsible for sending our vote to each validator
/// participating in the dispute and to each authority currently authoring blocks. The sending can
/// be initiated by sending a `DisputeDistributionMessage::SendDispute` message to this subsystem.
///
/// In addition the `DisputeSender` will query the coordinator for active disputes on each
/// [`DisputeSender::update_leaves`] call and will initiate sending (start a `SendTask`) for every
/// dispute not yet known to this subsystem. This makes sure we get our vote out, even on
/// restarts.
///
/// The actual work of sending and keeping track of transmission attempts to each validator for a
/// particular dispute is done by [`SendTask`]. The purpose of the `DisputeSender` is to keep
/// track of all ongoing disputes and to start and clean up `SendTask`s accordingly.
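///
/// As a hedged illustration (not code from this module), initiating distribution of our own
/// vote from another subsystem could look roughly like this, assuming a subsystem context
/// `ctx` and a prepared `DisputeMessage` in `dispute_msg`:
///
/// ```ignore
/// // Hand our vote to dispute-distribution; the `DisputeSender` takes care of
/// // delivery to all relevant validators, retries and rate limiting.
/// ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_msg)).await;
/// ```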
mod sender;
use self::sender::{DisputeSender, DisputeSenderMessage};

/// ## The receiver [`DisputesReceiver`]
///
/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running
/// task within this subsystem ([`DisputesReceiver::run`]).
///
/// Conceptually, all the receiver has to do is wait for incoming requests, which are passed in
/// via a dedicated channel, and forward them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and to
/// untrusted nodes, the reality is of course not that simple. Before importing statements the
/// receiver will batch up imports as much as possible for efficiency, while still maintaining
/// timely dispute resolution and handling of spamming validators:
///
/// - Drop all messages from non-validator nodes; for this it requires the [`AuthorityDiscovery`]
///   service.
/// - Drop messages from a node if it sends at too high a rate (see the sketch below).
/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
/// In general, dispute-distribution works on limiting the work the dispute-coordinator will have
/// to do, while at the same time making it aware of new disputes as fast as possible.
///
/// For successfully imported votes, we will confirm the receipt of the message back to the
/// sender. This way a received confirmation guarantees that the vote has been stored to disk by
/// the receiver.
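///
/// As a rough, hypothetical sketch of the rate-limit check mentioned above (the real logic
/// lives in `receiver`; `RateLimitExceeded`, `last_received` and `now` are made-up names):
///
/// ```ignore
/// // Drop the request if this peer's previous message arrived less than
/// // `RECEIVE_RATE_LIMIT` ago; otherwise record the new arrival time and proceed.
/// if now.duration_since(last_received) < RECEIVE_RATE_LIMIT {
/// 	return Err(RateLimitExceeded(peer));
/// }
/// last_received = now;
/// ```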
mod receiver;
use self::receiver::DisputesReceiver;

/// Error and [`Result`] type for this subsystem.
mod error;
use error::{log_error, Error, FatalError, FatalResult, Result};

#[cfg(test)]
mod tests;

mod metrics;
/// Prometheus `Metrics` for dispute distribution.
pub use metrics::Metrics;

const LOG_TARGET: &str = "parachain::dispute-distribution";

/// Rate limit on the `receiver` side.
///
/// If messages from one peer come in more often than once every `RECEIVE_RATE_LIMIT` on average,
/// we start dropping messages from that peer to enforce that limit.
pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);

/// Rate limit on the `sender` side.
///
/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit our sending rate
/// as well.
///
/// We add 50ms extra, just to have some safe margin to the `RECEIVE_RATE_LIMIT` (150ms in total).
pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));
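
// An illustrative sanity check of the arithmetic above (an added sketch, not part of the
// original module): the send limit is the receive limit plus a 50ms margin, i.e. 150ms, so a
// compliant sender always stays below the receiver's enforcement threshold.
#[cfg(test)]
#[test]
fn send_rate_limit_keeps_margin_over_receive_rate_limit() {
	assert_eq!(SEND_RATE_LIMIT, Duration::from_millis(150));
	assert!(SEND_RATE_LIMIT > RECEIVE_RATE_LIMIT);
}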

/// The dispute distribution subsystem.
pub struct DisputeDistributionSubsystem<AD> {
	/// Easy and efficient runtime access for this subsystem.
	runtime: RuntimeInfo,

	/// Sender for our dispute requests.
	disputes_sender: DisputeSender<DisputeSenderMessage>,

	/// Receive messages from `DisputeSender` background tasks.
	sender_rx: mpsc::Receiver<DisputeSenderMessage>,

	/// Receiver for incoming requests.
	req_receiver: Option<IncomingRequestReceiver<v1::DisputeRequest>>,

	/// Authority discovery service.
	authority_discovery: AD,

	/// Metrics for this subsystem.
	metrics: Metrics,
}

#[overseer::subsystem(DisputeDistribution, error = SubsystemError, prefix = self::overseer)]
impl<Context, AD> DisputeDistributionSubsystem<AD>
where
	<Context as overseer::DisputeDistributionContextTrait>::Sender:
		overseer::DisputeDistributionSenderTrait + Sync + Send,
	AD: AuthorityDiscovery + Clone,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = self
			.run(ctx)
			.map_err(|e| SubsystemError::with_origin("dispute-distribution", e))
			.boxed();

		SpawnedSubsystem { name: "dispute-distribution-subsystem", future }
	}
}

#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl<AD> DisputeDistributionSubsystem<AD>
where
	AD: AuthorityDiscovery + Clone,
{
	/// Create a new instance of the dispute distribution subsystem.
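	///
	/// A hedged construction sketch (`keystore`, `req_receiver` and `authority_discovery` are
	/// assumed to be supplied by the node's service setup; `Metrics::default()` assumes the
	/// usual derived `Default`):
	///
	/// ```ignore
	/// let subsystem = DisputeDistributionSubsystem::new(
	/// 	keystore,
	/// 	req_receiver,
	/// 	authority_discovery,
	/// 	Metrics::default(),
	/// );
	/// ```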
	pub fn new(
		keystore: KeystorePtr,
		req_receiver: IncomingRequestReceiver<v1::DisputeRequest>,
		authority_discovery: AD,
		metrics: Metrics,
	) -> Self {
		let runtime = RuntimeInfo::new_with_config(runtime::Config {
			keystore: Some(keystore),
			session_cache_lru_size: DISPUTE_WINDOW.get(),
		});
		let (tx, sender_rx) = NestingSender::new_root(1);
		let disputes_sender = DisputeSender::new(tx, metrics.clone());
		Self {
			runtime,
			disputes_sender,
			sender_rx,
			req_receiver: Some(req_receiver),
			authority_discovery,
			metrics,
		}
	}

	/// Start processing work as passed on from the Overseer.
	async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), FatalError> {
		let receiver = DisputesReceiver::new(
			ctx.sender().clone(),
			self.req_receiver
				.take()
				.expect("Must be provided on `new` and we take ownership here. qed."),
			self.authority_discovery.clone(),
			self.metrics.clone(),
		);
		ctx.spawn("disputes-receiver", receiver.run().boxed())
			.map_err(FatalError::SpawnTask)?;

		// Process messages for the sending side.
		//
		// Note: We want the sender to be rate limited, and we currently take advantage of the
		// fact that the root task of this subsystem is only concerned with sending: Functions of
		// `DisputeSender` might exert back pressure if the rate limit is hit, which will slow
		// down this loop. If this fact ever changes, we will likely need another task.
		loop {
			let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
			match message {
				MuxedMessage::Subsystem(result) => {
					let result = match result? {
						FromOrchestra::Signal(signal) => {
							match self.handle_signals(&mut ctx, signal).await {
								Ok(SignalResult::Conclude) => return Ok(()),
								Ok(SignalResult::Continue) => Ok(()),
								Err(f) => Err(f),
							}
						},
						FromOrchestra::Communication { msg } =>
							self.handle_subsystem_message(&mut ctx, msg).await,
					};
					log_error(result, "on FromOrchestra")?;
				},
				MuxedMessage::Sender(result) => {
					let result = self
						.disputes_sender
						.on_message(
							&mut ctx,
							&mut self.runtime,
							result.ok_or(FatalError::SenderExhausted)?,
						)
						.await
						.map_err(Error::Sender);
					log_error(result, "on_message")?;
				},
			}
		}
	}

	/// Handle overseer signals.
	async fn handle_signals<Context>(
		&mut self,
		ctx: &mut Context,
		signal: OverseerSignal,
	) -> Result<SignalResult> {
		match signal {
			OverseerSignal::Conclude => return Ok(SignalResult::Conclude),
			OverseerSignal::ActiveLeaves(update) => {
				self.disputes_sender.update_leaves(ctx, &mut self.runtime, update).await?;
			},
			OverseerSignal::BlockFinalized(_, _) => {},
		};
		Ok(SignalResult::Continue)
	}

	/// Handle `DisputeDistributionMessage`s.
	async fn handle_subsystem_message<Context>(
		&mut self,
		ctx: &mut Context,
		msg: DisputeDistributionMessage,
	) -> Result<()> {
		match msg {
			DisputeDistributionMessage::SendDispute(dispute_msg) =>
				self.disputes_sender.start_sender(ctx, &mut self.runtime, dispute_msg).await?,
		}
		Ok(())
	}
}

/// Messages to be handled in this subsystem.
#[derive(Debug)]
enum MuxedMessage {
	/// Messages from other subsystems.
	Subsystem(FatalResult<FromOrchestra<DisputeDistributionMessage>>),
	/// Messages from spawned sender background tasks.
	Sender(Option<DisputeSenderMessage>),
}

#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl MuxedMessage {
	async fn receive<Context>(
		ctx: &mut Context,
		from_sender: &mut mpsc::Receiver<DisputeSenderMessage>,
	) -> Self {
		// We only fuse here to make `select` happy; in reality we will quit if the stream ends.
		let from_overseer = ctx.recv().fuse();
		futures::pin_mut!(from_overseer, from_sender);
		// We select biased to make sure we finish up loose ends before starting new work.
		futures::select_biased!(
			msg = from_sender.next() => MuxedMessage::Sender(msg),
			msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
		)
	}
}

/// Result of handling a signal from the overseer.
enum SignalResult {
	/// Overseer asked us to conclude.
	Conclude,
	/// We can continue processing events.
	Continue,
}