
polkadot_availability_distribution/requester/fetch_task/mod.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;

use futures::{
	channel::{mpsc, oneshot},
	future::select,
	FutureExt, SinkExt,
};

use codec::Decode;
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
	outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
	v1::{self, ChunkResponse},
	v2,
};
use polkadot_node_primitives::ErasureChunk;
use polkadot_node_subsystem::{
	messages::{AvailabilityStoreMessage, IfDisconnected, NetworkBridgeTxMessage},
	overseer,
};
use polkadot_primitives::{
	AuthorityDiscoveryId, BlakeTwo256, CandidateHash, ChunkIndex, GroupIndex, Hash, HashT,
	OccupiedCore, SessionIndex,
};
use sc_network::ProtocolName;

use crate::{
	error::{FatalError, Result},
	metrics::{Metrics, FAILED, SUCCEEDED},
	requester::session_cache::{BadValidators, SessionInfo},
	LOG_TARGET,
};

#[cfg(test)]
mod tests;

/// Configuration for a `FetchTask`
///
/// This exists to separate the preparation of a `FetchTask` from actually starting it, which is
/// beneficial as it allows for taking the session info by reference.
pub struct FetchTaskConfig {
	prepared_running: Option<RunningTask>,
	live_in: HashSet<Hash>,
}

/// Information about a task fetching an erasure chunk.
pub struct FetchTask {
	/// For what relay parents this task is relevant.
	///
	/// In other words, for which relay chain parents this candidate is considered live.
	/// This is updated on every `ActiveLeavesUpdate` and enables us to know when we can safely
	/// stop keeping track of that candidate/chunk.
	pub(crate) live_in: HashSet<Hash>,

	/// We keep the task around until `live_in` becomes empty, to make
	/// sure we won't re-fetch an already fetched candidate.
	state: FetchedState,
}

/// State of a particular candidate chunk fetching process.
enum FetchedState {
	/// Chunk fetch has started.
	///
	/// Once the contained `Sender` is dropped, any still running task will be canceled.
	Started(oneshot::Sender<()>),
	/// All relevant `live_in` have been removed, before we were able to get our chunk.
	Canceled,
}

/// Messages sent from `FetchTask`s to be handled/forwarded.
pub enum FromFetchTask {
	/// Message to other subsystem.
	Message(overseer::AvailabilityDistributionOutgoingMessages),

	/// Concluded with result.
	///
	/// In case of `None` everything was fine, in case of `Some`, some validators in the group
	/// did not serve us our chunk as expected.
	Concluded(Option<BadValidators>),

	/// We were not able to fetch the desired chunk for the given `CandidateHash`.
	Failed(CandidateHash),
}

/// Information a running task needs.
struct RunningTask {
	/// For what session we have been spawned.
	session_index: SessionIndex,

	/// Index of validator group to fetch the chunk from.
	///
	/// Needed for reporting bad validators.
	group_index: GroupIndex,

	/// Validators to request the chunk from.
	///
	/// This vector gets drained during execution of the task (it will be empty afterwards).
	group: Vec<AuthorityDiscoveryId>,

	/// The request to send. We can store it as either v1 or v2, as they have the same payload.
	request: v2::ChunkFetchingRequest,

	/// Root hash, for verifying the chunk's validity.
	erasure_root: Hash,

	/// Relay parent of the candidate to fetch.
	relay_parent: Hash,

	/// Sender for communicating with other subsystems and reporting results.
	sender: mpsc::Sender<FromFetchTask>,

	/// Prometheus metrics for reporting results.
	metrics: Metrics,

	/// Expected chunk index. We'll validate that the remote did send us the correct chunk (only
	/// important for v2 requests).
	chunk_index: ChunkIndex,

	/// Full protocol name for ChunkFetchingV1.
	req_v1_protocol_name: ProtocolName,

	/// Full protocol name for ChunkFetchingV2.
	req_v2_protocol_name: ProtocolName,
}

impl FetchTaskConfig {
	/// Create a new configuration for a [`FetchTask`].
	///
	/// The result of this function can be passed into [`FetchTask::start`].
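	///
	/// A rough usage sketch, assuming the caller already has the leaf, occupied core, session
	/// info, chunk index and protocol names at hand (the bindings below are illustrative only,
	/// not taken from the actual requester code):
	///
	/// ```ignore
	/// let config = FetchTaskConfig::new(
	/// 	leaf,
	/// 	&core,
	/// 	sender.clone(),
	/// 	metrics.clone(),
	/// 	&session_info,
	/// 	chunk_index,
	/// 	req_v1_protocol_name.clone(),
	/// 	req_v2_protocol_name.clone(),
	/// );
	/// let fetch_task = FetchTask::start(config, &mut ctx).await?;
	/// ```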
	pub fn new(
		leaf: Hash,
		core: &OccupiedCore,
		sender: mpsc::Sender<FromFetchTask>,
		metrics: Metrics,
		session_info: &SessionInfo,
		chunk_index: ChunkIndex,
		req_v1_protocol_name: ProtocolName,
		req_v2_protocol_name: ProtocolName,
	) -> Self {
		let live_in = vec![leaf].into_iter().collect();

		// Don't run tasks for our backing group:
		if session_info.our_group == Some(core.group_responsible) {
			return FetchTaskConfig { live_in, prepared_running: None }
		}

		let prepared_running = RunningTask {
			session_index: session_info.session_index,
			group_index: core.group_responsible,
			group: session_info.validator_groups.get(core.group_responsible.0 as usize)
				.expect("The responsible group of a candidate should be available in the corresponding session. qed.")
				.clone(),
			request: v2::ChunkFetchingRequest {
				candidate_hash: core.candidate_hash,
				index: session_info.our_index,
			},
			erasure_root: core.candidate_descriptor.erasure_root(),
			relay_parent: core.candidate_descriptor.relay_parent(),
			metrics,
			sender,
			chunk_index,
			req_v1_protocol_name,
			req_v2_protocol_name
		};
		FetchTaskConfig { live_in, prepared_running: Some(prepared_running) }
	}
}

#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
impl FetchTask {
	/// Start fetching a chunk.
	///
	/// A task handling the fetching of the configured chunk will be spawned.
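	///
	/// The spawned task is tied to the returned [`FetchTask`]: dropping it, or removing all of
	/// its `live_in` leaves via [`Self::remove_leaves`], drops the contained kill handle and
	/// thereby cancels a still running fetch.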
	pub async fn start<Context>(config: FetchTaskConfig, ctx: &mut Context) -> Result<Self> {
		let FetchTaskConfig { prepared_running, live_in } = config;

		if let Some(running) = prepared_running {
			let (handle, kill) = oneshot::channel();

			ctx.spawn("chunk-fetcher", running.run(kill).boxed())
				.map_err(|e| FatalError::SpawnTask(e))?;

			Ok(FetchTask { live_in, state: FetchedState::Started(handle) })
		} else {
			Ok(FetchTask { live_in, state: FetchedState::Canceled })
		}
	}

	/// Add the given leaf to the relay parents which are making this task relevant.
	///
	/// This is for bookkeeping, so we know we are already fetching a given chunk.
	pub fn add_leaf(&mut self, leaf: Hash) {
		self.live_in.insert(leaf);
	}

	/// Remove leaves and cancel the task if the last relevant leaf was removed while the task
	/// was still fetching.
	pub fn remove_leaves(&mut self, leaves: &HashSet<Hash>) {
		for leaf in leaves {
			self.live_in.remove(leaf);
		}
		if self.live_in.is_empty() && !self.is_finished() {
			self.state = FetchedState::Canceled
		}
	}

	/// Whether there are still relay parents around with this candidate pending
	/// availability.
	pub fn is_live(&self) -> bool {
		!self.live_in.is_empty()
	}

	/// Whether this task can be considered finished.
	///
	/// That is, it is either canceled, succeeded or failed.
	pub fn is_finished(&self) -> bool {
		match &self.state {
			FetchedState::Canceled => true,
			FetchedState::Started(sender) => sender.is_canceled(),
		}
	}
}

/// Things that can go wrong in task execution.
#[derive(Debug)]
enum TaskError {
	/// The peer failed to deliver a correct chunk for some reason (has been reported as
	/// appropriate).
	PeerError,
	/// This very node is seemingly shutting down (sending of message failed).
	ShuttingDown,
}

impl RunningTask {
	async fn run(self, kill: oneshot::Receiver<()>) {
		// Wait for completion or cancellation.
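		// `kill` resolves once the corresponding `FetchedState::Started` sender is dropped
		// (e.g. because all `live_in` leaves were removed), which drops the still pending
		// `run_inner` future and thereby cancels the fetch.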
		let run_it = self.run_inner();
		futures::pin_mut!(run_it);
		let _ = select(run_it, kill).await;
	}

	/// Fetch and store chunk.
	///
	/// Try validators in backing group in order.
	async fn run_inner(mut self) {
		let mut bad_validators = Vec::new();
		let mut succeeded = false;
		let mut count: u32 = 0;
		let mut network_error_freq = gum::Freq::new();
		let mut canceled_freq = gum::Freq::new();
		// Try validators in reverse order:
		while let Some(validator) = self.group.pop() {
			// Report retries:
			if count > 0 {
				self.metrics.on_retry();
			}
			count += 1;

			// Send request:
			let resp = match self
				.do_request(&validator, &mut network_error_freq, &mut canceled_freq)
				.await
			{
				Ok(resp) => resp,
				Err(TaskError::ShuttingDown) => {
					gum::info!(
						target: LOG_TARGET,
						"Node seems to be shutting down, canceling fetch task"
					);
					self.metrics.on_fetch(FAILED);
					return
				},
				Err(TaskError::PeerError) => {
					bad_validators.push(validator);
					continue
				},
			};

			let chunk = match resp {
				Some(chunk) => chunk,
				None => {
					gum::debug!(
						target: LOG_TARGET,
						validator = ?validator,
						relay_parent = ?self.relay_parent,
						group_index = ?self.group_index,
						session_index = ?self.session_index,
						chunk_index = ?self.request.index,
						candidate_hash = ?self.request.candidate_hash,
						"Validator did not have our chunk"
					);
					bad_validators.push(validator);
					continue
				},
			};

			// Data genuine?
			if !self.validate_chunk(&validator, &chunk, self.chunk_index) {
				bad_validators.push(validator);
				continue
			}

			// Ok, let's store it and be happy:
			self.store_chunk(chunk).await;
			succeeded = true;
			break
		}
		if succeeded {
			self.metrics.on_fetch(SUCCEEDED);
			self.conclude(bad_validators).await;
		} else {
			self.metrics.on_fetch(FAILED);
			self.conclude_fail().await
		}
	}

	/// Do request and return response, if successful.
	async fn do_request(
		&mut self,
		validator: &AuthorityDiscoveryId,
		network_error_freq: &mut gum::Freq,
		canceled_freq: &mut gum::Freq,
	) -> std::result::Result<Option<ErasureChunk>, TaskError> {
		gum::trace!(
			target: LOG_TARGET,
			origin = ?validator,
			relay_parent = ?self.relay_parent,
			group_index = ?self.group_index,
			session_index = ?self.session_index,
			chunk_index = ?self.request.index,
			candidate_hash = ?self.request.candidate_hash,
			"Starting chunk request",
		);

		let (full_request, response_recv) = OutgoingRequest::new_with_fallback(
			Recipient::Authority(validator.clone()),
			self.request,
			// Fallback to v1, for backwards compatibility.
			v1::ChunkFetchingRequest::from(self.request),
		);
		let requests = Requests::ChunkFetching(full_request);

		self.sender
			.send(FromFetchTask::Message(
				NetworkBridgeTxMessage::SendRequests(
					vec![requests],
					IfDisconnected::ImmediateError,
				)
				.into(),
			))
			.await
			.map_err(|_| TaskError::ShuttingDown)?;

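		// The response arrives together with the name of the protocol that served it, so decode
		// it as v2 when the v2 protocol answered and fall back to v1 decoding otherwise.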
		match response_recv.await {
			Ok((bytes, protocol)) => match protocol {
				_ if protocol == self.req_v2_protocol_name =>
					match v2::ChunkFetchingResponse::decode(&mut &bytes[..]) {
						Ok(chunk_response) => Ok(Option::<ErasureChunk>::from(chunk_response)),
						Err(e) => {
							gum::warn!(
								target: LOG_TARGET,
								origin = ?validator,
								relay_parent = ?self.relay_parent,
								group_index = ?self.group_index,
								session_index = ?self.session_index,
								chunk_index = ?self.request.index,
								candidate_hash = ?self.request.candidate_hash,
								err = ?e,
								"Peer sent us invalid erasure chunk data (v2)"
							);
							Err(TaskError::PeerError)
						},
					},
				_ if protocol == self.req_v1_protocol_name =>
					match v1::ChunkFetchingResponse::decode(&mut &bytes[..]) {
						Ok(chunk_response) => Ok(Option::<ChunkResponse>::from(chunk_response)
							.map(|c| c.recombine_into_chunk(&self.request.into()))),
						Err(e) => {
							gum::warn!(
								target: LOG_TARGET,
								origin = ?validator,
								relay_parent = ?self.relay_parent,
								group_index = ?self.group_index,
								session_index = ?self.session_index,
								chunk_index = ?self.request.index,
								candidate_hash = ?self.request.candidate_hash,
								err = ?e,
								"Peer sent us invalid erasure chunk data"
							);
							Err(TaskError::PeerError)
						},
					},
				_ => {
					gum::warn!(
						target: LOG_TARGET,
						origin = ?validator,
						relay_parent = ?self.relay_parent,
						group_index = ?self.group_index,
						session_index = ?self.session_index,
						chunk_index = ?self.request.index,
						candidate_hash = ?self.request.candidate_hash,
						"Peer sent us invalid erasure chunk data - unknown protocol"
					);
					Err(TaskError::PeerError)
				},
			},
			Err(RequestError::InvalidResponse(err)) => {
				gum::warn!(
					target: LOG_TARGET,
					origin = ?validator,
					relay_parent = ?self.relay_parent,
					group_index = ?self.group_index,
					session_index = ?self.session_index,
					chunk_index = ?self.request.index,
					candidate_hash = ?self.request.candidate_hash,
					err = ?err,
					"Peer sent us invalid erasure chunk data"
				);
				Err(TaskError::PeerError)
			},
			Err(RequestError::NetworkError(err)) => {
				gum::warn_if_frequent!(
					freq: network_error_freq,
					max_rate: gum::Times::PerHour(100),
					target: LOG_TARGET,
					origin = ?validator,
					relay_parent = ?self.relay_parent,
					group_index = ?self.group_index,
					session_index = ?self.session_index,
					chunk_index = ?self.request.index,
					candidate_hash = ?self.request.candidate_hash,
					err = ?err,
					"Some network error occurred when fetching erasure chunk"
				);
				Err(TaskError::PeerError)
			},
			Err(RequestError::Canceled(oneshot::Canceled)) => {
				gum::warn_if_frequent!(
					freq: canceled_freq,
					max_rate: gum::Times::PerHour(100),
					target: LOG_TARGET,
					origin = ?validator,
					relay_parent = ?self.relay_parent,
					group_index = ?self.group_index,
					session_index = ?self.session_index,
					chunk_index = ?self.request.index,
					candidate_hash = ?self.request.candidate_hash,
					"Erasure chunk request got canceled"
				);
				Err(TaskError::PeerError)
			},
		}
	}

	fn validate_chunk(
		&self,
		validator: &AuthorityDiscoveryId,
		chunk: &ErasureChunk,
		expected_chunk_index: ChunkIndex,
	) -> bool {
		if chunk.index != expected_chunk_index {
			gum::warn!(
				target: LOG_TARGET,
				candidate_hash = ?self.request.candidate_hash,
				origin = ?validator,
				chunk_index = ?chunk.index,
				expected_chunk_index = ?expected_chunk_index,
				"Validator sent the wrong chunk",
			);
			return false
		}
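		// Check the merkle proof: recompute the expected leaf hash for this chunk index from
		// the proof and the erasure root, then compare it against the hash of the received
		// chunk data.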
		let anticipated_hash =
			match branch_hash(&self.erasure_root, chunk.proof(), chunk.index.0 as usize) {
				Ok(hash) => hash,
				Err(e) => {
					gum::warn!(
						target: LOG_TARGET,
						candidate_hash = ?self.request.candidate_hash,
						origin = ?validator,
						error = ?e,
						"Failed to calculate chunk merkle proof",
					);
					return false
				},
			};
		let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
		if anticipated_hash != erasure_chunk_hash {
			gum::warn!(target: LOG_TARGET, origin = ?validator, "Received chunk does not match merkle tree");
			return false
		}
		true
	}

	/// Store given chunk and log any error.
	async fn store_chunk(&mut self, chunk: ErasureChunk) {
		let (tx, rx) = oneshot::channel();
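		// Note: the availability store keys the chunk by our validator index (taken from the
		// request), while the chunk itself still carries its own `ChunkIndex`, which was
		// checked in `validate_chunk` above.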
		let r = self
			.sender
			.send(FromFetchTask::Message(
				AvailabilityStoreMessage::StoreChunk {
					candidate_hash: self.request.candidate_hash,
					chunk,
					validator_index: self.request.index,
					tx,
				}
				.into(),
			))
			.await;
		if let Err(err) = r {
			gum::error!(target: LOG_TARGET, err = ?err, "Storing erasure chunk failed, system shutting down?");
		}

		if let Err(oneshot::Canceled) = rx.await {
			gum::error!(target: LOG_TARGET, "Storing erasure chunk failed");
		}
	}

	/// Tell subsystem we are done.
	async fn conclude(&mut self, bad_validators: Vec<AuthorityDiscoveryId>) {
		let payload = if bad_validators.is_empty() {
			None
		} else {
			Some(BadValidators {
				session_index: self.session_index,
				group_index: self.group_index,
				bad_validators,
			})
		};
		if let Err(err) = self.sender.send(FromFetchTask::Concluded(payload)).await {
			gum::warn!(
				target: LOG_TARGET,
				err = ?err,
				"Sending concluded message for task failed"
			);
		}
	}

	async fn conclude_fail(&mut self) {
		if let Err(err) = self.sender.send(FromFetchTask::Failed(self.request.candidate_hash)).await
		{
			gum::warn!(target: LOG_TARGET, ?err, "Sending `Failed` message for task failed");
		}
	}
}