#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
#![recursion_limit = "256"]
use futures::{
channel::{mpsc, oneshot},
future,
lock::Mutex,
FutureExt,
};
use polkadot_node_subsystem::{
messages::{AvailabilityStoreMessage, BitfieldDistributionMessage},
overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
SubsystemResult,
};
use polkadot_node_subsystem_util::{
self as util, request_availability_cores, runtime::recv_runtime, Validator,
};
use polkadot_primitives::{vstaging::CoreState, AvailabilityBitfield, Hash, ValidatorIndex};
use sp_keystore::{Error as KeystoreError, KeystorePtr};
use std::{collections::HashMap, time::Duration};
use wasm_timer::{Delay, Instant};
mod metrics;
use self::metrics::Metrics;
#[cfg(test)]
mod tests;
/// How long a signing job waits, measured from the moment the job starts, before it
/// constructs the availability bitfield (see `handle_active_leaves_update`).
const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500);
/// Log target used by the `gum` logging calls in this module.
const LOG_TARGET: &str = "parachain::bitfield-signing";
/// Errors that can occur while running a bitfield signing job.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
// Failure reported by the subsystem utilities, e.g. while building the `Validator`.
#[error(transparent)]
Util(#[from] util::Error),
// I/O failure; in this file it can come from awaiting the `Delay` timer.
#[error(transparent)]
Io(#[from] std::io::Error),
// A oneshot response channel was dropped before a reply was sent.
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
// Sending on an mpsc channel failed.
#[error(transparent)]
MpscSend(#[from] mpsc::SendError),
// Runtime request failure; callers in this file log it and skip signing.
#[error(transparent)]
Runtime(#[from] util::runtime::Error),
// The keystore returned an error while signing the bitfield.
#[error("Keystore failed: {0:?}")]
Keystore(KeystoreError),
}
async fn get_core_availability(
core: &CoreState,
validator_index: ValidatorIndex,
sender: &Mutex<&mut impl overseer::BitfieldSigningSenderTrait>,
) -> Result<bool, Error> {
if let CoreState::Occupied(core) = core {
let (tx, rx) = oneshot::channel();
sender
.lock()
.await
.send_message(AvailabilityStoreMessage::QueryChunkAvailability(
core.candidate_hash,
validator_index,
tx,
))
.await;
let res = rx.await.map_err(Into::into);
gum::trace!(
target: LOG_TARGET,
para_id = %core.para_id(),
availability = ?res,
?core.candidate_hash,
"Candidate availability",
);
res
} else {
Ok(false)
}
}
/// Build the availability bitfield for `validator_idx` at `relay_parent`.
///
/// Steps:
/// - request the availability cores for `relay_parent` from the runtime,
/// - query chunk availability for every core via `get_core_availability`, joining all
///   queries with `try_join_all`,
/// - collect the per-core answers, in core order, into an `AvailabilityBitfield`.
///
/// # Errors
///
/// Fails if the runtime request fails or if any single per-core query fails; no partial
/// bitfield is returned in that case.
async fn construct_availability_bitfield(
    relay_parent: Hash,
    validator_idx: ValidatorIndex,
    sender: &mut impl overseer::BitfieldSigningSenderTrait,
) -> Result<AvailabilityBitfield, Error> {
    let cores_response = request_availability_cores(relay_parent, sender).await;
    let availability_cores = recv_runtime(cores_response).await?;

    // The per-core queries all need the one mutable sender, so share it behind a mutex.
    let shared_sender = Mutex::new(sender);
    let queries = availability_cores
        .iter()
        .map(|core| get_core_availability(core, validator_idx, &shared_sender));
    let per_core = future::try_join_all(queries).await?;

    let core_bits = FromIterator::from_iter(per_core);
    gum::debug!(
        target: LOG_TARGET,
        ?relay_parent,
        "Signing Bitfield for {core_count} cores: {core_bits}",
        core_count = availability_cores.len(),
        core_bits = core_bits,
    );
    Ok(AvailabilityBitfield(core_bits))
}
/// The bitfield signing subsystem.
///
/// Holds the keystore and metrics handles that are cloned into each signing job.
pub struct BitfieldSigningSubsystem {
// Keystore used to look up the validator key and sign bitfields.
keystore: KeystorePtr,
// Metrics handle used to time runs and count signed bitfields.
metrics: Metrics,
}
impl BitfieldSigningSubsystem {
/// Create a new `BitfieldSigningSubsystem` from the given keystore and metrics handle.
pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
}
}
#[overseer::subsystem(BitfieldSigning, error=SubsystemError, prefix=self::overseer)]
impl<Context> BitfieldSigningSubsystem {
    /// Wrap the subsystem's main loop (`run`) in a boxed future and hand it back as a
    /// `SpawnedSubsystem`. Any error from `run` is tagged with the "bitfield-signing" origin.
    fn start(self, ctx: Context) -> SpawnedSubsystem {
        let Self { keystore, metrics } = self;
        let future = async move {
            let outcome = run(ctx, keystore, metrics).await;
            outcome.map_err(|e| SubsystemError::with_origin("bitfield-signing", e))
        }
        .boxed();
        SpawnedSubsystem { name: "bitfield-signing-subsystem", future }
    }
}
#[overseer::contextbounds(BitfieldSigning, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
keystore: KeystorePtr,
metrics: Metrics,
) -> SubsystemResult<()> {
let mut running = HashMap::<Hash, future::AbortHandle>::new();
loop {
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
for leaf in &update.deactivated {
if let Some(handle) = running.remove(leaf) {
handle.abort();
}
}
if let Some(leaf) = update.activated {
let sender = ctx.sender().clone();
let leaf_hash = leaf.hash;
let (fut, handle) = future::abortable(handle_active_leaves_update(
sender,
leaf,
keystore.clone(),
metrics.clone(),
));
running.insert(leaf_hash, handle);
ctx.spawn("bitfield-signing-job", fut.map(drop).boxed())?;
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { .. } => {},
}
}
}
async fn handle_active_leaves_update<Sender>(
mut sender: Sender,
leaf: ActivatedLeaf,
keystore: KeystorePtr,
metrics: Metrics,
) -> Result<(), Error>
where
Sender: overseer::BitfieldSigningSenderTrait,
{
let wait_until = Instant::now() + SPAWNED_TASK_DELAY;
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};
Delay::new_at(wait_until).await?;
let _timer = metrics.time_run();
let bitfield =
match construct_availability_bitfield(leaf.hash, validator.index(), &mut sender).await {
Err(Error::Runtime(runtime_err)) => {
gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(())
},
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};
let signed_bitfield =
match validator.sign(keystore, bitfield).map_err(|e| Error::Keystore(e))? {
Some(b) => b,
None => {
gum::error!(
target: LOG_TARGET,
"Key was found at construction, but while signing it could not be found.",
);
return Ok(())
},
};
metrics.on_bitfield_signed();
sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(leaf.hash, signed_bitfield))
.await;
Ok(())
}