// polkadot_dispute_distribution/sender/mod.rs
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    pin::Pin,
    task::Poll,
    time::Duration,
};

use futures::{channel::oneshot, future::poll_fn, Future};

use futures_timer::Delay;
use indexmap::{map::Entry, IndexMap};
use polkadot_node_network_protocol::request_response::v1::DisputeRequest;
use polkadot_node_primitives::{DisputeMessage, DisputeStatus};
use polkadot_node_subsystem::{
    messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate, SubsystemSender,
};
use polkadot_node_subsystem_util::{nesting_sender::NestingSender, runtime::RuntimeInfo};
use polkadot_primitives::{CandidateHash, Hash, SessionIndex};

mod send_task;
use send_task::SendTask;
pub use send_task::TaskFinish;

mod error;
pub use error::{Error, FatalError, JfyiError, Result};

use self::error::JfyiErrorResult;
use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT};

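/// Messages the `DisputeSender` receives from its spawned background tasks
/// (see [`DisputeSender::on_message`]).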
#[derive(Debug)]
pub enum DisputeSenderMessage {
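    /// A send task finished a request to a particular validator (successfully or not).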
    TaskFinish(TaskFinish),
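    /// The dispute coordinator answered our `ActiveDisputes` query.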
    ActiveDisputesReady(JfyiErrorResult<BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>>),
}

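/// The sending side of dispute distribution.
///
/// It keeps track of all ongoing disputes and makes sure dispute requests are
/// (re-)sent to the validators of the currently active sessions.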
pub struct DisputeSender<M> {
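    /// All heads currently considered active, used for determining the active sessions.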
    active_heads: Vec<Hash>,

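    /// The sessions currently considered active, each mapped to a relay chain head
    /// that can be used for runtime queries concerning that session.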
    active_sessions: HashMap<SessionIndex, Hash>,

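    /// All ongoing dispute sending tasks, keyed by candidate hash.
    ///
    /// An `IndexMap` preserves insertion order, so iteration in
    /// `handle_new_active_disputes` visits older disputes first.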
    disputes: IndexMap<CandidateHash, SendTask<M>>,

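    /// Sender for feeding messages from our background tasks back into the subsystem.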
    tx: NestingSender<M, DisputeSenderMessage>,

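    /// Set while a request for active disputes to the dispute coordinator is in flight.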
    waiting_for_active_disputes: Option<WaitForActiveDisputesState>,

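    /// Rate limit for starting new send tasks and for resends.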
    rate_limit: RateLimit,

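    /// Metrics handle for keeping track of sent requests.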
    metrics: Metrics,
}

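/// State kept while we wait for the dispute coordinator to answer a request for
/// active disputes.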
struct WaitForActiveDisputesState {
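    /// Whether new sessions became active, either on the update that triggered the
    /// request or on any update received while waiting for the answer.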
    have_new_sessions: bool,
}

#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl<M: 'static + Send + Sync> DisputeSender<M> {
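    /// Create a new `DisputeSender` which can be used for sending dispute requests.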
    pub fn new(tx: NestingSender<M, DisputeSenderMessage>, metrics: Metrics) -> Self {
        Self {
            active_heads: Vec::new(),
            active_sessions: HashMap::new(),
            disputes: IndexMap::new(),
            tx,
            waiting_for_active_disputes: None,
            rate_limit: RateLimit::new(),
            metrics,
        }
    }

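    /// Initiate sending of a dispute message to other validators.
    ///
    /// This is a no-op if a send task for that candidate already exists; otherwise it
    /// (rate limited) creates a new [`SendTask`].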
    pub async fn start_sender<Context>(
        &mut self,
        ctx: &mut Context,
        runtime: &mut RuntimeInfo,
        msg: DisputeMessage,
    ) -> Result<()> {
        let req: DisputeRequest = msg.into();
        let candidate_hash = req.0.candidate_receipt.hash();
        match self.disputes.entry(candidate_hash) {
            Entry::Occupied(_) => {
                gum::trace!(target: LOG_TARGET, ?candidate_hash, "Dispute sending already active.");
                return Ok(())
            },
            Entry::Vacant(vacant) => {
                self.rate_limit.limit("in start_sender", candidate_hash).await;

                let send_task = SendTask::new(
                    ctx,
                    runtime,
                    &self.active_sessions,
                    NestingSender::new(self.tx.clone(), DisputeSenderMessage::TaskFinish),
                    req,
                    &self.metrics,
                )
                .await?;
                vacant.insert(send_task);
            },
        }
        Ok(())
    }

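    /// Handle a message from one of our background tasks.
    ///
    /// Either record the result of a finished send or process the active disputes
    /// received from the dispute coordinator.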
    pub async fn on_message<Context>(
        &mut self,
        ctx: &mut Context,
        runtime: &mut RuntimeInfo,
        msg: DisputeSenderMessage,
    ) -> Result<()> {
        match msg {
            DisputeSenderMessage::TaskFinish(msg) => {
                let TaskFinish { candidate_hash, receiver, result } = msg;

                self.metrics.on_sent_request(result.as_metrics_label());

                let task = match self.disputes.get_mut(&candidate_hash) {
                    None => {
                        gum::trace!(
                            target: LOG_TARGET,
                            ?result,
                            "Received `FromSendingTask::Finished` for non existing dispute."
                        );
                        return Ok(())
                    },
                    Some(task) => task,
                };
                task.on_finished_send(&receiver, result);
            },
            DisputeSenderMessage::ActiveDisputesReady(result) => {
                let state = self.waiting_for_active_disputes.take();
                let have_new_sessions = state.map(|s| s.have_new_sessions).unwrap_or(false);
                let active_disputes = result?;
                self.handle_new_active_disputes(ctx, runtime, active_disputes, have_new_sessions)
                    .await?;
            },
        }
        Ok(())
    }

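    /// Take care of a change in active leaves.
    ///
    /// - Update our active heads and active sessions.
    /// - Request the currently active disputes from the dispute coordinator in a
    ///   background task, or record that we are still waiting for a previous answer.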
    pub async fn update_leaves<Context>(
        &mut self,
        ctx: &mut Context,
        runtime: &mut RuntimeInfo,
        update: ActiveLeavesUpdate,
    ) -> Result<()> {
        let ActiveLeavesUpdate { activated, deactivated } = update;
        let deactivated: HashSet<_> = deactivated.into_iter().collect();
        self.active_heads.retain(|h| !deactivated.contains(h));
        self.active_heads.extend(activated.into_iter().map(|l| l.hash));

        let have_new_sessions = self.refresh_sessions(ctx, runtime).await?;

        match self.waiting_for_active_disputes.take() {
            None => {
                self.waiting_for_active_disputes =
                    Some(WaitForActiveDisputesState { have_new_sessions });
                let mut sender = ctx.sender().clone();
                let mut tx = self.tx.clone();

                let get_active_disputes_task = async move {
                    let result = get_active_disputes(&mut sender).await;
                    let result =
                        tx.send_message(DisputeSenderMessage::ActiveDisputesReady(result)).await;
                    if let Err(err) = result {
                        gum::debug!(
                            target: LOG_TARGET,
                            ?err,
                            "Sending `DisputeSenderMessage` from background task failed."
                        );
                    }
                };

                ctx.spawn("get_active_disputes", Box::pin(get_active_disputes_task))
                    .map_err(FatalError::SpawnTask)?;
            },
            Some(state) => {
                let have_new_sessions = state.have_new_sessions || have_new_sessions;
                let new_state = WaitForActiveDisputesState { have_new_sessions };
                self.waiting_for_active_disputes = Some(new_state);
                gum::debug!(
                    target: LOG_TARGET,
                    "Dispute coordinator slow? We are still waiting for data on next active leaves update."
                );
            },
        }
        Ok(())
    }

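    /// Handle a new set of active disputes as received from the dispute coordinator.
    ///
    /// - Drop send tasks for disputes that are no longer active.
    /// - Refresh sends for the remaining disputes where new sessions appeared or
    ///   previous sends failed, applying the rate limit in between.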
    async fn handle_new_active_disputes<Context>(
        &mut self,
        ctx: &mut Context,
        runtime: &mut RuntimeInfo,
        active_disputes: BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>,
        have_new_sessions: bool,
    ) -> Result<()> {
        let active_disputes: HashSet<_> =
            active_disputes.into_iter().map(|((_, c), _)| c).collect();

        self.disputes
            .retain(|candidate_hash, _| active_disputes.contains(candidate_hash));

        let mut should_rate_limit = true;
        for (candidate_hash, dispute) in self.disputes.iter_mut() {
            if have_new_sessions || dispute.has_failed_sends() {
                if should_rate_limit {
                    self.rate_limit
                        .limit("while going through new sessions/failed sends", *candidate_hash)
                        .await;
                }
                let sends_happened = dispute
                    .refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
                    .await?;
                should_rate_limit = sends_happened && have_new_sessions;
            }
        }
        Ok(())
    }

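    /// Refresh the set of active sessions based on the current active heads.
    ///
    /// Returns `true` if the set of active sessions changed.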
    async fn refresh_sessions<Context>(
        &mut self,
        ctx: &mut Context,
        runtime: &mut RuntimeInfo,
    ) -> Result<bool> {
        let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?;
        let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
        let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
        let updated = new_sessions_raw != old_sessions_raw;
        self.active_sessions = new_sessions;
        Ok(updated)
    }
}

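/// Rate limiting for sending out requests, based on a simple [`Delay`] timer.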
struct RateLimit {
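    /// Timer that has to elapse before the next send is allowed.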
    limit: Delay,
}

impl RateLimit {
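    /// Create a new `RateLimit` that is already elapsed, so the first send is not delayed.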
    fn new() -> Self {
        Self { limit: Delay::new(Duration::new(0, 0)) }
    }

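    /// Create a new `RateLimit` that waits for the full `SEND_RATE_LIMIT` duration.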
    fn new_limit() -> Self {
        Self { limit: Delay::new(SEND_RATE_LIMIT) }
    }

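    /// Wait until the current limit has elapsed, then arm a fresh limit for the next call.
    ///
    /// Logs whenever we actually get rate limited, including the given occasion and
    /// candidate hash.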
    async fn limit(&mut self, occasion: &'static str, candidate_hash: CandidateHash) {
        let mut num_wakes: u32 = 0;
        poll_fn(|cx| {
            let old_limit = Pin::new(&mut self.limit);
            match old_limit.poll(cx) {
                Poll::Pending => {
                    gum::debug!(
                        target: LOG_TARGET,
                        ?occasion,
                        ?candidate_hash,
                        ?num_wakes,
                        "Sending rate limit hit, slowing down requests"
                    );
                    num_wakes += 1;
                    Poll::Pending
                },
                Poll::Ready(()) => Poll::Ready(()),
            }
        })
        .await;
        *self = Self::new_limit();
    }
}

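/// Retrieve the currently active sessions for the given heads.
///
/// Also tries to warm up the `SessionInfo` cache for each of them. Returns a map from
/// session index to a head the session was found active at.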
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
async fn get_active_session_indices<Context>(
    ctx: &mut Context,
    runtime: &mut RuntimeInfo,
    active_heads: &Vec<Hash>,
) -> Result<HashMap<SessionIndex, Hash>> {
    let mut indices = HashMap::new();
    for head in active_heads {
        let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
        if let Err(err) =
            runtime.get_session_info_by_index(ctx.sender(), *head, session_index).await
        {
            gum::debug!(target: LOG_TARGET, ?err, ?session_index, "Can't cache SessionInfo");
        }
        indices.insert(session_index, *head);
    }
    Ok(indices)
}

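/// Ask the dispute coordinator for the set of currently active disputes.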
async fn get_active_disputes<Sender>(
    sender: &mut Sender,
) -> JfyiErrorResult<BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>>
where
    Sender: SubsystemSender<DisputeCoordinatorMessage>,
{
    let (tx, rx) = oneshot::channel();

    sender.send_message(DisputeCoordinatorMessage::ActiveDisputes(tx)).await;
    rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
}