referrerpolicy=no-referrer-when-downgrade

polkadot_dispute_distribution/sender/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::{
18	collections::{BTreeMap, HashMap, HashSet},
19	pin::Pin,
20	task::Poll,
21	time::Duration,
22};
23
24use futures::{channel::oneshot, future::poll_fn, Future};
25
26use futures_timer::Delay;
27use indexmap::{map::Entry, IndexMap};
28use polkadot_node_network_protocol::request_response::v1::DisputeRequest;
29use polkadot_node_primitives::{DisputeMessage, DisputeStatus};
30use polkadot_node_subsystem::{
31	messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate, SubsystemSender,
32};
33use polkadot_node_subsystem_util::{nesting_sender::NestingSender, runtime::RuntimeInfo};
34use polkadot_primitives::{CandidateHash, Hash, SessionIndex};
35
36/// For each ongoing dispute we have a `SendTask` which takes care of it.
37///
38/// It is going to spawn real tasks as it sees fit for getting the votes of the particular dispute
39/// out.
40///
41/// As we assume disputes have a priority, we start sending for disputes in the order
42/// `start_sender` got called.
43mod send_task;
44use send_task::SendTask;
45pub use send_task::TaskFinish;
46
47/// Error and [`Result`] type for sender.
48mod error;
49pub use error::{Error, FatalError, JfyiError, Result};
50
51use self::error::JfyiErrorResult;
52use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT};
53
54/// Messages as sent by background tasks.
55#[derive(Debug)]
56pub enum DisputeSenderMessage {
57	/// A task finished.
58	TaskFinish(TaskFinish),
59	/// A request for active disputes to the dispute-coordinator finished.
60	ActiveDisputesReady(JfyiErrorResult<BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>>),
61}
62
63/// The `DisputeSender` keeps track of all ongoing disputes we need to send statements out.
64///
65/// For each dispute a `SendTask` is responsible for sending to the concerned validators for that
66/// particular dispute. The `DisputeSender` keeps track of those tasks, informs them about new
67/// sessions/validator sets and cleans them up when they become obsolete.
68///
69/// The unit of work for the  `DisputeSender` is a dispute, represented by `SendTask`s.
70pub struct DisputeSender<M> {
71	/// All heads we currently consider active.
72	active_heads: Vec<Hash>,
73
74	/// List of currently active sessions.
75	///
76	/// Value is the hash that was used for the query.
77	active_sessions: HashMap<SessionIndex, Hash>,
78
79	/// All ongoing dispute sending this subsystem is aware of.
80	///
81	/// Using an `IndexMap` so items can be iterated in the order of insertion.
82	disputes: IndexMap<CandidateHash, SendTask<M>>,
83
84	/// Sender to be cloned for `SendTask`s.
85	tx: NestingSender<M, DisputeSenderMessage>,
86
87	/// `Some` if we are waiting for a response `DisputeCoordinatorMessage::ActiveDisputes`.
88	waiting_for_active_disputes: Option<WaitForActiveDisputesState>,
89
90	/// Future for delaying too frequent creation of dispute sending tasks.
91	rate_limit: RateLimit,
92
93	/// Metrics for reporting stats about sent requests.
94	metrics: Metrics,
95}
96
97/// State we keep while waiting for active disputes.
98///
99/// When we send `DisputeCoordinatorMessage::ActiveDisputes`, this is the state we keep while
100/// waiting for the response.
101struct WaitForActiveDisputesState {
102	/// Have we seen any new sessions since last refresh?
103	have_new_sessions: bool,
104}
105
106#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
107impl<M: 'static + Send + Sync> DisputeSender<M> {
108	/// Create a new `DisputeSender` which can be used to start dispute sending.
109	pub fn new(tx: NestingSender<M, DisputeSenderMessage>, metrics: Metrics) -> Self {
110		Self {
111			active_heads: Vec::new(),
112			active_sessions: HashMap::new(),
113			disputes: IndexMap::new(),
114			tx,
115			waiting_for_active_disputes: None,
116			rate_limit: RateLimit::new(),
117			metrics,
118		}
119	}
120
121	/// Create a `SendTask` for a particular new dispute.
122	///
123	/// This function is rate-limited by `SEND_RATE_LIMIT`. It will block if called too frequently
124	/// in order to maintain the limit.
125	pub async fn start_sender<Context>(
126		&mut self,
127		ctx: &mut Context,
128		runtime: &mut RuntimeInfo,
129		msg: DisputeMessage,
130	) -> Result<()> {
131		let req: DisputeRequest = msg.into();
132		let candidate_hash = req.0.candidate_receipt.hash();
133		match self.disputes.entry(candidate_hash) {
134			Entry::Occupied(_) => {
135				gum::trace!(target: LOG_TARGET, ?candidate_hash, "Dispute sending already active.");
136				return Ok(())
137			},
138			Entry::Vacant(vacant) => {
139				self.rate_limit.limit("in start_sender", candidate_hash).await;
140
141				let send_task = SendTask::new(
142					ctx,
143					runtime,
144					&self.active_sessions,
145					NestingSender::new(self.tx.clone(), DisputeSenderMessage::TaskFinish),
146					req,
147					&self.metrics,
148				)
149				.await?;
150				vacant.insert(send_task);
151			},
152		}
153		Ok(())
154	}
155
156	/// Receive message from a background task.
157	pub async fn on_message<Context>(
158		&mut self,
159		ctx: &mut Context,
160		runtime: &mut RuntimeInfo,
161		msg: DisputeSenderMessage,
162	) -> Result<()> {
163		match msg {
164			DisputeSenderMessage::TaskFinish(msg) => {
165				let TaskFinish { candidate_hash, receiver, result } = msg;
166
167				self.metrics.on_sent_request(result.as_metrics_label());
168
169				let task = match self.disputes.get_mut(&candidate_hash) {
170					None => {
171						// Can happen when a dispute ends, with messages still in queue:
172						gum::trace!(
173							target: LOG_TARGET,
174							?result,
175							"Received `FromSendingTask::Finished` for non existing dispute."
176						);
177						return Ok(())
178					},
179					Some(task) => task,
180				};
181				task.on_finished_send(&receiver, result);
182			},
183			DisputeSenderMessage::ActiveDisputesReady(result) => {
184				let state = self.waiting_for_active_disputes.take();
185				let have_new_sessions = state.map(|s| s.have_new_sessions).unwrap_or(false);
186				let active_disputes = result?;
187				self.handle_new_active_disputes(ctx, runtime, active_disputes, have_new_sessions)
188					.await?;
189			},
190		}
191		Ok(())
192	}
193
194	/// Take care of a change in active leaves.
195	///
196	/// Update our knowledge on sessions and initiate fetching for new active disputes.
197	pub async fn update_leaves<Context>(
198		&mut self,
199		ctx: &mut Context,
200		runtime: &mut RuntimeInfo,
201		update: ActiveLeavesUpdate,
202	) -> Result<()> {
203		let ActiveLeavesUpdate { activated, deactivated } = update;
204		let deactivated: HashSet<_> = deactivated.into_iter().collect();
205		self.active_heads.retain(|h| !deactivated.contains(h));
206		self.active_heads.extend(activated.into_iter().map(|l| l.hash));
207
208		let have_new_sessions = self.refresh_sessions(ctx, runtime).await?;
209
210		// Not yet waiting for data, request an update:
211		match self.waiting_for_active_disputes.take() {
212			None => {
213				self.waiting_for_active_disputes =
214					Some(WaitForActiveDisputesState { have_new_sessions });
215				let mut sender = ctx.sender().clone();
216				let mut tx = self.tx.clone();
217
218				let get_active_disputes_task = async move {
219					let result = get_active_disputes(&mut sender).await;
220					let result =
221						tx.send_message(DisputeSenderMessage::ActiveDisputesReady(result)).await;
222					if let Err(err) = result {
223						gum::debug!(
224							target: LOG_TARGET,
225							?err,
226							"Sending `DisputeSenderMessage` from background task failed."
227						);
228					}
229				};
230
231				ctx.spawn("get_active_disputes", Box::pin(get_active_disputes_task))
232					.map_err(FatalError::SpawnTask)?;
233			},
234			Some(state) => {
235				let have_new_sessions = state.have_new_sessions || have_new_sessions;
236				let new_state = WaitForActiveDisputesState { have_new_sessions };
237				self.waiting_for_active_disputes = Some(new_state);
238				gum::debug!(
239					target: LOG_TARGET,
240					"Dispute coordinator slow? We are still waiting for data on next active leaves update."
241				);
242			},
243		}
244		Ok(())
245	}
246
247	/// Handle new active disputes response.
248	///
249	/// - Initiate a retry of failed sends which are still active.
250	/// - Get new authorities to send messages to.
251	/// - Get rid of obsolete tasks and disputes.
252	///
253	/// This function ensures the `SEND_RATE_LIMIT`, therefore it might block.
254	async fn handle_new_active_disputes<Context>(
255		&mut self,
256		ctx: &mut Context,
257		runtime: &mut RuntimeInfo,
258		active_disputes: BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>,
259		have_new_sessions: bool,
260	) -> Result<()> {
261		let active_disputes: HashSet<_> =
262			active_disputes.into_iter().map(|((_, c), _)| c).collect();
263
264		// Cleanup obsolete senders (retain keeps order of remaining elements):
265		self.disputes
266			.retain(|candidate_hash, _| active_disputes.contains(candidate_hash));
267
268		// Iterates in order of insertion:
269		let mut should_rate_limit = true;
270		for (candidate_hash, dispute) in self.disputes.iter_mut() {
271			if have_new_sessions || dispute.has_failed_sends() {
272				if should_rate_limit {
273					self.rate_limit
274						.limit("while going through new sessions/failed sends", *candidate_hash)
275						.await;
276				}
277				let sends_happened = dispute
278					.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
279					.await?;
280				// Only rate limit if we actually sent something out _and_ it was not just because
281				// of errors on previous sends.
282				//
283				// Reasoning: It would not be acceptable to slow down the whole subsystem, just
284				// because of a few bad peers having problems. It is actually better to risk
285				// running into their rate limit in that case and accept a minor reputation change.
286				should_rate_limit = sends_happened && have_new_sessions;
287			}
288		}
289		Ok(())
290	}
291
292	/// Make active sessions correspond to currently active heads.
293	///
294	/// Returns: true if sessions changed.
295	async fn refresh_sessions<Context>(
296		&mut self,
297		ctx: &mut Context,
298		runtime: &mut RuntimeInfo,
299	) -> Result<bool> {
300		let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?;
301		let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
302		let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
303		let updated = new_sessions_raw != old_sessions_raw;
304		// Update in any case, so we use current heads for queries:
305		self.active_sessions = new_sessions;
306		Ok(updated)
307	}
308}
309
310/// Rate limiting logic.
311///
312/// Suitable for the sending side.
313struct RateLimit {
314	limit: Delay,
315}
316
317impl RateLimit {
318	/// Create new `RateLimit` that is immediately ready.
319	fn new() -> Self {
320		// Start with an empty duration, as there has not been any previous call.
321		Self { limit: Delay::new(Duration::new(0, 0)) }
322	}
323
324	/// Initialized with actual `SEND_RATE_LIMIT` duration.
325	fn new_limit() -> Self {
326		Self { limit: Delay::new(SEND_RATE_LIMIT) }
327	}
328
329	/// Wait until ready and prepare for next call.
330	///
331	/// String given as occasion and candidate hash are logged in case the rate limit hit.
332	async fn limit(&mut self, occasion: &'static str, candidate_hash: CandidateHash) {
333		// Wait for rate limit and add some logging:
334		let mut num_wakes: u32 = 0;
335		poll_fn(|cx| {
336			let old_limit = Pin::new(&mut self.limit);
337			match old_limit.poll(cx) {
338				Poll::Pending => {
339					gum::debug!(
340						target: LOG_TARGET,
341						?occasion,
342						?candidate_hash,
343						?num_wakes,
344						"Sending rate limit hit, slowing down requests"
345					);
346					num_wakes += 1;
347					Poll::Pending
348				},
349				Poll::Ready(()) => Poll::Ready(()),
350			}
351		})
352		.await;
353		*self = Self::new_limit();
354	}
355}
356
357/// Retrieve the currently active sessions.
358///
359/// List is all indices of all active sessions together with the head that was used for the query.
360#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
361async fn get_active_session_indices<Context>(
362	ctx: &mut Context,
363	runtime: &mut RuntimeInfo,
364	active_heads: &Vec<Hash>,
365) -> Result<HashMap<SessionIndex, Hash>> {
366	let mut indices = HashMap::new();
367	// Iterate all heads we track as active and fetch the child' session indices.
368	for head in active_heads {
369		let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
370		// Cache session info
371		if let Err(err) =
372			runtime.get_session_info_by_index(ctx.sender(), *head, session_index).await
373		{
374			gum::debug!(target: LOG_TARGET, ?err, ?session_index, "Can't cache SessionInfo");
375		}
376		indices.insert(session_index, *head);
377	}
378	Ok(indices)
379}
380
381/// Retrieve Set of active disputes from the dispute coordinator.
382async fn get_active_disputes<Sender>(
383	sender: &mut Sender,
384) -> JfyiErrorResult<BTreeMap<(SessionIndex, CandidateHash), DisputeStatus>>
385where
386	Sender: SubsystemSender<DisputeCoordinatorMessage>,
387{
388	let (tx, rx) = oneshot::channel();
389
390	sender.send_message(DisputeCoordinatorMessage::ActiveDisputes(tx)).await;
391	rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
392}