use std::collections::HashSet;
use futures::{
channel::{mpsc, oneshot},
future::select,
FutureExt, SinkExt,
};
use codec::Decode;
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
v1::{self, ChunkResponse},
v2,
};
use polkadot_node_primitives::ErasureChunk;
use polkadot_node_subsystem::{
messages::{AvailabilityStoreMessage, IfDisconnected, NetworkBridgeTxMessage},
overseer,
};
use polkadot_primitives::{
vstaging::OccupiedCore, AuthorityDiscoveryId, BlakeTwo256, CandidateHash, ChunkIndex,
GroupIndex, Hash, HashT, SessionIndex,
};
use sc_network::ProtocolName;
use crate::{
error::{FatalError, Result},
metrics::{Metrics, FAILED, SUCCEEDED},
requester::session_cache::{BadValidators, SessionInfo},
LOG_TARGET,
};
#[cfg(test)]
mod tests;
/// Configuration for a fetch task, created by [`FetchTaskConfig::new`] and
/// turned into a running [`FetchTask`] via [`FetchTask::start`].
pub struct FetchTaskConfig {
	// The task to spawn, if any. `None` when no fetching is necessary
	// (our own group is responsible for the candidate — see `FetchTaskConfig::new`).
	prepared_running: Option<RunningTask>,
	// Relay-chain heads this fetch is relevant for.
	live_in: HashSet<Hash>,
}
/// Handle to a (potentially running) chunk fetch, tracked per candidate.
pub struct FetchTask {
	/// Relay-chain heads this task is relevant for; once this becomes empty
	/// the task gets canceled (see `remove_leaves`).
	pub(crate) live_in: HashSet<Hash>,
	/// Current state of the task (running or canceled).
	state: FetchedState,
}
/// State of a [`FetchTask`].
enum FetchedState {
	/// A task was spawned and is running; dropping this sender cancels it
	/// (the receiving end is the `kill` signal in `RunningTask::run`).
	Started(oneshot::Sender<()>),
	/// The task was canceled (no live relay-chain head needs it anymore) or
	/// was never spawned because no fetching was necessary.
	Canceled,
}
/// Messages sent from a [`RunningTask`] back to the requester.
pub enum FromFetchTask {
	/// An outgoing message to be forwarded to the overseer (network request or
	/// availability-store message).
	Message(overseer::AvailabilityDistributionOutgoingMessages),
	/// The task concluded; `Some` carries the validators that misbehaved during
	/// the fetch, `None` means there is nothing to report.
	Concluded(Option<BadValidators>),
	/// Fetching failed for the given candidate: all validators were tried
	/// without success.
	Failed(CandidateHash),
}
/// All data a running fetch task needs to do its job.
struct RunningTask {
	/// Session the candidate belongs to (used for reporting bad validators).
	session_index: SessionIndex,
	/// Validator group responsible for the candidate.
	group_index: GroupIndex,
	/// Validators to request the chunk from; tried one by one, popped from the
	/// back of this vector (see `run_inner`).
	group: Vec<AuthorityDiscoveryId>,
	/// The v2 request to send; a v1 request is derived from it as a fallback.
	request: v2::ChunkFetchingRequest,
	/// Root hash for verifying the merkle proof of a delivered chunk.
	erasure_root: Hash,
	/// Relay parent of the candidate — used for log messages here.
	relay_parent: Hash,
	/// Channel for sending messages and results back to the requester.
	sender: mpsc::Sender<FromFetchTask>,
	/// Metrics handle for reporting fetch outcomes and retries.
	metrics: Metrics,
	/// Chunk index we expect the validator to return (checked in
	/// `validate_chunk`); may differ from `request.index`, which is our
	/// validator index.
	chunk_index: ChunkIndex,
	/// Protocol name used to recognize v1 responses.
	req_v1_protocol_name: ProtocolName,
	/// Protocol name used to recognize v2 responses.
	req_v2_protocol_name: ProtocolName,
}
impl FetchTaskConfig {
pub fn new(
leaf: Hash,
core: &OccupiedCore,
sender: mpsc::Sender<FromFetchTask>,
metrics: Metrics,
session_info: &SessionInfo,
chunk_index: ChunkIndex,
req_v1_protocol_name: ProtocolName,
req_v2_protocol_name: ProtocolName,
) -> Self {
let live_in = vec![leaf].into_iter().collect();
if session_info.our_group == Some(core.group_responsible) {
return FetchTaskConfig { live_in, prepared_running: None }
}
let prepared_running = RunningTask {
session_index: session_info.session_index,
group_index: core.group_responsible,
group: session_info.validator_groups.get(core.group_responsible.0 as usize)
.expect("The responsible group of a candidate should be available in the corresponding session. qed.")
.clone(),
request: v2::ChunkFetchingRequest {
candidate_hash: core.candidate_hash,
index: session_info.our_index,
},
erasure_root: core.candidate_descriptor.erasure_root(),
relay_parent: core.candidate_descriptor.relay_parent(),
metrics,
sender,
chunk_index,
req_v1_protocol_name,
req_v2_protocol_name
};
FetchTaskConfig { live_in, prepared_running: Some(prepared_running) }
}
}
#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
impl FetchTask {
	/// Spawn the prepared fetch task (if any) on the subsystem context.
	///
	/// A config without a prepared task results in an already-`Canceled` task.
	pub async fn start<Context>(config: FetchTaskConfig, ctx: &mut Context) -> Result<Self> {
		let FetchTaskConfig { prepared_running, live_in } = config;
		let state = match prepared_running {
			Some(running) => {
				// Keep the sender half as cancellation handle; dropping it
				// aborts the spawned future.
				let (handle, kill) = oneshot::channel();
				ctx.spawn("chunk-fetcher", running.run(kill).boxed())
					.map_err(FatalError::SpawnTask)?;
				FetchedState::Started(handle)
			},
			None => FetchedState::Canceled,
		};
		Ok(FetchTask { live_in, state })
	}

	/// Add a relay-chain head this task is relevant for.
	pub fn add_leaf(&mut self, leaf: Hash) {
		self.live_in.insert(leaf);
	}

	/// Remove the given relay-chain heads; cancel the task once no head
	/// needs it anymore (and it has not finished already).
	pub fn remove_leaves(&mut self, leaves: &HashSet<Hash>) {
		self.live_in.retain(|leaf| !leaves.contains(leaf));
		if self.live_in.is_empty() && !self.is_finished() {
			self.state = FetchedState::Canceled
		}
	}

	/// Whether any relay-chain head still needs this task.
	pub fn is_live(&self) -> bool {
		!self.live_in.is_empty()
	}

	/// Whether the task stopped running — either canceled or the spawned
	/// future completed/was dropped (observable via the canceled sender).
	pub fn is_finished(&self) -> bool {
		match &self.state {
			FetchedState::Started(sender) => sender.is_canceled(),
			FetchedState::Canceled => true,
		}
	}
}
/// Things that can go wrong when requesting a chunk from a single validator.
#[derive(Debug)]
enum TaskError {
	/// The peer failed us somehow (invalid data, no chunk, network error):
	/// try the next validator in the group.
	PeerError,
	/// The channel to the subsystem is closed — the node seems to be shutting
	/// down, so abort the whole task.
	ShuttingDown,
}
impl RunningTask {
	/// Drive the fetch until it completes or the `kill` signal fires.
	///
	/// `kill` resolves (with `Canceled`) once the matching
	/// `FetchedState::Started` sender is dropped, which aborts the fetch via
	/// `select`.
	async fn run(self, kill: oneshot::Receiver<()>) {
		// Wait for completion/or cancel.
		let run_it = self.run_inner();
		futures::pin_mut!(run_it);
		let _ = select(run_it, kill).await;
	}

	/// Try fetching our chunk from each validator of the responsible group in
	/// turn, validating and storing the first good response.
	///
	/// Validators that misbehaved along the way are reported via `conclude`;
	/// if no validator delivered a valid chunk, a `Failed` message is sent via
	/// `conclude_fail` instead.
	async fn run_inner(mut self) {
		let mut bad_validators = Vec::new();
		let mut succeeded = false;
		let mut count: u32 = 0;
		// Rate limiters so frequent network errors / cancellations don't spam the log.
		let mut network_error_freq = gum::Freq::new();
		let mut canceled_freq = gum::Freq::new();
		// Validators are tried one by one, popped from the back of the group vector.
		while let Some(validator) = self.group.pop() {
			// Report retries (every attempt after the first one):
			if count > 0 {
				self.metrics.on_retry();
			}
			count += 1;
			// Send request:
			let resp = match self
				.do_request(&validator, &mut network_error_freq, &mut canceled_freq)
				.await
			{
				Ok(resp) => resp,
				Err(TaskError::ShuttingDown) => {
					gum::info!(
						target: LOG_TARGET,
						"Node seems to be shutting down, canceling fetch task"
					);
					self.metrics.on_fetch(FAILED);
					return
				},
				// Peer-level problem — try the next validator:
				Err(TaskError::PeerError) => {
					bad_validators.push(validator);
					continue
				},
			};
			// `None` response: the validator answered but did not have our chunk.
			let chunk = match resp {
				Some(chunk) => chunk,
				None => {
					gum::debug!(
						target: LOG_TARGET,
						validator = ?validator,
						relay_parent = ?self.relay_parent,
						group_index = ?self.group_index,
						session_index = ?self.session_index,
						chunk_index = ?self.request.index,
						candidate_hash = ?self.request.candidate_hash,
						"Validator did not have our chunk"
					);
					bad_validators.push(validator);
					continue
				},
			};
			// A validator sending the wrong chunk or a corrupt one is reported as bad.
			if !self.validate_chunk(&validator, &chunk, self.chunk_index) {
				bad_validators.push(validator);
				continue
			}
			// Valid chunk obtained — store it and stop trying.
			self.store_chunk(chunk).await;
			succeeded = true;
			break
		}
		if succeeded {
			self.metrics.on_fetch(SUCCEEDED);
			self.conclude(bad_validators).await;
		} else {
			self.metrics.on_fetch(FAILED);
			self.conclude_fail().await
		}
	}

	/// Send the chunk request to one validator and decode the response.
	///
	/// Sends a v2 `ChunkFetchingRequest` with a v1 fallback and decodes the
	/// response according to the protocol it arrived on. Returns `Ok(None)`
	/// when the validator answered but did not have the chunk; any decode,
	/// network or cancellation problem maps to `TaskError::PeerError`, while a
	/// closed subsystem channel maps to `TaskError::ShuttingDown`.
	async fn do_request(
		&mut self,
		validator: &AuthorityDiscoveryId,
		network_error_freq: &mut gum::Freq,
		canceled_freq: &mut gum::Freq,
	) -> std::result::Result<Option<ErasureChunk>, TaskError> {
		gum::trace!(
			target: LOG_TARGET,
			origin = ?validator,
			relay_parent = ?self.relay_parent,
			group_index = ?self.group_index,
			session_index = ?self.session_index,
			chunk_index = ?self.request.index,
			candidate_hash = ?self.request.candidate_hash,
			"Starting chunk request",
		);
		let (full_request, response_recv) = OutgoingRequest::new_with_fallback(
			Recipient::Authority(validator.clone()),
			self.request,
			// Fallback for peers that only speak the v1 protocol:
			v1::ChunkFetchingRequest::from(self.request),
		);
		let requests = Requests::ChunkFetching(full_request);
		self.sender
			.send(FromFetchTask::Message(
				NetworkBridgeTxMessage::SendRequests(
					vec![requests],
					IfDisconnected::ImmediateError,
				)
				.into(),
			))
			.await
			// A closed channel means the subsystem is going away:
			.map_err(|_| TaskError::ShuttingDown)?;
		match response_recv.await {
			// The protocol the response arrived on tells us which encoding to expect:
			Ok((bytes, protocol)) => match protocol {
				_ if protocol == self.req_v2_protocol_name =>
					match v2::ChunkFetchingResponse::decode(&mut &bytes[..]) {
						Ok(chunk_response) => Ok(Option::<ErasureChunk>::from(chunk_response)),
						Err(e) => {
							gum::warn!(
								target: LOG_TARGET,
								origin = ?validator,
								relay_parent = ?self.relay_parent,
								group_index = ?self.group_index,
								session_index = ?self.session_index,
								chunk_index = ?self.request.index,
								candidate_hash = ?self.request.candidate_hash,
								err = ?e,
								"Peer sent us invalid erasure chunk data (v2)"
							);
							Err(TaskError::PeerError)
						},
					},
				_ if protocol == self.req_v1_protocol_name =>
					match v1::ChunkFetchingResponse::decode(&mut &bytes[..]) {
						// v1 responses lack the chunk index — recombine it from the request.
						Ok(chunk_response) => Ok(Option::<ChunkResponse>::from(chunk_response)
							.map(|c| c.recombine_into_chunk(&self.request.into()))),
						Err(e) => {
							gum::warn!(
								target: LOG_TARGET,
								origin = ?validator,
								relay_parent = ?self.relay_parent,
								group_index = ?self.group_index,
								session_index = ?self.session_index,
								chunk_index = ?self.request.index,
								candidate_hash = ?self.request.candidate_hash,
								err = ?e,
								"Peer sent us invalid erasure chunk data"
							);
							Err(TaskError::PeerError)
						},
					},
				// Response on a protocol we never requested — treat as peer misbehavior.
				_ => {
					gum::warn!(
						target: LOG_TARGET,
						origin = ?validator,
						relay_parent = ?self.relay_parent,
						group_index = ?self.group_index,
						session_index = ?self.session_index,
						chunk_index = ?self.request.index,
						candidate_hash = ?self.request.candidate_hash,
						"Peer sent us invalid erasure chunk data - unknown protocol"
					);
					Err(TaskError::PeerError)
				},
			},
			Err(RequestError::InvalidResponse(err)) => {
				gum::warn!(
					target: LOG_TARGET,
					origin = ?validator,
					relay_parent = ?self.relay_parent,
					group_index = ?self.group_index,
					session_index = ?self.session_index,
					chunk_index = ?self.request.index,
					candidate_hash = ?self.request.candidate_hash,
					err = ?err,
					"Peer sent us invalid erasure chunk data"
				);
				Err(TaskError::PeerError)
			},
			// Network errors are expected to happen — log rate-limited:
			Err(RequestError::NetworkError(err)) => {
				gum::warn_if_frequent!(
					freq: network_error_freq,
					max_rate: gum::Times::PerHour(100),
					target: LOG_TARGET,
					origin = ?validator,
					relay_parent = ?self.relay_parent,
					group_index = ?self.group_index,
					session_index = ?self.session_index,
					chunk_index = ?self.request.index,
					candidate_hash = ?self.request.candidate_hash,
					err = ?err,
					"Some network error occurred when fetching erasure chunk"
				);
				Err(TaskError::PeerError)
			},
			Err(RequestError::Canceled(oneshot::Canceled)) => {
				gum::warn_if_frequent!(
					freq: canceled_freq,
					max_rate: gum::Times::PerHour(100),
					target: LOG_TARGET,
					origin = ?validator,
					relay_parent = ?self.relay_parent,
					group_index = ?self.group_index,
					session_index = ?self.session_index,
					chunk_index = ?self.request.index,
					candidate_hash = ?self.request.candidate_hash,
					"Erasure chunk request got canceled"
				);
				Err(TaskError::PeerError)
			},
		}
	}

	/// Check that the received chunk is the one we expect: correct chunk index
	/// and a merkle proof that verifies against the candidate's erasure root.
	fn validate_chunk(
		&self,
		validator: &AuthorityDiscoveryId,
		chunk: &ErasureChunk,
		expected_chunk_index: ChunkIndex,
	) -> bool {
		if chunk.index != expected_chunk_index {
			gum::warn!(
				target: LOG_TARGET,
				candidate_hash = ?self.request.candidate_hash,
				origin = ?validator,
				chunk_index = ?chunk.index,
				expected_chunk_index = ?expected_chunk_index,
				"Validator sent the wrong chunk",
			);
			return false
		}
		// Derive the expected leaf hash from the chunk's merkle proof:
		let anticipated_hash =
			match branch_hash(&self.erasure_root, chunk.proof(), chunk.index.0 as usize) {
				Ok(hash) => hash,
				Err(e) => {
					gum::warn!(
						target: LOG_TARGET,
						candidate_hash = ?self.request.candidate_hash,
						origin = ?validator,
						error = ?e,
						"Failed to calculate chunk merkle proof",
					);
					return false
				},
			};
		// The chunk data itself must hash to the proven leaf:
		let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
		if anticipated_hash != erasure_chunk_hash {
			gum::warn!(target: LOG_TARGET, origin = ?validator, "Received chunk does not match merkle tree");
			return false
		}
		true
	}

	/// Store the fetched chunk in the availability store.
	///
	/// Failures are only logged — there is nothing else we can do about a
	/// failing store at this point.
	async fn store_chunk(&mut self, chunk: ErasureChunk) {
		let (tx, rx) = oneshot::channel();
		let r = self
			.sender
			.send(FromFetchTask::Message(
				AvailabilityStoreMessage::StoreChunk {
					candidate_hash: self.request.candidate_hash,
					chunk,
					validator_index: self.request.index,
					tx,
				}
				.into(),
			))
			.await;
		if let Err(err) = r {
			gum::error!(target: LOG_TARGET, err= ?err, "Storing erasure chunk failed, system shutting down?");
		}
		// `rx` resolves once the store processed the request; cancellation means
		// the store dropped our request.
		if let Err(oneshot::Canceled) = rx.await {
			gum::error!(target: LOG_TARGET, "Storing erasure chunk failed");
		}
	}

	/// Tell the requester we are done, reporting any bad validators collected
	/// along the way (`None` payload if there were none).
	async fn conclude(&mut self, bad_validators: Vec<AuthorityDiscoveryId>) {
		let payload = if bad_validators.is_empty() {
			None
		} else {
			Some(BadValidators {
				session_index: self.session_index,
				group_index: self.group_index,
				bad_validators,
			})
		};
		if let Err(err) = self.sender.send(FromFetchTask::Concluded(payload)).await {
			gum::warn!(
				target: LOG_TARGET,
				err= ?err,
				"Sending concluded message for task failed"
			);
		}
	}

	/// Tell the requester the fetch for this candidate failed entirely.
	async fn conclude_fail(&mut self) {
		if let Err(err) = self.sender.send(FromFetchTask::Failed(self.request.candidate_hash)).await
		{
			gum::warn!(target: LOG_TARGET, ?err, "Sending `Failed` message for task failed");
		}
	}
}