#![warn(missing_docs)]
pub use overseer::{
gen::{OrchestraError as OverseerError, Timeout},
Subsystem, TimeoutExt,
};
use polkadot_node_subsystem::{
errors::{RuntimeApiError, SubsystemError},
messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
overseer, SubsystemSender,
};
pub use polkadot_node_metrics::{metrics, Metronome};
use codec::Encode;
use futures::channel::{mpsc, oneshot};
use polkadot_primitives::{
slashing,
vstaging::{
async_backing::BackingState, CandidateEvent,
CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState, ScrapedOnChainVotes,
},
AsyncBackingParams, AuthorityDiscoveryId, CandidateHash, CoreIndex, EncodeAs, ExecutorParams,
GroupIndex, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, SessionIndex, SessionInfo, Signed, SigningContext, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
};
pub use rand;
use runtime::get_disabled_validators_with_fallback;
use sp_application_crypto::AppCrypto;
use sp_core::ByteArray;
use sp_keystore::{Error as KeystoreError, KeystorePtr};
use std::{
collections::{BTreeMap, VecDeque},
time::Duration,
};
use thiserror::Error;
pub use determine_new_blocks::determine_new_blocks;
pub use metered;
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
/// Re-exports of the overseer generics `SpawnedSubsystem`, `Spawner`, `Subsystem` and
/// `SubsystemContext` from `polkadot_overseer::gen`.
pub mod reexports {
pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
}
// Public submodules of this crate. Their contents live in separate files that are not part
// of this one; see each module for its own documentation.
pub mod availability_chunks;
pub mod backing_implicit_view;
pub mod database;
pub mod inclusion_emulator;
pub mod runtime;
pub mod vstaging;
pub mod nesting_sender;
pub mod reputation;
// Private module: only its `determine_new_blocks` function is exposed, through the
// `pub use determine_new_blocks::determine_new_blocks` re-export in this file.
mod determine_new_blocks;
// Unit tests, compiled only for test builds.
#[cfg(test)]
mod tests;
// Log target passed to the `gum` logging macros used in this file.
const LOG_TARGET: &'static str = "parachain::subsystem-util";
/// A duration of one second.
///
/// NOTE(review): not referenced in the visible part of this file; judging by the name this is
/// the grace period given to a job to stop before it is aborted -- confirm against callers.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// The value 64.
///
/// NOTE(review): not referenced in the visible part of this file; judging by the name this is
/// the capacity of the channels used to talk to jobs -- confirm against callers.
pub const JOB_CHANNEL_CAPACITY: usize = 64;
/// Errors produced by the utilities in this crate.
///
/// Variants marked `#[error(transparent)]` forward their `Display` output to the wrapped error,
/// and `#[from]` generates the matching `From` conversion so `?` can be used on the inner type.
#[derive(Debug, Error)]
pub enum Error {
/// A oneshot channel was cancelled before a value was received.
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
/// Sending on an mpsc channel failed.
#[error(transparent)]
Mpsc(#[from] mpsc::SendError),
/// An error reported by the subsystem machinery.
#[error(transparent)]
Subsystem(#[from] SubsystemError),
/// The runtime API answered a request with an error.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
/// Wrapper around `std::convert::Infallible`, a type that has no values.
#[error(transparent)]
Infallible(#[from] std::convert::Infallible),
/// A message could not be converted for the sender.
///
/// Note that the carried `String` is not part of the `Display` message.
#[error("AllMessage not relevant to Job")]
SenderConversion(String),
/// None of the validator keys was found in the local keystore
/// (returned by `Validator::construct`).
#[error("Node is not a validator")]
NotAValidator,
/// NOTE(review): not produced anywhere in the visible part of this file; the exact condition
/// has to be confirmed against the code that returns it.
#[error("AlreadyForwarding")]
AlreadyForwarding,
/// The runtime answered successfully but had no data for the request
/// (returned by `executor_params_at_relay_parent` when no executor params are stored).
#[error("Data are not available")]
DataNotAvailable,
}
impl From<OverseerError> for Error {
fn from(e: OverseerError) -> Self {
Self::from(SubsystemError::from(e))
}
}
impl TryFrom<crate::runtime::Error> for Error {
type Error = ();
fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
use crate::runtime::Error;
match e {
Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()),
}
}
}
/// Receiving end of a oneshot channel that carries the outcome of a runtime API request:
/// either the value of type `T` or a `RuntimeApiError`.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
/// Sends a runtime API request for the block `parent` and hands back the response receiver.
///
/// `request_builder` is given the sending half of a fresh oneshot channel and must return the
/// `RuntimeApiRequest` that carries it. The request is wrapped in
/// `RuntimeApiMessage::Request` and sent through `sender`.
///
/// This function only awaits the *sending* of the message; the returned receiver still has to
/// be awaited by the caller to obtain the actual response.
pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
    parent: Hash,
    sender: &mut Sender,
    request_builder: RequestBuilder,
) -> RuntimeApiReceiver<Response>
where
    RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let (response_tx, response_rx) = oneshot::channel();
    let request = request_builder(response_tx);
    let message = RuntimeApiMessage::Request(parent, request);

    sender.send_message(message.into()).await;

    response_rx
}
pub async fn has_required_runtime<Sender>(
sender: &mut Sender,
relay_parent: Hash,
required_runtime_version: u32,
) -> bool
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
let (tx, rx) = oneshot::channel();
sender
.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
.await;
match rx.await {
Result::Ok(Ok(runtime_version)) => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
?runtime_version,
?required_runtime_version,
"Fetched ParachainHost runtime api version"
);
runtime_version >= required_runtime_version
},
Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
?error,
"Execution error while fetching ParachainHost runtime api version"
);
false
},
Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
"NotSupported error while fetching ParachainHost runtime api version"
);
false
},
Result::Err(_) => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
"Cancelled error while fetching ParachainHost runtime api version"
);
false
},
}
}
// Generates thin `pub async fn` wrappers around `request_from_runtime`, one per runtime API
// request variant.
//
// Each input item has the shape
//     fn <func_name>(<params>) -> <return_ty>; <RequestVariant>;
// and expands to
//     pub async fn <func_name>(parent: Hash, <params>, sender: &mut impl SubsystemSender<..>)
//         -> RuntimeApiReceiver<<return_ty>>
// which builds `RuntimeApiRequest::<RequestVariant>(<params>, tx)` and sends it.
macro_rules! specialize_requests {
// Single item without an explicit doc name: use the stringified request variant as the name
// shown in the generated documentation and forward to the `named` arm.
(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
specialize_requests!{
named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
}
};
// Single item with an explicit doc name: this arm emits the actual function. The three
// `#[doc]` attributes are concatenated by rustdoc into one doc comment.
(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
#[doc = "Request `"]
#[doc = $doc_name]
#[doc = "` from the runtime"]
pub async fn $func_name (
parent: Hash,
$(
$param_name: $param_ty,
)*
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> RuntimeApiReceiver<$return_ty>
{
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
}
};
// Two or more items: expand the first one, then recurse on the remaining ones.
(
fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
$(
fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
)+
) => {
specialize_requests!{
fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
}
specialize_requests!{
$(
fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
)+
}
};
}
// The concrete request helpers. Every line below becomes a `pub async fn` that takes the
// relay parent hash first, then the listed parameters, then the sender, and returns a
// `RuntimeApiReceiver` for the listed return type. The trailing identifier names the
// `RuntimeApiRequest` variant that is sent.
specialize_requests! {
fn request_runtime_api_version() -> u32; Version;
fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
fn request_validators() -> Vec<ValidatorId>; Validators;
fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
-> Option<ValidationCodeHash>; ValidationCodeHash;
fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashes;
fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
}
pub async fn executor_params_at_relay_parent(
relay_parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<ExecutorParams, Error> {
match request_session_index_for_child(relay_parent, sender).await.await {
Err(err) => {
Err(Error::Oneshot(err))
},
Ok(Err(err)) => {
Err(Error::RuntimeApi(err))
},
Ok(Ok(session_index)) => {
match request_session_executor_params(relay_parent, session_index, sender).await.await {
Err(err) => {
Err(Error::Oneshot(err))
},
Ok(Err(RuntimeApiError::NotSupported { .. })) => {
Ok(ExecutorParams::default())
},
Ok(Err(err)) => {
Err(Error::RuntimeApi(err))
},
Ok(Ok(None)) => {
Err(Error::DataNotAvailable)
},
Ok(Ok(Some(executor_params))) => Ok(executor_params),
}
},
}
}
/// Returns the first key in `validators` that the keystore holds, or `None` if it holds none.
///
/// Same lookup as [`signing_key_and_index`], with the index discarded.
pub fn signing_key<'a>(
    validators: impl IntoIterator<Item = &'a ValidatorId>,
    keystore: &KeystorePtr,
) -> Option<ValidatorId> {
    let (key, _index) = signing_key_and_index(validators, keystore)?;
    Some(key)
}
/// Returns the first key in `validators` that the keystore holds, together with its position
/// in the iteration order as a `ValidatorIndex`. Returns `None` if no key is held.
///
/// The position is converted from `usize` with an `as` cast.
pub fn signing_key_and_index<'a>(
    validators: impl IntoIterator<Item = &'a ValidatorId>,
    keystore: &KeystorePtr,
) -> Option<(ValidatorId, ValidatorIndex)> {
    validators
        .into_iter()
        .enumerate()
        .find(|(_, candidate)| keystore.has_keys(&[(candidate.to_raw_vec(), ValidatorId::ID)]))
        .map(|(position, candidate)| (candidate.clone(), ValidatorIndex(position as _)))
}
/// Signs `data` with the validator key `key` through the keystore's `sr25519_sign`.
///
/// Keystore errors are passed through. An `Ok(None)` from the keystore is passed through as
/// well (presumably the keystore does not hold the key pair -- confirm against the
/// `sp_keystore` documentation).
pub fn sign(
    keystore: &KeystorePtr,
    key: &ValidatorId,
    data: &[u8],
) -> Result<Option<ValidatorSignature>, KeystoreError> {
    let maybe_raw_signature = keystore.sr25519_sign(ValidatorId::ID, key.as_ref(), data)?;
    Ok(maybe_raw_signature.map(Into::into))
}
/// Returns the index of the first group in `groups` that contains the validator `index`,
/// or `None` if the validator is in no group.
pub fn find_validator_group(
    groups: &[Vec<ValidatorIndex>],
    index: ValidatorIndex,
) -> Option<GroupIndex> {
    groups
        .iter()
        .position(|group| group.contains(&index))
        .map(|position| GroupIndex(position as _))
}
/// Same as [`choose_random_subset_with_rng`], using the thread-local random number generator.
pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
    let mut rng = rand::thread_rng();
    choose_random_subset_with_rng(is_priority, v, &mut rng, min)
}
/// Shrinks `v` in place to the priority elements plus, if needed, random others.
///
/// `v` is first partitioned so that the elements for which `is_priority` returns `true` come
/// first.
///
/// * If there are at least `min` priority elements, or there are no other elements, only the
///   priority elements are kept.
/// * Otherwise the non-priority tail is shuffled with `rng` and `v` is truncated to `min`
///   elements.
pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
    is_priority: F,
    v: &mut Vec<T>,
    rng: &mut R,
    min: usize,
) {
    use rand::seq::SliceRandom as _;

    // Index of the first non-priority element after partitioning.
    let priority_count = itertools::partition(v.iter_mut(), is_priority);
    let needs_filling = priority_count < min && priority_count < v.len();

    if needs_filling {
        v[priority_count..].shuffle(rng);
        v.truncate(min);
    } else {
        v.truncate(priority_count);
    }
}
/// Same as [`gen_ratio_rng`], using the thread-local random number generator.
pub fn gen_ratio(a: usize, b: usize) -> bool {
    let mut rng = rand::thread_rng();
    gen_ratio_rng(a, b, &mut rng)
}
/// Forwards to `rand::Rng::gen_ratio` with `a` as numerator and `b` as denominator.
///
/// Both values are narrowed from `usize` to `u32` with `as` casts, so values above
/// `u32::MAX` are truncated. See the `rand` documentation of `gen_ratio` for the returned
/// probability and for the conditions under which it panics.
pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
    let numerator = a as u32;
    let denominator = b as u32;
    rng.gen_ratio(numerator, denominator)
}
/// The local node's validator identity in the context of one relay parent.
///
/// Built by `Validator::new` or `Validator::construct`; both fail with
/// `Error::NotAValidator` when the keystore holds none of the validator keys.
#[derive(Debug)]
pub struct Validator {
// Session index and parent hash used when signing payloads.
signing_context: SigningContext,
// The validator key found in the keystore.
key: ValidatorId,
// Position of `key` in the validator list.
index: ValidatorIndex,
// Whether `index` appears in the list of disabled validators.
disabled: bool,
}
impl Validator {
pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
where
S: SubsystemSender<RuntimeApiMessage>,
{
let (validators, session_index) = futures::try_join!(
request_validators(parent, sender).await,
request_session_index_for_child(parent, sender).await,
)?;
let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
let validators = validators?;
let disabled_validators = get_disabled_validators_with_fallback(sender, parent)
.await
.map_err(|e| Error::try_from(e).expect("the conversion is infallible; qed"))?;
Self::construct(&validators, &disabled_validators, signing_context, keystore)
}
pub fn construct(
validators: &[ValidatorId],
disabled_validators: &[ValidatorIndex],
signing_context: SigningContext,
keystore: KeystorePtr,
) -> Result<Self, Error> {
let (key, index) =
signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
Ok(Validator { signing_context, key, index, disabled })
}
pub fn id(&self) -> ValidatorId {
self.key.clone()
}
pub fn index(&self) -> ValidatorIndex {
self.index
}
pub fn disabled(&self) -> bool {
self.disabled
}
pub fn signing_context(&self) -> &SigningContext {
&self.signing_context
}
pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
&self,
keystore: KeystorePtr,
payload: Payload,
) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
}
}