use futures::{future::Either, FutureExt, StreamExt, TryFutureExt};
use sp_keystore::KeystorePtr;
use polkadot_node_network_protocol::request_response::{
v1, v2, IncomingRequestReceiver, ReqProtocolNames,
};
use polkadot_node_subsystem::{
messages::AvailabilityDistributionMessage, overseer, FromOrchestra, OverseerSignal,
SpawnedSubsystem, SubsystemError,
};
mod error;
use error::{log_error, FatalError, Result};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
mod requester;
use requester::Requester;
mod pov_requester;
mod responder;
use responder::{run_chunk_receivers, run_pov_receiver};
mod metrics;
pub use metrics::Metrics;
#[cfg(test)]
mod tests;
const LOG_TARGET: &'static str = "parachain::availability-distribution";
pub struct AvailabilityDistributionSubsystem {
runtime: RuntimeInfo,
recvs: IncomingRequestReceivers,
req_protocol_names: ReqProtocolNames,
metrics: Metrics,
}
pub struct IncomingRequestReceivers {
pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
pub chunk_req_v1_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
pub chunk_req_v2_receiver: IncomingRequestReceiver<v2::ChunkFetchingRequest>,
}
#[overseer::subsystem(AvailabilityDistribution, error=SubsystemError, prefix=self::overseer)]
impl<Context> AvailabilityDistributionSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
.boxed();
SpawnedSubsystem { name: "availability-distribution-subsystem", future }
}
}
#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
impl AvailabilityDistributionSubsystem {
pub fn new(
keystore: KeystorePtr,
recvs: IncomingRequestReceivers,
req_protocol_names: ReqProtocolNames,
metrics: Metrics,
) -> Self {
let runtime = RuntimeInfo::new(Some(keystore));
Self { runtime, recvs, req_protocol_names, metrics }
}
async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
let Self { mut runtime, recvs, metrics, req_protocol_names } = self;
let IncomingRequestReceivers {
pov_req_receiver,
chunk_req_v1_receiver,
chunk_req_v2_receiver,
} = recvs;
let mut requester = Requester::new(req_protocol_names, metrics.clone()).fuse();
let mut warn_freq = gum::Freq::new();
{
let sender = ctx.sender().clone();
ctx.spawn(
"pov-receiver",
run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(),
)
.map_err(FatalError::SpawnTask)?;
ctx.spawn(
"chunk-receiver",
run_chunk_receivers(
sender,
chunk_req_v1_receiver,
chunk_req_v2_receiver,
metrics.clone(),
)
.boxed(),
)
.map_err(FatalError::SpawnTask)?;
}
loop {
let action = {
let mut subsystem_next = ctx.recv().fuse();
futures::select! {
subsystem_msg = subsystem_next => Either::Left(subsystem_msg),
from_task = requester.next() => Either::Right(from_task),
}
};
let message = match action {
Either::Left(subsystem_msg) =>
subsystem_msg.map_err(|e| FatalError::IncomingMessageChannel(e))?,
Either::Right(from_task) => {
let from_task = from_task.ok_or(FatalError::RequesterExhausted)?;
ctx.send_message(from_task).await;
continue
},
};
match message {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
log_error(
requester
.get_mut()
.update_fetching_heads(&mut ctx, &mut runtime, update)
.await,
"Error in Requester::update_fetching_heads",
&mut warn_freq,
)?;
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _finalized_number)) => {
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication {
msg:
AvailabilityDistributionMessage::FetchPoV {
relay_parent,
from_validator,
para_id,
candidate_hash,
pov_hash,
tx,
},
} => {
log_error(
pov_requester::fetch_pov(
&mut ctx,
&mut runtime,
relay_parent,
from_validator,
para_id,
candidate_hash,
pov_hash,
tx,
metrics.clone(),
)
.await,
"pov_requester::fetch_pov",
&mut warn_freq,
)?;
},
}
}
}
}