
polkadot_dispute_distribution/sender/send_task.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};

use futures::{Future, FutureExt};

use polkadot_node_network_protocol::{
	request_response::{
		outgoing::RequestError,
		v1::{DisputeRequest, DisputeResponse},
		OutgoingRequest, OutgoingResult, Recipient, Requests,
	},
	IfDisconnected,
};
use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer};
use polkadot_node_subsystem_util::{metrics, nesting_sender::NestingSender, runtime::RuntimeInfo};
use polkadot_primitives::{
	AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex,
};

use super::error::{FatalError, Result};

use crate::{
	metrics::{FAILED, SUCCEEDED},
	Metrics, LOG_TARGET,
};

/// Delivery status for a particular dispute.
///
/// Keeps track of all the validators that have to be reached for a dispute.
///
/// The unit of work for a `SendTask` is an authority/validator.
pub struct SendTask<M> {
	/// The request we are supposed to get out to all `parachain` validators of the dispute's
	/// session and to all current authorities.
	request: DisputeRequest,

	/// The set of authorities we need to send our messages to. This set will change at session
	/// boundaries. It will always be at least the `parachain` validators of the session where the
	/// dispute happened and the authorities of the current sessions as determined by active heads.
	deliveries: HashMap<AuthorityDiscoveryId, DeliveryStatus>,

	/// Whether any tasks have failed since the last refresh.
	has_failed_sends: bool,

	/// Sender to be cloned for tasks.
	tx: NestingSender<M, TaskFinish>,
}

/// Status of a particular vote/statement delivery to a particular validator.
enum DeliveryStatus {
	/// Request is still in flight.
	Pending,
	/// Succeeded - no need to send request to this peer anymore.
	Succeeded,
}
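
// Note: there is deliberately no `Failed` variant here. A failed delivery is recorded by
// removing the authority's entry from `SendTask::deliveries` entirely (see `on_finished_send`
// below), so the next `refresh_sends` treats that authority as new and retries it.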

/// A sending task finishes with this result:
#[derive(Debug)]
pub struct TaskFinish {
	/// The candidate this task was running for.
	pub candidate_hash: CandidateHash,
	/// The authority the request was sent to.
	pub receiver: AuthorityDiscoveryId,
	/// The result of the delivery attempt.
	pub result: TaskResult,
}

#[derive(Debug)]
pub enum TaskResult {
	/// Task succeeded in getting the request to its peer.
	Succeeded,
	/// Task was not able to get the request out to its peer.
	///
	/// It should be retried in that case.
	Failed(RequestError),
}

impl TaskResult {
	pub fn as_metrics_label(&self) -> &'static str {
		match self {
			Self::Succeeded => SUCCEEDED,
			Self::Failed(_) => FAILED,
		}
	}
}

#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
impl<M: 'static + Send + Sync> SendTask<M> {
	/// Initiates sending a dispute message to peers.
	///
	/// Creation of new `SendTask`s is subject to rate limiting. As each `SendTask` triggers
	/// sending a message to each validator, enforcing a per-peer rate limit means limiting the
	/// construction of new `SendTask`s.
	pub async fn new<Context>(
		ctx: &mut Context,
		runtime: &mut RuntimeInfo,
		active_sessions: &HashMap<SessionIndex, Hash>,
		tx: NestingSender<M, TaskFinish>,
		request: DisputeRequest,
		metrics: &Metrics,
	) -> Result<Self> {
		let mut send_task =
			Self { request, deliveries: HashMap::new(), has_failed_sends: false, tx };
		send_task.refresh_sends(ctx, runtime, active_sessions, metrics).await?;
		Ok(send_task)
	}
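
	// A hedged sketch of the intended call pattern. The surrounding bookkeeping (`disputes`,
	// `candidate_hash`) is an assumption for illustration, not the actual `DisputeSender` code:
	//
	// let task =
	// 	SendTask::new(ctx, runtime, &active_sessions, tx.clone(), request, &metrics).await?;
	// disputes.insert(candidate_hash, task);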

	/// Make sure we are sending to all relevant authorities.
	///
	/// This function is called at construction and should also be called whenever a session change
	/// happens and on a regular basis to ensure we are retrying failed attempts.
	///
	/// This might resend to validators and is thus subject to any rate limiting we might want.
	/// Calls to this function for different instances should be rate limited according to
	/// `SEND_RATE_LIMIT`.
	///
	/// Returns: `true` if this call resulted in new requests.
	pub async fn refresh_sends<Context>(
		&mut self,
		ctx: &mut Context,
		runtime: &mut RuntimeInfo,
		active_sessions: &HashMap<SessionIndex, Hash>,
		metrics: &Metrics,
	) -> Result<bool> {
		let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;

		// Note this will also contain all authorities for which sending failed previously:
		let add_authorities: Vec<_> = new_authorities
			.iter()
			.filter(|a| !self.deliveries.contains_key(a))
			.map(Clone::clone)
			.collect();

		// Get rid of dead/irrelevant tasks/statuses:
		gum::trace!(
			target: LOG_TARGET,
			already_running_deliveries = ?self.deliveries.len(),
			"Cleaning up deliveries"
		);
		self.deliveries.retain(|k, _| new_authorities.contains(k));

		// Start any new tasks that are needed:
		gum::trace!(
			target: LOG_TARGET,
			new_and_failed_authorities = ?add_authorities.len(),
			overall_authority_set_size = ?new_authorities.len(),
			already_running_deliveries = ?self.deliveries.len(),
			"Starting new send requests for authorities."
		);
		let new_statuses =
			send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
				.await?;

		let was_empty = new_statuses.is_empty();
		gum::trace!(
			target: LOG_TARGET,
			sent_requests = ?new_statuses.len(),
			"Requests dispatched."
		);

		self.has_failed_sends = false;
		self.deliveries.extend(new_statuses.into_iter());
		Ok(!was_empty)
	}
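
	// A hedged sketch of the caller-side rate limiting the docs above ask for. The loop shape
	// and the `wait_rate_limit` helper are assumptions; only `SEND_RATE_LIMIT` is referenced by
	// the documentation:
	//
	// for task in send_tasks.values_mut() {
	// 	wait_rate_limit(SEND_RATE_LIMIT).await; // at most one refresh per `SEND_RATE_LIMIT`
	// 	task.refresh_sends(ctx, runtime, &active_sessions, &metrics).await?;
	// }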

	/// Whether any sends have failed since the last refresh.
	pub fn has_failed_sends(&self) -> bool {
		self.has_failed_sends
	}

	/// Handle a finished response-waiting task.
	///
	/// Called by `DisputeSender` upon reception of the corresponding message from our spawned
	/// `wait_response_task`.
	pub fn on_finished_send(&mut self, authority: &AuthorityDiscoveryId, result: TaskResult) {
		match result {
			TaskResult::Failed(err) => {
				gum::trace!(
					target: LOG_TARGET,
					?authority,
					candidate_hash = %self.request.0.candidate_receipt.hash(),
					%err,
					"Error sending dispute statements to node."
				);

				self.has_failed_sends = true;
				// Remove the entry, so the next refresh knows to retry this authority:
				self.deliveries.remove(authority);
			},
			TaskResult::Succeeded => {
				let status = match self.deliveries.get_mut(&authority) {
					None => {
						// Can happen when a send became irrelevant while the response was
						// already queued.
						gum::debug!(
							target: LOG_TARGET,
							candidate = ?self.request.0.candidate_receipt.hash(),
							?authority,
							?result,
							"Received `FromSendingTask::Finished` for a non-existing task."
						);
						return
					},
					Some(status) => status,
				};
				// We are done here:
				*status = DeliveryStatus::Succeeded;
			},
		}
	}
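
	// Walk-through of the failure path above, assuming some network-level error:
	//
	// 1. `wait_response_task` resolves with `TaskResult::Failed(..)` and sends a `TaskFinish`.
	// 2. `on_finished_send` sets `has_failed_sends` and removes the authority from
	//    `deliveries`.
	// 3. The owning sender observes `has_failed_sends()` and eventually calls `refresh_sends`,
	//    which picks the authority up again as "new" and dispatches a fresh request.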

	/// Determine all validators that should receive the given dispute requests.
	///
	/// This is all `parachain` validators of the session in which the candidate occurred and all
	/// authorities of all currently active sessions, determined by currently active heads.
	async fn get_relevant_validators<Context>(
		&self,
		ctx: &mut Context,
		runtime: &mut RuntimeInfo,
		active_sessions: &HashMap<SessionIndex, Hash>,
	) -> Result<HashSet<AuthorityDiscoveryId>> {
		let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent();
		// Retrieve all authorities which participated in the parachain consensus of the session
		// in which the candidate was backed.
		let info = runtime
			.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
			.await?;
		let session_info = &info.session_info;
		let validator_count = session_info.validators.len();
		let mut authorities: HashSet<_> = session_info
			.discovery_keys
			.iter()
			.take(validator_count)
			.enumerate()
			.filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
			.map(|(_, v)| v.clone())
			.collect();

		// Retrieve all authorities for the current session as indicated by the active
		// heads we are tracking.
		for (session_index, head) in active_sessions.iter() {
			let info =
				runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
			let session_info = &info.session_info;
			let new_set = session_info
				.discovery_keys
				.iter()
				.enumerate()
				.filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
				.map(|(_, v)| v.clone());
			authorities.extend(new_set);
		}
		Ok(authorities)
	}
}
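
// Worked example (hypothetical keys) for `get_relevant_validators`:
//
//   validators of the dispute session:   {a, b, c}
//   authorities of active sessions:      {b, c, d, e}
//   our own authority key:               c
//   resulting delivery set:              {a, b, d, e}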

/// Start sending the given message to all the given authorities and spawn tasks for handling the
/// responses.
#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)]
async fn send_requests<Context, M: 'static + Send + Sync>(
	ctx: &mut Context,
	tx: NestingSender<M, TaskFinish>,
	receivers: Vec<AuthorityDiscoveryId>,
	req: DisputeRequest,
	metrics: &Metrics,
) -> Result<HashMap<AuthorityDiscoveryId, DeliveryStatus>> {
	let mut statuses = HashMap::with_capacity(receivers.len());
	let mut reqs = Vec::with_capacity(receivers.len());

	for receiver in receivers {
		let (outgoing, pending_response) =
			OutgoingRequest::new(Recipient::Authority(receiver.clone()), req.clone());

		reqs.push(Requests::DisputeSendingV1(outgoing));

		let fut = wait_response_task(
			pending_response,
			req.0.candidate_receipt.hash(),
			receiver.clone(),
			tx.clone(),
			metrics.time_dispute_request(),
		);

		ctx.spawn("dispute-sender", fut.boxed()).map_err(FatalError::SpawnTask)?;
		statuses.insert(receiver, DeliveryStatus::Pending);
	}

	let msg = NetworkBridgeTxMessage::SendRequests(reqs, IfDisconnected::ImmediateError);
	ctx.send_message(msg).await;
	Ok(statuses)
}
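
// Note on `IfDisconnected::ImmediateError` above: if there is no existing connection to a
// recipient, the network bridge fails the request right away instead of trying to dial first.
// That is acceptable here, since the failure comes back as `TaskResult::Failed` and the
// authority is simply retried on the next `refresh_sends`.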

/// Future to be spawned in a task for awaiting a response.
async fn wait_response_task<M: 'static + Send + Sync>(
	pending_response: impl Future<Output = OutgoingResult<DisputeResponse>>,
	candidate_hash: CandidateHash,
	receiver: AuthorityDiscoveryId,
	mut tx: NestingSender<M, TaskFinish>,
	_timer: Option<metrics::prometheus::prometheus::HistogramTimer>,
) {
	let result = pending_response.await;
	let msg = match result {
		Err(err) => TaskFinish { candidate_hash, receiver, result: TaskResult::Failed(err) },
		Ok(DisputeResponse::Confirmed) =>
			TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded },
	};
	if let Err(err) = tx.send_message(msg).await {
		gum::debug!(
			target: LOG_TARGET,
			%err,
			"Failed to notify subsystem about dispute sending result."
		);
	}
}
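
// End-to-end flow of a single delivery, as implemented above:
//
// `SendTask::new`/`refresh_sends`
//   -> `send_requests`: one `Requests::DisputeSendingV1` per authority, plus one spawned
//      `wait_response_task` per request
//   -> `wait_response_task`: awaits the response and reports a `TaskFinish` over the
//      `NestingSender`
//   -> `SendTask::on_finished_send`: marks the delivery `Succeeded`, or drops it so it gets
//      retried on the next refresh.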