use std::collections::{HashMap, HashSet};
use futures::{Future, FutureExt};
use polkadot_node_network_protocol::{
request_response::{
outgoing::RequestError,
v1::{DisputeRequest, DisputeResponse},
OutgoingRequest, OutgoingResult, Recipient, Requests,
},
IfDisconnected,
};
use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer};
use polkadot_node_subsystem_util::{metrics, nesting_sender::NestingSender, runtime::RuntimeInfo};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex,
};
use super::error::{FatalError, Result};
use crate::{
metrics::{FAILED, SUCCEEDED},
Metrics, LOG_TARGET,
};
pub struct SendTask<M> {
request: DisputeRequest,
deliveries: HashMap<AuthorityDiscoveryId, DeliveryStatus>,
has_failed_sends: bool,
tx: NestingSender<M, TaskFinish>,
}
enum DeliveryStatus {
Pending,
Succeeded,
}
#[derive(Debug)]
pub struct TaskFinish {
pub candidate_hash: CandidateHash,
pub receiver: AuthorityDiscoveryId,
pub result: TaskResult,
}
#[derive(Debug)]
pub enum TaskResult {
Succeeded,
Failed(RequestError),
}
impl TaskResult {
pub fn as_metrics_label(&self) -> &'static str {
match self {
Self::Succeeded => SUCCEEDED,
Self::Failed(_) => FAILED,
}
}
}
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl<M: 'static + Send + Sync> SendTask<M> {
pub async fn new<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex, Hash>,
tx: NestingSender<M, TaskFinish>,
request: DisputeRequest,
metrics: &Metrics,
) -> Result<Self> {
let mut send_task =
Self { request, deliveries: HashMap::new(), has_failed_sends: false, tx };
send_task.refresh_sends(ctx, runtime, active_sessions, metrics).await?;
Ok(send_task)
}
pub async fn refresh_sends<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex, Hash>,
metrics: &Metrics,
) -> Result<bool> {
let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;
let add_authorities: Vec<_> = new_authorities
.iter()
.filter(|a| !self.deliveries.contains_key(a))
.map(Clone::clone)
.collect();
gum::trace!(
target: LOG_TARGET,
already_running_deliveries = ?self.deliveries.len(),
"Cleaning up deliveries"
);
self.deliveries.retain(|k, _| new_authorities.contains(k));
gum::trace!(
target: LOG_TARGET,
new_and_failed_authorities = ?add_authorities.len(),
overall_authority_set_size = ?new_authorities.len(),
already_running_deliveries = ?self.deliveries.len(),
"Starting new send requests for authorities."
);
let new_statuses =
send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
.await?;
let was_empty = new_statuses.is_empty();
gum::trace!(
target: LOG_TARGET,
sent_requests = ?new_statuses.len(),
"Requests dispatched."
);
self.has_failed_sends = false;
self.deliveries.extend(new_statuses.into_iter());
Ok(!was_empty)
}
pub fn has_failed_sends(&self) -> bool {
self.has_failed_sends
}
pub fn on_finished_send(&mut self, authority: &AuthorityDiscoveryId, result: TaskResult) {
match result {
TaskResult::Failed(err) => {
gum::trace!(
target: LOG_TARGET,
?authority,
candidate_hash = %self.request.0.candidate_receipt.hash(),
%err,
"Error sending dispute statements to node."
);
self.has_failed_sends = true;
self.deliveries.remove(authority);
},
TaskResult::Succeeded => {
let status = match self.deliveries.get_mut(&authority) {
None => {
gum::debug!(
target: LOG_TARGET,
candidate = ?self.request.0.candidate_receipt.hash(),
?authority,
?result,
"Received `FromSendingTask::Finished` for non existing task."
);
return
},
Some(status) => status,
};
*status = DeliveryStatus::Succeeded;
},
}
}
async fn get_relevant_validators<Context>(
&self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex, Hash>,
) -> Result<HashSet<AuthorityDiscoveryId>> {
let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent();
let info = runtime
.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
.await?;
let session_info = &info.session_info;
let validator_count = session_info.validators.len();
let mut authorities: HashSet<_> = session_info
.discovery_keys
.iter()
.take(validator_count)
.enumerate()
.filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
.map(|(_, v)| v.clone())
.collect();
for (session_index, head) in active_sessions.iter() {
let info =
runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
let session_info = &info.session_info;
let new_set = session_info
.discovery_keys
.iter()
.enumerate()
.filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
.map(|(_, v)| v.clone());
authorities.extend(new_set);
}
Ok(authorities)
}
}
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
async fn send_requests<Context, M: 'static + Send + Sync>(
ctx: &mut Context,
tx: NestingSender<M, TaskFinish>,
receivers: Vec<AuthorityDiscoveryId>,
req: DisputeRequest,
metrics: &Metrics,
) -> Result<HashMap<AuthorityDiscoveryId, DeliveryStatus>> {
let mut statuses = HashMap::with_capacity(receivers.len());
let mut reqs = Vec::with_capacity(receivers.len());
for receiver in receivers {
let (outgoing, pending_response) =
OutgoingRequest::new(Recipient::Authority(receiver.clone()), req.clone());
reqs.push(Requests::DisputeSendingV1(outgoing));
let fut = wait_response_task(
pending_response,
req.0.candidate_receipt.hash(),
receiver.clone(),
tx.clone(),
metrics.time_dispute_request(),
);
ctx.spawn("dispute-sender", fut.boxed()).map_err(FatalError::SpawnTask)?;
statuses.insert(receiver, DeliveryStatus::Pending);
}
let msg = NetworkBridgeTxMessage::SendRequests(reqs, IfDisconnected::ImmediateError);
ctx.send_message(msg).await;
Ok(statuses)
}
async fn wait_response_task<M: 'static + Send + Sync>(
pending_response: impl Future<Output = OutgoingResult<DisputeResponse>>,
candidate_hash: CandidateHash,
receiver: AuthorityDiscoveryId,
mut tx: NestingSender<M, TaskFinish>,
_timer: Option<metrics::prometheus::prometheus::HistogramTimer>,
) {
let result = pending_response.await;
let msg = match result {
Err(err) => TaskFinish { candidate_hash, receiver, result: TaskResult::Failed(err) },
Ok(DisputeResponse::Confirmed) =>
TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded },
};
if let Err(err) = tx.send_message(msg).await {
gum::debug!(
target: LOG_TARGET,
%err,
"Failed to notify subsystem about dispute sending result."
);
}
}