1#![warn(missing_docs)]
27
28pub use overseer::{
29 gen::{OrchestraError as OverseerError, Timeout},
30 Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33 errors::{RuntimeApiError, SubsystemError},
34 messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35 overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44 async_backing::{BackingState, Constraints},
45 slashing,
46 vstaging::RelayParentInfo,
47 ApprovalVotingParams, AsyncBackingParams, AuthorityDiscoveryId, BlockNumber, CandidateEvent,
48 CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState,
49 EncodeAs, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures,
50 OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex,
51 SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId,
52 ValidatorIndex, ValidatorSignature,
53};
54pub use rand;
55use sp_application_crypto::AppCrypto;
56use sp_core::ByteArray;
57use sp_keystore::{Error as KeystoreError, KeystorePtr};
58use std::{
59 collections::{BTreeMap, VecDeque},
60 time::Duration,
61};
62use thiserror::Error;
63
64pub use determine_new_blocks::determine_new_blocks;
65pub use metered;
66pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
67
68pub mod reexports {
71 pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
72}
73
74pub mod availability_chunks;
76pub mod backing_implicit_view;
80pub mod database;
82pub mod inclusion_emulator;
85pub mod runtime;
87
88pub mod vstaging;
90
91pub mod nesting_sender;
96
97pub mod reputation;
98
99mod determine_new_blocks;
100
101mod controlled_validator_indices;
102pub use controlled_validator_indices::ControlledValidatorIndices;
103
104#[cfg(test)]
105mod tests;
106
/// Target string attached to the log statements emitted from this module.
const LOG_TARGET: &str = "parachain::subsystem-util";

/// One-second duration exported for graceful job shutdown.
// NOTE(review): the consumer is not visible in this file — presumably the time a job is given
// to stop before it is aborted; confirm against the caller.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity (64) exported for job channels.
// NOTE(review): presumably the bound of the channels to and from a job; confirm against the caller.
pub const JOB_CHANNEL_CAPACITY: usize = 64;
113
114#[derive(Debug, Error)]
116pub enum Error {
117 #[error(transparent)]
119 Oneshot(#[from] oneshot::Canceled),
120 #[error(transparent)]
122 Mpsc(#[from] mpsc::SendError),
123 #[error(transparent)]
125 Subsystem(#[from] SubsystemError),
126 #[error(transparent)]
128 RuntimeApi(#[from] RuntimeApiError),
129 #[error(transparent)]
131 Infallible(#[from] std::convert::Infallible),
132 #[error("AllMessage not relevant to Job")]
134 SenderConversion(String),
135 #[error("Node is not a validator")]
137 NotAValidator,
138 #[error("AlreadyForwarding")]
140 AlreadyForwarding,
141 #[error("Data are not available")]
143 DataNotAvailable,
144}
145
146impl From<OverseerError> for Error {
147 fn from(e: OverseerError) -> Self {
148 Self::from(SubsystemError::from(e))
149 }
150}
151
152impl TryFrom<crate::runtime::Error> for Error {
153 type Error = ();
154
155 fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
156 use crate::runtime::Error;
157
158 match e {
159 Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
160 Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
161 Error::NoSuchSession(_) => Err(()),
162 }
163 }
164}
165
166pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
168
169pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
171 parent: Hash,
172 sender: &mut Sender,
173 request_builder: RequestBuilder,
174) -> RuntimeApiReceiver<Response>
175where
176 RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
177 Sender: SubsystemSender<RuntimeApiMessage>,
178{
179 let (tx, rx) = oneshot::channel();
180
181 sender
182 .send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
183 .await;
184
185 rx
186}
187
188pub async fn has_required_runtime<Sender>(
191 sender: &mut Sender,
192 relay_parent: Hash,
193 required_runtime_version: u32,
194) -> bool
195where
196 Sender: SubsystemSender<RuntimeApiMessage>,
197{
198 gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
199
200 let (tx, rx) = oneshot::channel();
201 sender
202 .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
203 .await;
204
205 match rx.await {
206 Result::Ok(Ok(runtime_version)) => {
207 gum::trace!(
208 target: LOG_TARGET,
209 ?relay_parent,
210 ?runtime_version,
211 ?required_runtime_version,
212 "Fetched ParachainHost runtime api version"
213 );
214 runtime_version >= required_runtime_version
215 },
216 Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
217 gum::trace!(
218 target: LOG_TARGET,
219 ?relay_parent,
220 ?error,
221 "Execution error while fetching ParachainHost runtime api version"
222 );
223 false
224 },
225 Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
226 gum::trace!(
227 target: LOG_TARGET,
228 ?relay_parent,
229 "NotSupported error while fetching ParachainHost runtime api version"
230 );
231 false
232 },
233 Result::Err(_) => {
234 gum::trace!(
235 target: LOG_TARGET,
236 ?relay_parent,
237 "Cancelled error while fetching ParachainHost runtime api version"
238 );
239 false
240 },
241 }
242}
243
/// Generates `request_*` helper functions, one per listed runtime API request.
///
/// Each entry has the shape `fn name(arg: Ty, ...) -> Ret; Variant;` and expands to
/// `pub async fn name(parent: Hash, arg: Ty, ..., sender) -> RuntimeApiReceiver<Ret>`, which
/// forwards to `request_from_runtime` with `RuntimeApiRequest::Variant(arg, ..., tx)`.
macro_rules! specialize_requests {
    // Emit exactly one function; `$doc_name` is spliced into its generated rustdoc.
    (named $doc_name:expr ; fn $fn_name:ident( $( $arg:ident : $arg_ty:ty ),* ) -> $ret:ty ; $variant:ident;) => {
        #[doc = "Request `"]
        #[doc = $doc_name]
        #[doc = "` from the runtime"]
        pub async fn $fn_name (
            parent: Hash,
            $(
                $arg: $arg_ty,
            )*
            sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
        ) -> RuntimeApiReceiver<$ret>
        {
            request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$variant(
                $( $arg, )* tx
            )).await
        }
    };

    // One or more entries: each is forwarded to the `named` arm, using the stringified request
    // variant as the documented name.
    (
        $(
            fn $fn_name:ident( $( $arg:ident : $arg_ty:ty ),* ) -> $ret:ty ; $variant:ident;
        )+
    ) => {
        $(
            specialize_requests!{
                named stringify!($variant) ; fn $fn_name( $( $arg : $arg_ty ),* ) -> $ret ; $variant;
            }
        )+
    };
}
291
292specialize_requests! {
293 fn request_runtime_api_version() -> u32; Version;
294 fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
295 fn request_validators() -> Vec<ValidatorId>; Validators;
296 fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
297 fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
298 fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
299 fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
300 fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
301 fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
302 fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
303 fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
304 fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
305 fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
306 fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
307 fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
308 -> Option<ValidationCodeHash>; ValidationCodeHash;
309 fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
310 fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
311 fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::LegacyPendingSlashes)>; UnappliedSlashes;
312 fn request_unapplied_slashes_v2() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashesV2;
313 fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
314 fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
315 fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
316 fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
317 fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
318 fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
319 fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
320 fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
321 fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
322 fn request_approval_voting_params(session_index: SessionIndex) -> ApprovalVotingParams; ApprovalVotingParams;
323 fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;
324
325}
326
/// Outcome of checking whether a relay parent belongs to a given session.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so results can be logged and compared in
/// assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CheckRelayParentSessionResult {
    /// The runtime confirmed the relay parent for the requested session.
    Valid,
    /// The runtime answered, but the relay parent did not match the requested session.
    NotFound,
    /// The runtime reported the required API as not supported.
    NotSupported,
    /// The runtime request failed or was cancelled; the payload is a human-readable
    /// description of what went wrong.
    RuntimeError(String),
}
340
341pub async fn check_relay_parent_session(
350 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
351 query_at: Hash,
352 session_index: SessionIndex,
353 relay_parent: Hash,
354) -> CheckRelayParentSessionResult {
355 if query_at == relay_parent {
356 return match request_session_index_for_child(relay_parent, sender).await.await {
359 Ok(Ok(session)) if session == session_index => CheckRelayParentSessionResult::Valid,
360 Ok(Ok(_)) => CheckRelayParentSessionResult::NotFound,
361 Ok(Err(err)) => CheckRelayParentSessionResult::RuntimeError(format!(
362 "SessionIndexForChild error: {err}"
363 )),
364 Err(_) => CheckRelayParentSessionResult::RuntimeError(
365 "SessionIndexForChild request cancelled".into(),
366 ),
367 };
368 }
369
370 match request_from_runtime(query_at, sender, |tx| {
372 RuntimeApiRequest::AncestorRelayParentInfo(session_index, relay_parent, tx)
373 })
374 .await
375 .await
376 {
377 Ok(Ok(Some(_))) => CheckRelayParentSessionResult::Valid,
378 Ok(Ok(None)) => CheckRelayParentSessionResult::NotFound,
379 Ok(Err(RuntimeApiError::NotSupported { .. })) => {
380 CheckRelayParentSessionResult::NotSupported
381 },
382 Ok(Err(err)) => CheckRelayParentSessionResult::RuntimeError(format!(
383 "AncestorRelayParentInfo error: {err}"
384 )),
385 Err(_) => CheckRelayParentSessionResult::RuntimeError(
386 "AncestorRelayParentInfo request cancelled".into(),
387 ),
388 }
389}
390
391pub async fn fetch_relay_parent_info<Sender>(
400 sender: &mut Sender,
401 query_at: Hash,
402 session_index: SessionIndex,
403 relay_parent: Hash,
404) -> Result<Option<RelayParentInfo<Hash, BlockNumber>>, runtime::Error>
405where
406 Sender: SubsystemSender<RuntimeApiMessage> + SubsystemSender<ChainApiMessage>,
407{
408 if query_at == relay_parent {
409 return get_scheduling_parent_info(sender, session_index, relay_parent).await;
413 }
414
415 match request_from_runtime(query_at, sender, |tx| {
417 RuntimeApiRequest::AncestorRelayParentInfo(session_index, relay_parent, tx)
418 })
419 .await
420 .await?
421 {
422 Ok(info) => Ok(info),
423 Err(RuntimeApiError::NotSupported { .. }) => {
424 get_scheduling_parent_info(sender, session_index, relay_parent).await
427 },
428 Err(err) => Err(runtime::Error::RuntimeRequest(err)),
429 }
430}
431
432async fn get_scheduling_parent_info<Sender>(
433 sender: &mut Sender,
434 session_index: SessionIndex,
435 hash: Hash,
436) -> Result<Option<RelayParentInfo<Hash, BlockNumber>>, runtime::Error>
437where
438 Sender: SubsystemSender<RuntimeApiMessage> + SubsystemSender<ChainApiMessage>,
439{
440 let session_ok = request_session_index_for_child(hash, sender).await.await?? == session_index;
441 if !session_ok {
442 return Ok(None);
443 }
444
445 let (tx, rx) = oneshot::channel();
446 sender.send_message(ChainApiMessage::BlockHeader(hash, tx)).await;
447 match rx.await? {
448 Ok(Some(header)) => {
449 Ok(Some(RelayParentInfo { number: header.number, state_root: header.state_root }))
450 },
451 _ => Ok(None),
452 }
453}
454
455pub async fn executor_params_at_relay_parent(
465 relay_parent: Hash,
466 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
467) -> Result<ExecutorParams, Error> {
468 match request_session_index_for_child(relay_parent, sender).await.await {
469 Err(err) => {
470 Err(Error::Oneshot(err))
472 },
473 Ok(Err(err)) => {
474 Err(Error::RuntimeApi(err))
476 },
477 Ok(Ok(session_index)) => {
478 match request_session_executor_params(relay_parent, session_index, sender).await.await {
479 Err(err) => {
480 Err(Error::Oneshot(err))
482 },
483 Ok(Err(RuntimeApiError::NotSupported { .. })) => {
484 Ok(ExecutorParams::default())
487 },
488 Ok(Err(err)) => {
489 Err(Error::RuntimeApi(err))
491 },
492 Ok(Ok(None)) => {
493 Err(Error::DataNotAvailable)
496 },
497 Ok(Ok(Some(executor_params))) => Ok(executor_params),
498 }
499 },
500 }
501}
502
503pub fn signing_key<'a>(
505 validators: impl IntoIterator<Item = &'a ValidatorId>,
506 keystore: &KeystorePtr,
507) -> Option<ValidatorId> {
508 signing_key_and_index(validators, keystore).map(|(k, _)| k)
509}
510
511pub fn signing_key_and_index<'a>(
514 validators: impl IntoIterator<Item = &'a ValidatorId>,
515 keystore: &KeystorePtr,
516) -> Option<(ValidatorId, ValidatorIndex)> {
517 for (i, v) in validators.into_iter().enumerate() {
518 if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
519 return Some((v.clone(), ValidatorIndex(i as _)));
520 }
521 }
522 None
523}
524
525pub fn sign(
530 keystore: &KeystorePtr,
531 key: &ValidatorId,
532 data: &[u8],
533) -> Result<Option<ValidatorSignature>, KeystoreError> {
534 let signature = keystore
535 .sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
536 .map(|sig| sig.into());
537 Ok(signature)
538}
539
540pub fn find_validator_group(
542 groups: &[Vec<ValidatorIndex>],
543 index: ValidatorIndex,
544) -> Option<GroupIndex> {
545 groups.iter().enumerate().find_map(|(i, g)| {
546 if g.contains(&index) {
547 Some(GroupIndex(i as _))
548 } else {
549 None
550 }
551 })
552}
553
554pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
557 choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
558}
559
560pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
563 is_priority: F,
564 v: &mut Vec<T>,
565 rng: &mut R,
566 min: usize,
567) {
568 use rand::seq::SliceRandom as _;
569
570 let i = itertools::partition(v.iter_mut(), is_priority);
573
574 if i >= min || v.len() <= i {
575 v.truncate(i);
576 return;
577 }
578
579 v[i..].shuffle(rng);
580
581 v.truncate(min);
582}
583
584pub fn gen_ratio(a: usize, b: usize) -> bool {
586 gen_ratio_rng(a, b, &mut rand::thread_rng())
587}
588
589pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
591 rng.gen_ratio(a as u32, b as u32)
592}
593
594#[derive(Debug)]
599pub struct Validator {
600 signing_context: SigningContext,
601 key: ValidatorId,
602 index: ValidatorIndex,
603 disabled: bool,
604}
605
606impl Validator {
607 pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
610 where
611 S: SubsystemSender<RuntimeApiMessage>,
612 {
613 let (validators, disabled_validators, session_index) = futures::try_join!(
617 request_validators(parent, sender).await,
618 request_disabled_validators(parent, sender).await,
619 request_session_index_for_child(parent, sender).await,
620 )?;
621
622 let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
623
624 let validators = validators?;
625
626 let disabled_validators = disabled_validators?;
627
628 Self::construct(&validators, &disabled_validators, signing_context, keystore)
629 }
630
631 pub fn construct(
635 validators: &[ValidatorId],
636 disabled_validators: &[ValidatorIndex],
637 signing_context: SigningContext,
638 keystore: KeystorePtr,
639 ) -> Result<Self, Error> {
640 let (key, index) =
641 signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
642
643 let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
644
645 Ok(Validator { signing_context, key, index, disabled })
646 }
647
648 pub fn id(&self) -> ValidatorId {
650 self.key.clone()
651 }
652
653 pub fn index(&self) -> ValidatorIndex {
655 self.index
656 }
657
658 pub fn disabled(&self) -> bool {
660 self.disabled
661 }
662
663 pub fn signing_context(&self) -> &SigningContext {
665 &self.signing_context
666 }
667
668 pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
670 &self,
671 keystore: KeystorePtr,
672 payload: Payload,
673 ) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
674 Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
675 }
676}