referrerpolicy=no-referrer-when-downgrade

polkadot_dispute_distribution/receiver/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::{
18	pin::Pin,
19	task::{Context, Poll},
20	time::Duration,
21};
22
23use futures::{
24	channel::oneshot,
25	future::poll_fn,
26	pin_mut,
27	stream::{FuturesUnordered, StreamExt},
28	Future,
29};
30
31use gum::CandidateHash;
32use polkadot_node_network_protocol::{
33	authority_discovery::AuthorityDiscovery,
34	request_response::{
35		incoming::{self, OutgoingResponse, OutgoingResponseSender},
36		v1::{DisputeRequest, DisputeResponse},
37		IncomingRequest, IncomingRequestReceiver,
38	},
39	PeerId, UnifiedReputationChange as Rep,
40};
41use polkadot_node_primitives::DISPUTE_WINDOW;
42use polkadot_node_subsystem::{
43	messages::{DisputeCoordinatorMessage, ImportStatementsResult},
44	overseer,
45};
46use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};
47
48use crate::{
49	metrics::{FAILED, SUCCEEDED},
50	Metrics, LOG_TARGET,
51};
52
53mod error;
54
55/// Rate limiting queues for incoming requests by peers.
56mod peer_queues;
57
58/// Batch imports together.
59mod batches;
60
61use self::{
62	batches::{Batches, FoundBatch, PreparedImport},
63	error::{log_error, JfyiError, JfyiResult, Result},
64	peer_queues::PeerQueues,
65};
66
/// Reputation change handed to the request receiver for requests that could not be decoded.
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
/// Reputation change for requests whose votes failed the signature check.
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
/// Reputation change for requests of peers for which no authority id could be found.
const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");

/// Reputation change for requests whose import got rejected by the dispute-coordinator.
///
/// Invalid imports can be caused by flooding, e.g. by a disabled validator.
const COST_INVALID_IMPORT: Rep =
	Rep::CostMinor("Import was deemed invalid by dispute-coordinator.");
74
/// How many votes must have arrived in the last `BATCH_COLLECTING_INTERVAL` in order for a batch
/// to stay alive and not get flushed/imported to the dispute-coordinator.
///
/// This ensures a timely import of batches.
#[cfg(not(test))]
pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
/// Variant of `MIN_KEEP_BATCH_ALIVE_VOTES` used in test builds (smaller threshold).
#[cfg(test)]
pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 2;
84
/// Time we allow to pass for new votes to trickle in.
///
/// See `MIN_KEEP_BATCH_ALIVE_VOTES` above.
/// Should be greater than or equal to `RECEIVE_RATE_LIMIT` (there is no point in checking any
/// faster).
pub const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);
90
/// State for handling incoming `DisputeRequest` messages.
///
/// `Sender` is the subsystem sender used for talking to other subsystems, `AD` is the authority
/// discovery service used for mapping peers to authority ids.
pub struct DisputesReceiver<Sender, AD> {
	/// Access to session information.
	runtime: RuntimeInfo,

	/// Subsystem sender for communication with other subsystems.
	sender: Sender,

	/// Channel to retrieve incoming requests from.
	receiver: IncomingRequestReceiver<DisputeRequest>,

	/// Rate limiting queue for each peer (only authorities).
	peer_queues: PeerQueues,

	/// Currently active batches of imports per candidate.
	batches: Batches,

	/// Authority discovery service.
	authority_discovery: AD,

	/// Imports currently being processed by the `dispute-coordinator`.
	pending_imports: FuturesUnordered<PendingImport>,

	/// Log received requests.
	metrics: Metrics,
}
117
/// Messages as handled by this receiver internally.
///
/// Each variant corresponds to one of the event sources polled in `receive_message`.
enum MuxedMessage {
	/// An import got confirmed by the coordinator.
	///
	/// We need to handle those for two reasons:
	///
	/// - We need to make sure responses are actually sent (therefore we need to await futures
	///   promptly).
	/// - We need to punish peers whose import got rejected.
	ConfirmedImport(ImportResult),

	/// A new request has arrived and should be handled.
	NewRequest(IncomingRequest<DisputeRequest>),

	/// Rate limit timer hit - it is time to process one row of messages.
	///
	/// This is the result of calling `self.peer_queues.pop_reqs()`.
	WakePeerQueuesPopReqs(Vec<IncomingRequest<DisputeRequest>>),

	/// It is time to check batches.
	///
	/// Every `BATCH_COLLECTING_INTERVAL` we check whether less than `MIN_KEEP_BATCH_ALIVE_VOTES`
	/// new votes arrived, if so the batch is ready for import.
	///
	/// This is the result of calling `self.batches.check_batches()`.
	WakeCheckBatches(Vec<PreparedImport>),
}
145
146impl<Sender, AD> DisputesReceiver<Sender, AD>
147where
148	AD: AuthorityDiscovery,
149	Sender: overseer::DisputeDistributionSenderTrait,
150{
151	/// Create a new receiver which can be `run`.
152	pub fn new(
153		sender: Sender,
154		receiver: IncomingRequestReceiver<DisputeRequest>,
155		authority_discovery: AD,
156		metrics: Metrics,
157	) -> Self {
158		let runtime = RuntimeInfo::new_with_config(runtime::Config {
159			keystore: None,
160			session_cache_lru_size: DISPUTE_WINDOW.get(),
161		});
162		Self {
163			runtime,
164			sender,
165			receiver,
166			peer_queues: PeerQueues::new(),
167			batches: Batches::new(),
168			authority_discovery,
169			pending_imports: FuturesUnordered::new(),
170			metrics,
171		}
172	}
173
174	/// Get that receiver started.
175	///
176	/// This is an endless loop and should be spawned into its own task.
177	pub async fn run(mut self) {
178		loop {
179			match log_error(self.run_inner().await) {
180				Ok(()) => {},
181				Err(fatal) => {
182					gum::debug!(
183						target: LOG_TARGET,
184						error = ?fatal,
185						"Shutting down"
186					);
187					return
188				},
189			}
190		}
191	}
192
193	/// Actual work happening here in three phases:
194	///
195	/// 1. Receive and queue incoming messages until the rate limit timer hits.
196	/// 2. Do import/batching for the head of all queues.
197	/// 3. Check and flush any ready batches.
198	async fn run_inner(&mut self) -> Result<()> {
199		let msg = self.receive_message().await?;
200
201		match msg {
202			MuxedMessage::NewRequest(req) => {
203				// Phase 1:
204				self.metrics.on_received_request();
205				self.dispatch_to_queues(req).await?;
206			},
207			MuxedMessage::WakePeerQueuesPopReqs(reqs) => {
208				// Phase 2:
209				for req in reqs {
210					// No early return - we cannot cancel imports of one peer, because the import of
211					// another failed:
212					match log_error(self.start_import_or_batch(req).await) {
213						Ok(()) => {},
214						Err(fatal) => return Err(fatal.into()),
215					}
216				}
217			},
218			MuxedMessage::WakeCheckBatches(ready_imports) => {
219				// Phase 3:
220				self.import_ready_batches(ready_imports).await;
221			},
222			MuxedMessage::ConfirmedImport(import_result) => {
223				self.update_imported_requests_metrics(&import_result);
224				// Confirm imports to requesters/punish them on invalid imports:
225				send_responses_to_requesters(import_result).await?;
226			},
227		}
228
229		Ok(())
230	}
231
232	/// Receive one `MuxedMessage`.
233	///
234	///
235	/// Dispatching events to messages as they happen.
236	async fn receive_message(&mut self) -> Result<MuxedMessage> {
237		poll_fn(|ctx| {
238			// In case of Ready(None), we want to wait for pending requests:
239			if let Poll::Ready(Some(v)) = self.pending_imports.poll_next_unpin(ctx) {
240				return Poll::Ready(Ok(MuxedMessage::ConfirmedImport(v?)))
241			}
242
243			let rate_limited = self.peer_queues.pop_reqs();
244			pin_mut!(rate_limited);
245			// We poll rate_limit before batches, so we don't unnecessarily delay importing to
246			// batches.
247			if let Poll::Ready(reqs) = rate_limited.poll(ctx) {
248				return Poll::Ready(Ok(MuxedMessage::WakePeerQueuesPopReqs(reqs)))
249			}
250
251			let ready_batches = self.batches.check_batches();
252			pin_mut!(ready_batches);
253			if let Poll::Ready(ready_batches) = ready_batches.poll(ctx) {
254				return Poll::Ready(Ok(MuxedMessage::WakeCheckBatches(ready_batches)))
255			}
256
257			let next_req = self.receiver.recv(|| vec![COST_INVALID_REQUEST]);
258			pin_mut!(next_req);
259			if let Poll::Ready(r) = next_req.poll(ctx) {
260				return match r {
261					Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
262					Ok(v) => Poll::Ready(Ok(MuxedMessage::NewRequest(v))),
263				}
264			}
265			Poll::Pending
266		})
267		.await
268	}
269
270	/// Process incoming requests.
271	///
272	/// - Check sender is authority
273	/// - Dispatch message to corresponding queue in `peer_queues`.
274	/// - If queue is full, drop message and change reputation of sender.
275	async fn dispatch_to_queues(&mut self, req: IncomingRequest<DisputeRequest>) -> JfyiResult<()> {
276		let peer = req.peer;
277		// Only accept messages from validators, in case there are multiple `AuthorityId`s, we
278		// just take the first one. On session boundaries this might allow validators to double
279		// their rate limit for a short period of time, which seems acceptable.
280		let authority_id = match self
281			.authority_discovery
282			.get_authority_ids_by_peer_id(peer)
283			.await
284			.and_then(|s| s.into_iter().next())
285		{
286			None => {
287				req.send_outgoing_response(OutgoingResponse {
288					result: Err(()),
289					reputation_changes: vec![COST_NOT_A_VALIDATOR],
290					sent_feedback: None,
291				})
292				.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
293				return Err(JfyiError::NotAValidator(peer).into())
294			},
295			Some(auth_id) => auth_id,
296		};
297
298		// Queue request:
299		if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
300			gum::debug!(
301				target: LOG_TARGET,
302				?authority_id,
303				?peer,
304				"Peer hit the rate limit - dropping message."
305			);
306			req.send_outgoing_response(OutgoingResponse {
307				result: Err(()),
308				reputation_changes: vec![],
309				sent_feedback: None,
310			})
311			.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
312			return Err(JfyiError::AuthorityFlooding(authority_id))
313		}
314		Ok(())
315	}
316
317	/// Start importing votes for the given request or batch.
318	///
319	/// Signature check and in case we already have an existing batch we import to that batch,
320	/// otherwise import to `dispute-coordinator` directly and open a batch.
321	async fn start_import_or_batch(
322		&mut self,
323		incoming: IncomingRequest<DisputeRequest>,
324	) -> Result<()> {
325		let IncomingRequest { peer, payload, pending_response } = incoming;
326
327		let info = self
328			.runtime
329			.get_session_info_by_index(
330				&mut self.sender,
331				payload.0.candidate_receipt.descriptor.relay_parent(),
332				payload.0.session_index,
333			)
334			.await?;
335
336		let votes_result = payload.0.try_into_signed_votes(&info.session_info);
337
338		let (candidate_receipt, valid_vote, invalid_vote) = match votes_result {
339			Err(()) => {
340				// Signature invalid:
341				pending_response
342					.send_outgoing_response(OutgoingResponse {
343						result: Err(()),
344						reputation_changes: vec![COST_INVALID_SIGNATURE],
345						sent_feedback: None,
346					})
347					.map_err(|_| JfyiError::SetPeerReputation(peer))?;
348
349				return Err(From::from(JfyiError::InvalidSignature(peer)))
350			},
351			Ok(votes) => votes,
352		};
353
354		let candidate_hash = *valid_vote.0.candidate_hash();
355
356		match self.batches.find_batch(candidate_hash, candidate_receipt)? {
357			FoundBatch::Created(batch) => {
358				// There was no entry yet - start import immediately:
359				gum::trace!(
360					target: LOG_TARGET,
361					?candidate_hash,
362					?peer,
363					"No batch yet - triggering immediate import"
364				);
365				let import = PreparedImport {
366					candidate_receipt: batch.candidate_receipt().clone(),
367					statements: vec![valid_vote, invalid_vote],
368					requesters: vec![(peer, pending_response)],
369				};
370				self.start_import(import).await;
371			},
372			FoundBatch::Found(batch) => {
373				gum::trace!(target: LOG_TARGET, ?candidate_hash, "Batch exists - batching request");
374				let batch_result =
375					batch.add_votes(valid_vote, invalid_vote, peer, pending_response);
376
377				if let Err(pending_response) = batch_result {
378					// We don't expect honest peers to send redundant votes within a single batch,
379					// as the timeout for retry is much higher. Still we don't want to punish the
380					// node as it might not be the node's fault. Some other (malicious) node could
381					// have been faster sending the same votes in order to harm the reputation of
382					// that honest node. Given that we already have a rate limit, if a validator
383					// chooses to waste available rate with redundant votes - so be it. The actual
384					// dispute resolution is unaffected.
385					gum::debug!(
386						target: LOG_TARGET,
387						?peer,
388						"Peer sent completely redundant votes within a single batch - that looks fishy!",
389					);
390					pending_response
391						.send_outgoing_response(OutgoingResponse {
392							// While we have seen duplicate votes, we cannot confirm as we don't
393							// know yet whether the batch is going to be confirmed, so we assume
394							// the worst. We don't want to push the pending response to the batch
395							// either as that would be unbounded, only limited by the rate limit.
396							result: Err(()),
397							reputation_changes: Vec::new(),
398							sent_feedback: None,
399						})
400						.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
401					return Err(From::from(JfyiError::RedundantMessage(peer)))
402				}
403			},
404		}
405
406		Ok(())
407	}
408
409	/// Trigger import into the dispute-coordinator of ready batches (`PreparedImport`s).
410	async fn import_ready_batches(&mut self, ready_imports: Vec<PreparedImport>) {
411		for import in ready_imports {
412			self.start_import(import).await;
413		}
414	}
415
416	/// Start import and add response receiver to `pending_imports`.
417	async fn start_import(&mut self, import: PreparedImport) {
418		let PreparedImport { candidate_receipt, statements, requesters } = import;
419		let (session_index, candidate_hash) = match statements.iter().next() {
420			None => {
421				gum::debug!(
422					target: LOG_TARGET,
423					candidate_hash = ?candidate_receipt.hash(),
424					"Not importing empty batch"
425				);
426				return
427			},
428			Some(vote) => (vote.0.session_index(), *vote.0.candidate_hash()),
429		};
430
431		let (pending_confirmation, confirmation_rx) = oneshot::channel();
432		self.sender
433			.send_message(DisputeCoordinatorMessage::ImportStatements {
434				candidate_receipt,
435				session: session_index,
436				statements,
437				pending_confirmation: Some(pending_confirmation),
438			})
439			.await;
440
441		let pending =
442			PendingImport { candidate_hash, requesters, pending_response: confirmation_rx };
443
444		self.pending_imports.push(pending);
445	}
446
447	fn update_imported_requests_metrics(&self, result: &ImportResult) {
448		let label = match result.result {
449			ImportStatementsResult::ValidImport => SUCCEEDED,
450			ImportStatementsResult::InvalidImport => FAILED,
451		};
452		self.metrics.on_imported(label, result.requesters.len());
453	}
454}
455
456async fn send_responses_to_requesters(import_result: ImportResult) -> JfyiResult<()> {
457	let ImportResult { requesters, result } = import_result;
458
459	let mk_response = match result {
460		ImportStatementsResult::ValidImport => || OutgoingResponse {
461			result: Ok(DisputeResponse::Confirmed),
462			reputation_changes: Vec::new(),
463			sent_feedback: None,
464		},
465		ImportStatementsResult::InvalidImport => || OutgoingResponse {
466			result: Err(()),
467			reputation_changes: vec![COST_INVALID_IMPORT],
468			sent_feedback: None,
469		},
470	};
471
472	let mut sending_failed_for = Vec::new();
473	for (peer, pending_response) in requesters {
474		if let Err(()) = pending_response.send_outgoing_response(mk_response()) {
475			sending_failed_for.push(peer);
476		}
477	}
478
479	if !sending_failed_for.is_empty() {
480		Err(JfyiError::SendResponses(sending_failed_for))
481	} else {
482		Ok(())
483	}
484}
485
/// A future that resolves into an `ImportResult` when ready.
///
/// This future is used on `dispute-coordinator` import messages for the oneshot response receiver
/// to:
/// - Keep track of concerned `CandidateHash` for reporting errors.
/// - Keep track of requesting peers so we can confirm the import/punish them on invalid imports.
struct PendingImport {
	/// Candidate the import is about, reported in case the import got canceled.
	candidate_hash: CandidateHash,
	/// Peers that requested the import, together with the means for responding to them.
	requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
	/// Receiver for the result of the import.
	pending_response: oneshot::Receiver<ImportStatementsResult>,
}
497
/// A `PendingImport` becomes an `ImportResult` once done.
///
/// It carries everything needed for answering the requesters of the import.
struct ImportResult {
	/// Requesters of that import.
	requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
	/// Actual result of the import.
	result: ImportStatementsResult,
}
505
506impl PendingImport {
507	async fn wait_for_result(&mut self) -> JfyiResult<ImportResult> {
508		let result = (&mut self.pending_response)
509			.await
510			.map_err(|_| JfyiError::ImportCanceled(self.candidate_hash))?;
511		Ok(ImportResult { requesters: std::mem::take(&mut self.requesters), result })
512	}
513}
514
515impl Future for PendingImport {
516	type Output = JfyiResult<ImportResult>;
517	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
518		let fut = self.wait_for_result();
519		pin_mut!(fut);
520		fut.poll(cx)
521	}
522}