1use std::collections::HashSet;
18
19use futures::{
20 channel::{mpsc, oneshot},
21 future::select,
22 FutureExt, SinkExt,
23};
24
25use codec::Decode;
26use polkadot_erasure_coding::branch_hash;
27use polkadot_node_network_protocol::request_response::{
28 outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
29 v1::{self, ChunkResponse},
30 v2,
31};
32use polkadot_node_primitives::ErasureChunk;
33use polkadot_node_subsystem::{
34 messages::{AvailabilityStoreMessage, IfDisconnected, NetworkBridgeTxMessage},
35 overseer,
36};
37use polkadot_primitives::{
38 AuthorityDiscoveryId, BlakeTwo256, CandidateHash, ChunkIndex, GroupIndex, Hash, HashT,
39 OccupiedCore, SessionIndex,
40};
41use sc_network::ProtocolName;
42
43use crate::{
44 error::{FatalError, Result},
45 metrics::{Metrics, FAILED, SUCCEEDED},
46 requester::session_cache::{BadValidators, SessionInfo},
47 LOG_TARGET,
48};
49
50#[cfg(test)]
51mod tests;
52
53pub struct FetchTaskConfig {
58 prepared_running: Option<RunningTask>,
59 live_in: HashSet<Hash>,
60}
61
62pub struct FetchTask {
64 pub(crate) live_in: HashSet<Hash>,
70
71 state: FetchedState,
74}
75
76enum FetchedState {
78 Started(oneshot::Sender<()>),
82 Canceled,
84}
85
86pub enum FromFetchTask {
88 Message(overseer::AvailabilityDistributionOutgoingMessages),
90
91 Concluded(Option<BadValidators>),
96
97 Failed(CandidateHash),
99}
100
101struct RunningTask {
103 session_index: SessionIndex,
105
106 group_index: GroupIndex,
110
111 group: Vec<AuthorityDiscoveryId>,
115
116 request: v2::ChunkFetchingRequest,
118
119 erasure_root: Hash,
121
122 relay_parent: Hash,
124
125 sender: mpsc::Sender<FromFetchTask>,
127
128 metrics: Metrics,
130
131 chunk_index: ChunkIndex,
134
135 req_v1_protocol_name: ProtocolName,
137
138 req_v2_protocol_name: ProtocolName,
140}
141
142impl FetchTaskConfig {
143 pub fn new(
147 leaf: Hash,
148 core: &OccupiedCore,
149 sender: mpsc::Sender<FromFetchTask>,
150 metrics: Metrics,
151 session_info: &SessionInfo,
152 chunk_index: ChunkIndex,
153 req_v1_protocol_name: ProtocolName,
154 req_v2_protocol_name: ProtocolName,
155 ) -> Self {
156 let live_in = vec![leaf].into_iter().collect();
157
158 if session_info.our_group == Some(core.group_responsible) {
160 return FetchTaskConfig { live_in, prepared_running: None }
161 }
162
163 let prepared_running = RunningTask {
164 session_index: session_info.session_index,
165 group_index: core.group_responsible,
166 group: session_info.validator_groups.get(core.group_responsible.0 as usize)
167 .expect("The responsible group of a candidate should be available in the corresponding session. qed.")
168 .clone(),
169 request: v2::ChunkFetchingRequest {
170 candidate_hash: core.candidate_hash,
171 index: session_info.our_index,
172 },
173 erasure_root: core.candidate_descriptor.erasure_root(),
174 relay_parent: core.candidate_descriptor.relay_parent(),
175 metrics,
176 sender,
177 chunk_index,
178 req_v1_protocol_name,
179 req_v2_protocol_name
180 };
181 FetchTaskConfig { live_in, prepared_running: Some(prepared_running) }
182 }
183}
184
185#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
186impl FetchTask {
187 pub async fn start<Context>(config: FetchTaskConfig, ctx: &mut Context) -> Result<Self> {
191 let FetchTaskConfig { prepared_running, live_in } = config;
192
193 if let Some(running) = prepared_running {
194 let (handle, kill) = oneshot::channel();
195
196 ctx.spawn("chunk-fetcher", running.run(kill).boxed())
197 .map_err(|e| FatalError::SpawnTask(e))?;
198
199 Ok(FetchTask { live_in, state: FetchedState::Started(handle) })
200 } else {
201 Ok(FetchTask { live_in, state: FetchedState::Canceled })
202 }
203 }
204
205 pub fn add_leaf(&mut self, leaf: Hash) {
209 self.live_in.insert(leaf);
210 }
211
212 pub fn remove_leaves(&mut self, leaves: &HashSet<Hash>) {
215 for leaf in leaves {
216 self.live_in.remove(leaf);
217 }
218 if self.live_in.is_empty() && !self.is_finished() {
219 self.state = FetchedState::Canceled
220 }
221 }
222
223 pub fn is_live(&self) -> bool {
226 !self.live_in.is_empty()
227 }
228
229 pub fn is_finished(&self) -> bool {
233 match &self.state {
234 FetchedState::Canceled => true,
235 FetchedState::Started(sender) => sender.is_canceled(),
236 }
237 }
238}
239
/// Reasons a single request attempt of a running task can fail.
#[derive(Debug)]
enum TaskError {
    /// The queried validator did not deliver a usable response; the next validator is
    /// tried.
    PeerError,
    /// The request could not be handed to the owner of the task, which is taken as a sign
    /// that the node is shutting down; the task gives up.
    ShuttingDown,
}
249
250impl RunningTask {
251 async fn run(self, kill: oneshot::Receiver<()>) {
252 let run_it = self.run_inner();
254 futures::pin_mut!(run_it);
255 let _ = select(run_it, kill).await;
256 }
257
258 async fn run_inner(mut self) {
262 let mut bad_validators = Vec::new();
263 let mut succeeded = false;
264 let mut count: u32 = 0;
265 let mut network_error_freq = gum::Freq::new();
266 let mut canceled_freq = gum::Freq::new();
267 while let Some(validator) = self.group.pop() {
269 if count > 0 {
271 self.metrics.on_retry();
272 }
273 count += 1;
274
275 let resp = match self
277 .do_request(&validator, &mut network_error_freq, &mut canceled_freq)
278 .await
279 {
280 Ok(resp) => resp,
281 Err(TaskError::ShuttingDown) => {
282 gum::info!(
283 target: LOG_TARGET,
284 "Node seems to be shutting down, canceling fetch task"
285 );
286 self.metrics.on_fetch(FAILED);
287 return
288 },
289 Err(TaskError::PeerError) => {
290 bad_validators.push(validator);
291 continue
292 },
293 };
294
295 let chunk = match resp {
296 Some(chunk) => chunk,
297 None => {
298 gum::debug!(
299 target: LOG_TARGET,
300 validator = ?validator,
301 relay_parent = ?self.relay_parent,
302 group_index = ?self.group_index,
303 session_index = ?self.session_index,
304 chunk_index = ?self.request.index,
305 candidate_hash = ?self.request.candidate_hash,
306 "Validator did not have our chunk"
307 );
308 bad_validators.push(validator);
309 continue
310 },
311 };
312
313 if !self.validate_chunk(&validator, &chunk, self.chunk_index) {
315 bad_validators.push(validator);
316 continue
317 }
318
319 self.store_chunk(chunk).await;
321 succeeded = true;
322 break
323 }
324 if succeeded {
325 self.metrics.on_fetch(SUCCEEDED);
326 self.conclude(bad_validators).await;
327 } else {
328 self.metrics.on_fetch(FAILED);
329 self.conclude_fail().await
330 }
331 }
332
333 async fn do_request(
335 &mut self,
336 validator: &AuthorityDiscoveryId,
337 network_error_freq: &mut gum::Freq,
338 canceled_freq: &mut gum::Freq,
339 ) -> std::result::Result<Option<ErasureChunk>, TaskError> {
340 gum::trace!(
341 target: LOG_TARGET,
342 origin = ?validator,
343 relay_parent = ?self.relay_parent,
344 group_index = ?self.group_index,
345 session_index = ?self.session_index,
346 chunk_index = ?self.request.index,
347 candidate_hash = ?self.request.candidate_hash,
348 "Starting chunk request",
349 );
350
351 let (full_request, response_recv) = OutgoingRequest::new_with_fallback(
352 Recipient::Authority(validator.clone()),
353 self.request,
354 v1::ChunkFetchingRequest::from(self.request),
356 );
357 let requests = Requests::ChunkFetching(full_request);
358
359 self.sender
360 .send(FromFetchTask::Message(
361 NetworkBridgeTxMessage::SendRequests(
362 vec![requests],
363 IfDisconnected::ImmediateError,
364 )
365 .into(),
366 ))
367 .await
368 .map_err(|_| TaskError::ShuttingDown)?;
369
370 match response_recv.await {
371 Ok((bytes, protocol)) => match protocol {
372 _ if protocol == self.req_v2_protocol_name =>
373 match v2::ChunkFetchingResponse::decode(&mut &bytes[..]) {
374 Ok(chunk_response) => Ok(Option::<ErasureChunk>::from(chunk_response)),
375 Err(e) => {
376 gum::warn!(
377 target: LOG_TARGET,
378 origin = ?validator,
379 relay_parent = ?self.relay_parent,
380 group_index = ?self.group_index,
381 session_index = ?self.session_index,
382 chunk_index = ?self.request.index,
383 candidate_hash = ?self.request.candidate_hash,
384 err = ?e,
385 "Peer sent us invalid erasure chunk data (v2)"
386 );
387 Err(TaskError::PeerError)
388 },
389 },
390 _ if protocol == self.req_v1_protocol_name =>
391 match v1::ChunkFetchingResponse::decode(&mut &bytes[..]) {
392 Ok(chunk_response) => Ok(Option::<ChunkResponse>::from(chunk_response)
393 .map(|c| c.recombine_into_chunk(&self.request.into()))),
394 Err(e) => {
395 gum::warn!(
396 target: LOG_TARGET,
397 origin = ?validator,
398 relay_parent = ?self.relay_parent,
399 group_index = ?self.group_index,
400 session_index = ?self.session_index,
401 chunk_index = ?self.request.index,
402 candidate_hash = ?self.request.candidate_hash,
403 err = ?e,
404 "Peer sent us invalid erasure chunk data"
405 );
406 Err(TaskError::PeerError)
407 },
408 },
409 _ => {
410 gum::warn!(
411 target: LOG_TARGET,
412 origin = ?validator,
413 relay_parent = ?self.relay_parent,
414 group_index = ?self.group_index,
415 session_index = ?self.session_index,
416 chunk_index = ?self.request.index,
417 candidate_hash = ?self.request.candidate_hash,
418 "Peer sent us invalid erasure chunk data - unknown protocol"
419 );
420 Err(TaskError::PeerError)
421 },
422 },
423 Err(RequestError::InvalidResponse(err)) => {
424 gum::warn!(
425 target: LOG_TARGET,
426 origin = ?validator,
427 relay_parent = ?self.relay_parent,
428 group_index = ?self.group_index,
429 session_index = ?self.session_index,
430 chunk_index = ?self.request.index,
431 candidate_hash = ?self.request.candidate_hash,
432 err = ?err,
433 "Peer sent us invalid erasure chunk data"
434 );
435 Err(TaskError::PeerError)
436 },
437 Err(RequestError::NetworkError(err)) => {
438 gum::warn_if_frequent!(
439 freq: network_error_freq,
440 max_rate: gum::Times::PerHour(100),
441 target: LOG_TARGET,
442 origin = ?validator,
443 relay_parent = ?self.relay_parent,
444 group_index = ?self.group_index,
445 session_index = ?self.session_index,
446 chunk_index = ?self.request.index,
447 candidate_hash = ?self.request.candidate_hash,
448 err = ?err,
449 "Some network error occurred when fetching erasure chunk"
450 );
451 Err(TaskError::PeerError)
452 },
453 Err(RequestError::Canceled(oneshot::Canceled)) => {
454 gum::warn_if_frequent!(
455 freq: canceled_freq,
456 max_rate: gum::Times::PerHour(100),
457 target: LOG_TARGET,
458 origin = ?validator,
459 relay_parent = ?self.relay_parent,
460 group_index = ?self.group_index,
461 session_index = ?self.session_index,
462 chunk_index = ?self.request.index,
463 candidate_hash = ?self.request.candidate_hash,
464 "Erasure chunk request got canceled"
465 );
466 Err(TaskError::PeerError)
467 },
468 }
469 }
470
471 fn validate_chunk(
472 &self,
473 validator: &AuthorityDiscoveryId,
474 chunk: &ErasureChunk,
475 expected_chunk_index: ChunkIndex,
476 ) -> bool {
477 if chunk.index != expected_chunk_index {
478 gum::warn!(
479 target: LOG_TARGET,
480 candidate_hash = ?self.request.candidate_hash,
481 origin = ?validator,
482 chunk_index = ?chunk.index,
483 expected_chunk_index = ?expected_chunk_index,
484 "Validator sent the wrong chunk",
485 );
486 return false
487 }
488 let anticipated_hash =
489 match branch_hash(&self.erasure_root, chunk.proof(), chunk.index.0 as usize) {
490 Ok(hash) => hash,
491 Err(e) => {
492 gum::warn!(
493 target: LOG_TARGET,
494 candidate_hash = ?self.request.candidate_hash,
495 origin = ?validator,
496 error = ?e,
497 "Failed to calculate chunk merkle proof",
498 );
499 return false
500 },
501 };
502 let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
503 if anticipated_hash != erasure_chunk_hash {
504 gum::warn!(target: LOG_TARGET, origin = ?validator, "Received chunk does not match merkle tree");
505 return false
506 }
507 true
508 }
509
510 async fn store_chunk(&mut self, chunk: ErasureChunk) {
512 let (tx, rx) = oneshot::channel();
513 let r = self
514 .sender
515 .send(FromFetchTask::Message(
516 AvailabilityStoreMessage::StoreChunk {
517 candidate_hash: self.request.candidate_hash,
518 chunk,
519 validator_index: self.request.index,
520 tx,
521 }
522 .into(),
523 ))
524 .await;
525 if let Err(err) = r {
526 gum::error!(target: LOG_TARGET, err= ?err, "Storing erasure chunk failed, system shutting down?");
527 }
528
529 if let Err(oneshot::Canceled) = rx.await {
530 gum::error!(target: LOG_TARGET, "Storing erasure chunk failed");
531 }
532 }
533
534 async fn conclude(&mut self, bad_validators: Vec<AuthorityDiscoveryId>) {
536 let payload = if bad_validators.is_empty() {
537 None
538 } else {
539 Some(BadValidators {
540 session_index: self.session_index,
541 group_index: self.group_index,
542 bad_validators,
543 })
544 };
545 if let Err(err) = self.sender.send(FromFetchTask::Concluded(payload)).await {
546 gum::warn!(
547 target: LOG_TARGET,
548 err= ?err,
549 "Sending concluded message for task failed"
550 );
551 }
552 }
553
554 async fn conclude_fail(&mut self) {
555 if let Err(err) = self.sender.send(FromFetchTask::Failed(self.request.candidate_hash)).await
556 {
557 gum::warn!(target: LOG_TARGET, ?err, "Sending `Failed` message for task failed");
558 }
559 }
560}