polkadot_dispute_distribution/sender/
send_task.rs1use std::collections::{HashMap, HashSet};
18
19use futures::{Future, FutureExt};
20
21use polkadot_node_network_protocol::{
22 request_response::{
23 outgoing::RequestError,
24 v1::{DisputeRequest, DisputeResponse},
25 OutgoingRequest, OutgoingResult, Recipient, Requests,
26 },
27 IfDisconnected,
28};
29use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer};
30use polkadot_node_subsystem_util::{metrics, nesting_sender::NestingSender, runtime::RuntimeInfo};
31use polkadot_primitives::{
32 AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex,
33};
34
35use super::error::{FatalError, Result};
36
37use crate::{
38 metrics::{FAILED, SUCCEEDED},
39 Metrics, LOG_TARGET,
40};
41
42pub struct SendTask<M> {
48 request: DisputeRequest,
51
52 deliveries: HashMap<AuthorityDiscoveryId, DeliveryStatus>,
56
57 has_failed_sends: bool,
59
60 tx: NestingSender<M, TaskFinish>,
62}
63
64enum DeliveryStatus {
66 Pending,
68 Succeeded,
70}
71
72#[derive(Debug)]
74pub struct TaskFinish {
75 pub candidate_hash: CandidateHash,
77 pub receiver: AuthorityDiscoveryId,
79 pub result: TaskResult,
81}
82
83#[derive(Debug)]
84pub enum TaskResult {
85 Succeeded,
87 Failed(RequestError),
91}
92
93impl TaskResult {
94 pub fn as_metrics_label(&self) -> &'static str {
95 match self {
96 Self::Succeeded => SUCCEEDED,
97 Self::Failed(_) => FAILED,
98 }
99 }
100}
101
102#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
103impl<M: 'static + Send + Sync> SendTask<M> {
104 pub async fn new<Context>(
110 ctx: &mut Context,
111 runtime: &mut RuntimeInfo,
112 active_sessions: &HashMap<SessionIndex, Hash>,
113 tx: NestingSender<M, TaskFinish>,
114 request: DisputeRequest,
115 metrics: &Metrics,
116 ) -> Result<Self> {
117 let mut send_task =
118 Self { request, deliveries: HashMap::new(), has_failed_sends: false, tx };
119 send_task.refresh_sends(ctx, runtime, active_sessions, metrics).await?;
120 Ok(send_task)
121 }
122
123 pub async fn refresh_sends<Context>(
134 &mut self,
135 ctx: &mut Context,
136 runtime: &mut RuntimeInfo,
137 active_sessions: &HashMap<SessionIndex, Hash>,
138 metrics: &Metrics,
139 ) -> Result<bool> {
140 let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;
141
142 let add_authorities: Vec<_> = new_authorities
144 .iter()
145 .filter(|a| !self.deliveries.contains_key(a))
146 .map(Clone::clone)
147 .collect();
148
149 gum::trace!(
151 target: LOG_TARGET,
152 already_running_deliveries = ?self.deliveries.len(),
153 "Cleaning up deliveries"
154 );
155 self.deliveries.retain(|k, _| new_authorities.contains(k));
156
157 gum::trace!(
159 target: LOG_TARGET,
160 new_and_failed_authorities = ?add_authorities.len(),
161 overall_authority_set_size = ?new_authorities.len(),
162 already_running_deliveries = ?self.deliveries.len(),
163 "Starting new send requests for authorities."
164 );
165 let new_statuses =
166 send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
167 .await?;
168
169 let was_empty = new_statuses.is_empty();
170 gum::trace!(
171 target: LOG_TARGET,
172 sent_requests = ?new_statuses.len(),
173 "Requests dispatched."
174 );
175
176 self.has_failed_sends = false;
177 self.deliveries.extend(new_statuses.into_iter());
178 Ok(!was_empty)
179 }
180
181 pub fn has_failed_sends(&self) -> bool {
183 self.has_failed_sends
184 }
185
186 pub fn on_finished_send(&mut self, authority: &AuthorityDiscoveryId, result: TaskResult) {
191 match result {
192 TaskResult::Failed(err) => {
193 gum::trace!(
194 target: LOG_TARGET,
195 ?authority,
196 candidate_hash = %self.request.0.candidate_receipt.hash(),
197 %err,
198 "Error sending dispute statements to node."
199 );
200
201 self.has_failed_sends = true;
202 self.deliveries.remove(authority);
204 },
205 TaskResult::Succeeded => {
206 let status = match self.deliveries.get_mut(&authority) {
207 None => {
208 gum::debug!(
211 target: LOG_TARGET,
212 candidate = ?self.request.0.candidate_receipt.hash(),
213 ?authority,
214 ?result,
215 "Received `FromSendingTask::Finished` for non existing task."
216 );
217 return
218 },
219 Some(status) => status,
220 };
221 *status = DeliveryStatus::Succeeded;
223 },
224 }
225 }
226
227 async fn get_relevant_validators<Context>(
232 &self,
233 ctx: &mut Context,
234 runtime: &mut RuntimeInfo,
235 active_sessions: &HashMap<SessionIndex, Hash>,
236 ) -> Result<HashSet<AuthorityDiscoveryId>> {
237 let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent();
238 let info = runtime
241 .get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
242 .await?;
243 let session_info = &info.session_info;
244 let validator_count = session_info.validators.len();
245 let mut authorities: HashSet<_> = session_info
246 .discovery_keys
247 .iter()
248 .take(validator_count)
249 .enumerate()
250 .filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
251 .map(|(_, v)| v.clone())
252 .collect();
253
254 for (session_index, head) in active_sessions.iter() {
257 let info =
258 runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
259 let session_info = &info.session_info;
260 let new_set = session_info
261 .discovery_keys
262 .iter()
263 .enumerate()
264 .filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
265 .map(|(_, v)| v.clone());
266 authorities.extend(new_set);
267 }
268 Ok(authorities)
269 }
270}
271
272#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
276async fn send_requests<Context, M: 'static + Send + Sync>(
277 ctx: &mut Context,
278 tx: NestingSender<M, TaskFinish>,
279 receivers: Vec<AuthorityDiscoveryId>,
280 req: DisputeRequest,
281 metrics: &Metrics,
282) -> Result<HashMap<AuthorityDiscoveryId, DeliveryStatus>> {
283 let mut statuses = HashMap::with_capacity(receivers.len());
284 let mut reqs = Vec::with_capacity(receivers.len());
285
286 for receiver in receivers {
287 let (outgoing, pending_response) =
288 OutgoingRequest::new(Recipient::Authority(receiver.clone()), req.clone());
289
290 reqs.push(Requests::DisputeSendingV1(outgoing));
291
292 let fut = wait_response_task(
293 pending_response,
294 req.0.candidate_receipt.hash(),
295 receiver.clone(),
296 tx.clone(),
297 metrics.time_dispute_request(),
298 );
299
300 ctx.spawn("dispute-sender", fut.boxed()).map_err(FatalError::SpawnTask)?;
301 statuses.insert(receiver, DeliveryStatus::Pending);
302 }
303
304 let msg = NetworkBridgeTxMessage::SendRequests(reqs, IfDisconnected::ImmediateError);
305 ctx.send_message(msg).await;
306 Ok(statuses)
307}
308
309async fn wait_response_task<M: 'static + Send + Sync>(
311 pending_response: impl Future<Output = OutgoingResult<DisputeResponse>>,
312 candidate_hash: CandidateHash,
313 receiver: AuthorityDiscoveryId,
314 mut tx: NestingSender<M, TaskFinish>,
315 _timer: Option<metrics::prometheus::prometheus::HistogramTimer>,
316) {
317 let result = pending_response.await;
318 let msg = match result {
319 Err(err) => TaskFinish { candidate_hash, receiver, result: TaskResult::Failed(err) },
320 Ok(DisputeResponse::Confirmed) =>
321 TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded },
322 };
323 if let Err(err) = tx.send_message(msg).await {
324 gum::debug!(
325 target: LOG_TARGET,
326 %err,
327 "Failed to notify subsystem about dispute sending result."
328 );
329 }
330}