use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{
channel::oneshot,
future::poll_fn,
pin_mut,
stream::{FuturesUnordered, StreamExt},
Future,
};
use gum::CandidateHash;
use polkadot_node_network_protocol::{
authority_discovery::AuthorityDiscovery,
request_response::{
incoming::{self, OutgoingResponse, OutgoingResponseSender},
v1::{DisputeRequest, DisputeResponse},
IncomingRequest, IncomingRequestReceiver,
},
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::DISPUTE_WINDOW;
use polkadot_node_subsystem::{
messages::{DisputeCoordinatorMessage, ImportStatementsResult},
overseer,
};
use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};
use crate::{
metrics::{FAILED, SUCCEEDED},
Metrics, LOG_TARGET,
};
mod error;
mod peer_queues;
mod batches;
use self::{
batches::{Batches, FoundBatch, PreparedImport},
error::{log_error, JfyiError, JfyiResult, Result},
peer_queues::PeerQueues,
};
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");
const COST_INVALID_IMPORT: Rep =
Rep::CostMinor("Import was deemed invalid by dispute-coordinator.");
const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Peer exceeded the rate limit.");
#[cfg(not(test))]
pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 10;
#[cfg(test)]
pub const MIN_KEEP_BATCH_ALIVE_VOTES: u32 = 2;
pub const BATCH_COLLECTING_INTERVAL: Duration = Duration::from_millis(500);
pub struct DisputesReceiver<Sender, AD> {
runtime: RuntimeInfo,
sender: Sender,
receiver: IncomingRequestReceiver<DisputeRequest>,
peer_queues: PeerQueues,
batches: Batches,
authority_discovery: AD,
pending_imports: FuturesUnordered<PendingImport>,
metrics: Metrics,
}
enum MuxedMessage {
ConfirmedImport(ImportResult),
NewRequest(IncomingRequest<DisputeRequest>),
WakePeerQueuesPopReqs(Vec<IncomingRequest<DisputeRequest>>),
WakeCheckBatches(Vec<PreparedImport>),
}
impl<Sender, AD> DisputesReceiver<Sender, AD>
where
AD: AuthorityDiscovery,
Sender: overseer::DisputeDistributionSenderTrait,
{
pub fn new(
sender: Sender,
receiver: IncomingRequestReceiver<DisputeRequest>,
authority_discovery: AD,
metrics: Metrics,
) -> Self {
let runtime = RuntimeInfo::new_with_config(runtime::Config {
keystore: None,
session_cache_lru_size: DISPUTE_WINDOW.get(),
});
Self {
runtime,
sender,
receiver,
peer_queues: PeerQueues::new(),
batches: Batches::new(),
authority_discovery,
pending_imports: FuturesUnordered::new(),
metrics,
}
}
pub async fn run(mut self) {
loop {
match log_error(self.run_inner().await) {
Ok(()) => {},
Err(fatal) => {
gum::debug!(
target: LOG_TARGET,
error = ?fatal,
"Shutting down"
);
return
},
}
}
}
async fn run_inner(&mut self) -> Result<()> {
let msg = self.receive_message().await?;
match msg {
MuxedMessage::NewRequest(req) => {
self.metrics.on_received_request();
self.dispatch_to_queues(req).await?;
},
MuxedMessage::WakePeerQueuesPopReqs(reqs) => {
for req in reqs {
match log_error(self.start_import_or_batch(req).await) {
Ok(()) => {},
Err(fatal) => return Err(fatal.into()),
}
}
},
MuxedMessage::WakeCheckBatches(ready_imports) => {
self.import_ready_batches(ready_imports).await;
},
MuxedMessage::ConfirmedImport(import_result) => {
self.update_imported_requests_metrics(&import_result);
send_responses_to_requesters(import_result).await?;
},
}
Ok(())
}
async fn receive_message(&mut self) -> Result<MuxedMessage> {
poll_fn(|ctx| {
if let Poll::Ready(Some(v)) = self.pending_imports.poll_next_unpin(ctx) {
return Poll::Ready(Ok(MuxedMessage::ConfirmedImport(v?)))
}
let rate_limited = self.peer_queues.pop_reqs();
pin_mut!(rate_limited);
if let Poll::Ready(reqs) = rate_limited.poll(ctx) {
return Poll::Ready(Ok(MuxedMessage::WakePeerQueuesPopReqs(reqs)))
}
let ready_batches = self.batches.check_batches();
pin_mut!(ready_batches);
if let Poll::Ready(ready_batches) = ready_batches.poll(ctx) {
return Poll::Ready(Ok(MuxedMessage::WakeCheckBatches(ready_batches)))
}
let next_req = self.receiver.recv(|| vec![COST_INVALID_REQUEST]);
pin_mut!(next_req);
if let Poll::Ready(r) = next_req.poll(ctx) {
return match r {
Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())),
Ok(v) => Poll::Ready(Ok(MuxedMessage::NewRequest(v))),
}
}
Poll::Pending
})
.await
}
async fn dispatch_to_queues(&mut self, req: IncomingRequest<DisputeRequest>) -> JfyiResult<()> {
let peer = req.peer;
let authority_id = match self
.authority_discovery
.get_authority_ids_by_peer_id(peer)
.await
.and_then(|s| s.into_iter().next())
{
None => {
req.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_NOT_A_VALIDATOR],
sent_feedback: None,
})
.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
return Err(JfyiError::NotAValidator(peer).into())
},
Some(auth_id) => auth_id,
};
if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
gum::debug!(
target: LOG_TARGET,
?authority_id,
?peer,
"Peer hit the rate limit - dropping message."
);
req.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_APPARENT_FLOOD],
sent_feedback: None,
})
.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
return Err(JfyiError::AuthorityFlooding(authority_id))
}
Ok(())
}
async fn start_import_or_batch(
&mut self,
incoming: IncomingRequest<DisputeRequest>,
) -> Result<()> {
let IncomingRequest { peer, payload, pending_response } = incoming;
let info = self
.runtime
.get_session_info_by_index(
&mut self.sender,
payload.0.candidate_receipt.descriptor.relay_parent(),
payload.0.session_index,
)
.await?;
let votes_result = payload.0.try_into_signed_votes(&info.session_info);
let (candidate_receipt, valid_vote, invalid_vote) = match votes_result {
Err(()) => {
pending_response
.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_INVALID_SIGNATURE],
sent_feedback: None,
})
.map_err(|_| JfyiError::SetPeerReputation(peer))?;
return Err(From::from(JfyiError::InvalidSignature(peer)))
},
Ok(votes) => votes,
};
let candidate_hash = *valid_vote.0.candidate_hash();
match self.batches.find_batch(candidate_hash, candidate_receipt)? {
FoundBatch::Created(batch) => {
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?peer,
"No batch yet - triggering immediate import"
);
let import = PreparedImport {
candidate_receipt: batch.candidate_receipt().clone(),
statements: vec![valid_vote, invalid_vote],
requesters: vec![(peer, pending_response)],
};
self.start_import(import).await;
},
FoundBatch::Found(batch) => {
gum::trace!(target: LOG_TARGET, ?candidate_hash, "Batch exists - batching request");
let batch_result =
batch.add_votes(valid_vote, invalid_vote, peer, pending_response);
if let Err(pending_response) = batch_result {
gum::debug!(
target: LOG_TARGET,
?peer,
"Peer sent completely redundant votes within a single batch - that looks fishy!",
);
pending_response
.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: Vec::new(),
sent_feedback: None,
})
.map_err(|_| JfyiError::SendResponses(vec![peer]))?;
return Err(From::from(JfyiError::RedundantMessage(peer)))
}
},
}
Ok(())
}
async fn import_ready_batches(&mut self, ready_imports: Vec<PreparedImport>) {
for import in ready_imports {
self.start_import(import).await;
}
}
async fn start_import(&mut self, import: PreparedImport) {
let PreparedImport { candidate_receipt, statements, requesters } = import;
let (session_index, candidate_hash) = match statements.iter().next() {
None => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_receipt.hash(),
"Not importing empty batch"
);
return
},
Some(vote) => (vote.0.session_index(), *vote.0.candidate_hash()),
};
let (pending_confirmation, confirmation_rx) = oneshot::channel();
self.sender
.send_message(DisputeCoordinatorMessage::ImportStatements {
candidate_receipt,
session: session_index,
statements,
pending_confirmation: Some(pending_confirmation),
})
.await;
let pending =
PendingImport { candidate_hash, requesters, pending_response: confirmation_rx };
self.pending_imports.push(pending);
}
fn update_imported_requests_metrics(&self, result: &ImportResult) {
let label = match result.result {
ImportStatementsResult::ValidImport => SUCCEEDED,
ImportStatementsResult::InvalidImport => FAILED,
};
self.metrics.on_imported(label, result.requesters.len());
}
}
async fn send_responses_to_requesters(import_result: ImportResult) -> JfyiResult<()> {
let ImportResult { requesters, result } = import_result;
let mk_response = match result {
ImportStatementsResult::ValidImport => || OutgoingResponse {
result: Ok(DisputeResponse::Confirmed),
reputation_changes: Vec::new(),
sent_feedback: None,
},
ImportStatementsResult::InvalidImport => || OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_INVALID_IMPORT],
sent_feedback: None,
},
};
let mut sending_failed_for = Vec::new();
for (peer, pending_response) in requesters {
if let Err(()) = pending_response.send_outgoing_response(mk_response()) {
sending_failed_for.push(peer);
}
}
if !sending_failed_for.is_empty() {
Err(JfyiError::SendResponses(sending_failed_for))
} else {
Ok(())
}
}
struct PendingImport {
candidate_hash: CandidateHash,
requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
pending_response: oneshot::Receiver<ImportStatementsResult>,
}
struct ImportResult {
requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
result: ImportStatementsResult,
}
impl PendingImport {
async fn wait_for_result(&mut self) -> JfyiResult<ImportResult> {
let result = (&mut self.pending_response)
.await
.map_err(|_| JfyiError::ImportCanceled(self.candidate_hash))?;
Ok(ImportResult { requesters: std::mem::take(&mut self.requesters), result })
}
}
impl Future for PendingImport {
type Output = JfyiResult<ImportResult>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let fut = self.wait_for_result();
pin_mut!(fut);
fut.poll(cx)
}
}