1use std::{
18 pin::Pin,
19 task::{Context, Poll},
20 time::Duration,
21};
22
23use futures::{
24 channel::oneshot,
25 future::poll_fn,
26 pin_mut,
27 stream::{FuturesUnordered, StreamExt},
28 Future,
29};
30
31use gum::CandidateHash;
32use polkadot_node_network_protocol::{
33 authority_discovery::AuthorityDiscovery,
34 request_response::{
35 incoming::{self, OutgoingResponse, OutgoingResponseSender},
36 v1::{DisputeRequest, DisputeResponse},
37 IncomingRequest, IncomingRequestReceiver,
38 },
39 PeerId, UnifiedReputationChange as Rep,
40};
41use polkadot_node_primitives::DISPUTE_WINDOW;
42use polkadot_node_subsystem::{
43 messages::{DisputeCoordinatorMessage, ImportStatementsResult},
44 overseer,
45};
46use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};
47
48use crate::{
49 metrics::{FAILED, SUCCEEDED},
50 Metrics, LOG_TARGET,
51};
52
53mod error;
54
55mod peer_queues;
57
58mod batches;
60
61use self::{
62 batches::{Batches, FoundBatch, PreparedImport},
63 error::{log_error, JfyiError, JfyiResult, Result},
64 peer_queues::PeerQueues,
65};
66
67const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
68const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
69const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");
70
71const COST_INVALID_IMPORT: Rep =
73 Rep::CostMinor("Import was deemed invalid by dispute-coordinator.");
74
75#[cfg(not(test))]
81pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
82#[cfg(test)]
83pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 2;
84
85pub const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);
90
91pub struct DisputesReceiver<Sender, AD> {
93 runtime: RuntimeInfo,
95
96 sender: Sender,
98
99 receiver: IncomingRequestReceiver<DisputeRequest>,
101
102 peer_queues: PeerQueues,
104
105 batches: Batches,
107
108 authority_discovery: AD,
110
111 pending_imports: FuturesUnordered<PendingImport>,
113
114 metrics: Metrics,
116}
117
118enum MuxedMessage {
120 ConfirmedImport(ImportResult),
128
129 NewRequest(IncomingRequest<DisputeRequest>),
131
132 WakePeerQueuesPopReqs(Vec<IncomingRequest<DisputeRequest>>),
136
137 WakeCheckBatches(Vec<PreparedImport>),
144}
145
146impl<Sender, AD> DisputesReceiver<Sender, AD>
147where
148 AD: AuthorityDiscovery,
149 Sender: overseer::DisputeDistributionSenderTrait,
150{
151 pub fn new(
153 sender: Sender,
154 receiver: IncomingRequestReceiver<DisputeRequest>,
155 authority_discovery: AD,
156 metrics: Metrics,
157 ) -> Self {
158 let runtime = RuntimeInfo::new_with_config(runtime::Config {
159 keystore: None,
160 session_cache_lru_size: DISPUTE_WINDOW.get(),
161 });
162 Self {
163 runtime,
164 sender,
165 receiver,
166 peer_queues: PeerQueues::new(),
167 batches: Batches::new(),
168 authority_discovery,
169 pending_imports: FuturesUnordered::new(),
170 metrics,
171 }
172 }
173
174 pub async fn run(mut self) {
178 loop {
179 match log_error(self.run_inner().await) {
180 Ok(()) => {},
181 Err(fatal) => {
182 gum::debug!(
183 target: LOG_TARGET,
184 error = ?fatal,
185 "Shutting down"
186 );
187 return
188 },
189 }
190 }
191 }
192
193 async fn run_inner(&mut self) -> Result<()> {
199 let msg = self.receive_message().await?;
200
201 match msg {
202 MuxedMessage::NewRequest(req) => {
203 self.metrics.on_received_request();
205 self.dispatch_to_queues(req).await?;
206 },
207 MuxedMessage::WakePeerQueuesPopReqs(reqs) => {
208 for req in reqs {
210 match log_error(self.start_import_or_batch(req).await) {
213 Ok(()) => {},
214 Err(fatal) => return Err(fatal.into()),
215 }
216 }
217 },
218 MuxedMessage::WakeCheckBatches(ready_imports) => {
219 self.import_ready_batches(ready_imports).await;
221 },
222 MuxedMessage::ConfirmedImport(import_result) => {
223 self.update_imported_requests_metrics(&import_result);
224 send_responses_to_requesters(import_result).await?;
226 },
227 }
228
229 Ok(())
230 }
231
232 async fn receive_message(&mut self) -> Result<MuxedMessage> {
237 poll_fn(|ctx| {
238 if let Poll::Ready(Some(v)) = self.pending_imports.poll_next_unpin(ctx) {
240 return Poll::Ready(Ok(MuxedMessage::ConfirmedImport(v?)))
241 }
242
243 let rate_limited = self.peer_queues.pop_reqs();
244 pin_mut!(rate_limited);
245 if let Poll::Ready(reqs) = rate_limited.poll(ctx) {
248 return Poll::Ready(Ok(MuxedMessage::WakePeerQueuesPopReqs(reqs)))
249 }
250
251 let ready_batches = self.batches.check_batches();
252 pin_mut!(ready_batches);
253 if let Poll::Ready(ready_batches) = ready_batches.poll(ctx) {
254 return Poll::Ready(Ok(MuxedMessage::WakeCheckBatches(ready_batches)))
255 }
256
257 let next_req = self.receiver.recv(|| vec![COST_INVALID_REQUEST]);
258 pin_mut!(next_req);
259 if let Poll::Ready(r) = next_req.poll(ctx) {
260 return match r {
261 Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
262 Ok(v) => Poll::Ready(Ok(MuxedMessage::NewRequest(v))),
263 }
264 }
265 Poll::Pending
266 })
267 .await
268 }
269
270 async fn dispatch_to_queues(&mut self, req: IncomingRequest<DisputeRequest>) -> JfyiResult<()> {
276 let peer = req.peer;
277 let authority_id = match self
281 .authority_discovery
282 .get_authority_ids_by_peer_id(peer)
283 .await
284 .and_then(|s| s.into_iter().next())
285 {
286 None => {
287 req.send_outgoing_response(OutgoingResponse {
288 result: Err(()),
289 reputation_changes: vec![COST_NOT_A_VALIDATOR],
290 sent_feedback: None,
291 })
292 .map_err(|_| JfyiError::SendResponses(vec![peer]))?;
293 return Err(JfyiError::NotAValidator(peer).into())
294 },
295 Some(auth_id) => auth_id,
296 };
297
298 if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
300 gum::debug!(
301 target: LOG_TARGET,
302 ?authority_id,
303 ?peer,
304 "Peer hit the rate limit - dropping message."
305 );
306 req.send_outgoing_response(OutgoingResponse {
307 result: Err(()),
308 reputation_changes: vec![],
309 sent_feedback: None,
310 })
311 .map_err(|_| JfyiError::SendResponses(vec![peer]))?;
312 return Err(JfyiError::AuthorityFlooding(authority_id))
313 }
314 Ok(())
315 }
316
317 async fn start_import_or_batch(
322 &mut self,
323 incoming: IncomingRequest<DisputeRequest>,
324 ) -> Result<()> {
325 let IncomingRequest { peer, payload, pending_response } = incoming;
326
327 let info = self
328 .runtime
329 .get_session_info_by_index(
330 &mut self.sender,
331 payload.0.candidate_receipt.descriptor.relay_parent(),
332 payload.0.session_index,
333 )
334 .await?;
335
336 let votes_result = payload.0.try_into_signed_votes(&info.session_info);
337
338 let (candidate_receipt, valid_vote, invalid_vote) = match votes_result {
339 Err(()) => {
340 pending_response
342 .send_outgoing_response(OutgoingResponse {
343 result: Err(()),
344 reputation_changes: vec![COST_INVALID_SIGNATURE],
345 sent_feedback: None,
346 })
347 .map_err(|_| JfyiError::SetPeerReputation(peer))?;
348
349 return Err(From::from(JfyiError::InvalidSignature(peer)))
350 },
351 Ok(votes) => votes,
352 };
353
354 let candidate_hash = *valid_vote.0.candidate_hash();
355
356 match self.batches.find_batch(candidate_hash, candidate_receipt)? {
357 FoundBatch::Created(batch) => {
358 gum::trace!(
360 target: LOG_TARGET,
361 ?candidate_hash,
362 ?peer,
363 "No batch yet - triggering immediate import"
364 );
365 let import = PreparedImport {
366 candidate_receipt: batch.candidate_receipt().clone(),
367 statements: vec![valid_vote, invalid_vote],
368 requesters: vec![(peer, pending_response)],
369 };
370 self.start_import(import).await;
371 },
372 FoundBatch::Found(batch) => {
373 gum::trace!(target: LOG_TARGET, ?candidate_hash, "Batch exists - batching request");
374 let batch_result =
375 batch.add_votes(valid_vote, invalid_vote, peer, pending_response);
376
377 if let Err(pending_response) = batch_result {
378 gum::debug!(
386 target: LOG_TARGET,
387 ?peer,
388 "Peer sent completely redundant votes within a single batch - that looks fishy!",
389 );
390 pending_response
391 .send_outgoing_response(OutgoingResponse {
392 result: Err(()),
397 reputation_changes: Vec::new(),
398 sent_feedback: None,
399 })
400 .map_err(|_| JfyiError::SendResponses(vec![peer]))?;
401 return Err(From::from(JfyiError::RedundantMessage(peer)))
402 }
403 },
404 }
405
406 Ok(())
407 }
408
409 async fn import_ready_batches(&mut self, ready_imports: Vec<PreparedImport>) {
411 for import in ready_imports {
412 self.start_import(import).await;
413 }
414 }
415
416 async fn start_import(&mut self, import: PreparedImport) {
418 let PreparedImport { candidate_receipt, statements, requesters } = import;
419 let (session_index, candidate_hash) = match statements.iter().next() {
420 None => {
421 gum::debug!(
422 target: LOG_TARGET,
423 candidate_hash = ?candidate_receipt.hash(),
424 "Not importing empty batch"
425 );
426 return
427 },
428 Some(vote) => (vote.0.session_index(), *vote.0.candidate_hash()),
429 };
430
431 let (pending_confirmation, confirmation_rx) = oneshot::channel();
432 self.sender
433 .send_message(DisputeCoordinatorMessage::ImportStatements {
434 candidate_receipt,
435 session: session_index,
436 statements,
437 pending_confirmation: Some(pending_confirmation),
438 })
439 .await;
440
441 let pending =
442 PendingImport { candidate_hash, requesters, pending_response: confirmation_rx };
443
444 self.pending_imports.push(pending);
445 }
446
447 fn update_imported_requests_metrics(&self, result: &ImportResult) {
448 let label = match result.result {
449 ImportStatementsResult::ValidImport => SUCCEEDED,
450 ImportStatementsResult::InvalidImport => FAILED,
451 };
452 self.metrics.on_imported(label, result.requesters.len());
453 }
454}
455
456async fn send_responses_to_requesters(import_result: ImportResult) -> JfyiResult<()> {
457 let ImportResult { requesters, result } = import_result;
458
459 let mk_response = match result {
460 ImportStatementsResult::ValidImport => || OutgoingResponse {
461 result: Ok(DisputeResponse::Confirmed),
462 reputation_changes: Vec::new(),
463 sent_feedback: None,
464 },
465 ImportStatementsResult::InvalidImport => || OutgoingResponse {
466 result: Err(()),
467 reputation_changes: vec![COST_INVALID_IMPORT],
468 sent_feedback: None,
469 },
470 };
471
472 let mut sending_failed_for = Vec::new();
473 for (peer, pending_response) in requesters {
474 if let Err(()) = pending_response.send_outgoing_response(mk_response()) {
475 sending_failed_for.push(peer);
476 }
477 }
478
479 if !sending_failed_for.is_empty() {
480 Err(JfyiError::SendResponses(sending_failed_for))
481 } else {
482 Ok(())
483 }
484}
485
486struct PendingImport {
493 candidate_hash: CandidateHash,
494 requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
495 pending_response: oneshot::Receiver<ImportStatementsResult>,
496}
497
498struct ImportResult {
500 requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
502 result: ImportStatementsResult,
504}
505
506impl PendingImport {
507 async fn wait_for_result(&mut self) -> JfyiResult<ImportResult> {
508 let result = (&mut self.pending_response)
509 .await
510 .map_err(|_| JfyiError::ImportCanceled(self.candidate_hash))?;
511 Ok(ImportResult { requesters: std::mem::take(&mut self.requesters), result })
512 }
513}
514
515impl Future for PendingImport {
516 type Output = JfyiResult<ImportResult>;
517 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
518 let fut = self.wait_for_result();
519 pin_mut!(fut);
520 fut.poll(cx)
521 }
522}