1#![warn(missing_docs)]
27
28pub use overseer::{
29 gen::{OrchestraError as OverseerError, Timeout},
30 Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33 errors::{RuntimeApiError, SubsystemError},
34 messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35 overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44 async_backing::{BackingState, Constraints},
45 slashing,
46 vstaging::RelayParentInfo,
47 AsyncBackingParams, AuthorityDiscoveryId, BlockNumber, CandidateEvent, CandidateHash,
48 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, EncodeAs,
49 ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures,
50 OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex,
51 SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId,
52 ValidatorIndex, ValidatorSignature,
53};
54pub use rand;
55use sp_application_crypto::AppCrypto;
56use sp_core::ByteArray;
57use sp_keystore::{Error as KeystoreError, KeystorePtr};
58use std::{
59 collections::{BTreeMap, VecDeque},
60 time::Duration,
61};
62use thiserror::Error;
63
64pub use determine_new_blocks::determine_new_blocks;
65pub use metered;
66pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
67
68pub mod reexports {
71 pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
72}
73
74pub mod availability_chunks;
76pub mod backing_implicit_view;
80pub mod database;
82pub mod inclusion_emulator;
85pub mod runtime;
87
88pub mod vstaging;
90
91pub mod nesting_sender;
96
97pub mod reputation;
98
99mod determine_new_blocks;
100
101mod controlled_validator_indices;
102pub use controlled_validator_indices::ControlledValidatorIndices;
103
104#[cfg(test)]
105mod tests;
106
/// Log target passed to the `gum` logging macros used in this module.
// `const` items are implicitly `'static`, so the lifetime is not spelled out.
const LOG_TARGET: &str = "parachain::subsystem-util";

/// A fixed duration of one second.
///
/// NOTE(review): the name suggests this is the grace period a job is given after being asked
/// to stop; no use is visible in this file, so confirm against the callers.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);

/// A fixed capacity of 64.
///
/// NOTE(review): the name suggests this bounds the channels used to talk to jobs; no use is
/// visible in this file, so confirm against the callers.
pub const JOB_CHANNEL_CAPACITY: usize = 64;
113
114#[derive(Debug, Error)]
116pub enum Error {
117 #[error(transparent)]
119 Oneshot(#[from] oneshot::Canceled),
120 #[error(transparent)]
122 Mpsc(#[from] mpsc::SendError),
123 #[error(transparent)]
125 Subsystem(#[from] SubsystemError),
126 #[error(transparent)]
128 RuntimeApi(#[from] RuntimeApiError),
129 #[error(transparent)]
131 Infallible(#[from] std::convert::Infallible),
132 #[error("AllMessage not relevant to Job")]
134 SenderConversion(String),
135 #[error("Node is not a validator")]
137 NotAValidator,
138 #[error("AlreadyForwarding")]
140 AlreadyForwarding,
141 #[error("Data are not available")]
143 DataNotAvailable,
144}
145
146impl From<OverseerError> for Error {
147 fn from(e: OverseerError) -> Self {
148 Self::from(SubsystemError::from(e))
149 }
150}
151
152impl TryFrom<crate::runtime::Error> for Error {
153 type Error = ();
154
155 fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
156 use crate::runtime::Error;
157
158 match e {
159 Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
160 Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
161 Error::NoSuchSession(_) => Err(()),
162 }
163 }
164}
165
166pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
168
169pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
171 parent: Hash,
172 sender: &mut Sender,
173 request_builder: RequestBuilder,
174) -> RuntimeApiReceiver<Response>
175where
176 RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
177 Sender: SubsystemSender<RuntimeApiMessage>,
178{
179 let (tx, rx) = oneshot::channel();
180
181 sender
182 .send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
183 .await;
184
185 rx
186}
187
188pub async fn has_required_runtime<Sender>(
191 sender: &mut Sender,
192 relay_parent: Hash,
193 required_runtime_version: u32,
194) -> bool
195where
196 Sender: SubsystemSender<RuntimeApiMessage>,
197{
198 gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
199
200 let (tx, rx) = oneshot::channel();
201 sender
202 .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
203 .await;
204
205 match rx.await {
206 Result::Ok(Ok(runtime_version)) => {
207 gum::trace!(
208 target: LOG_TARGET,
209 ?relay_parent,
210 ?runtime_version,
211 ?required_runtime_version,
212 "Fetched ParachainHost runtime api version"
213 );
214 runtime_version >= required_runtime_version
215 },
216 Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
217 gum::trace!(
218 target: LOG_TARGET,
219 ?relay_parent,
220 ?error,
221 "Execution error while fetching ParachainHost runtime api version"
222 );
223 false
224 },
225 Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
226 gum::trace!(
227 target: LOG_TARGET,
228 ?relay_parent,
229 "NotSupported error while fetching ParachainHost runtime api version"
230 );
231 false
232 },
233 Result::Err(_) => {
234 gum::trace!(
235 target: LOG_TARGET,
236 ?relay_parent,
237 "Cancelled error while fetching ParachainHost runtime api version"
238 );
239 false
240 },
241 }
242}
243
// Generates one public `request_*` function per listed runtime API request.
//
// Each entry has the form `fn name(params...) -> ReturnType; RequestVariant;`. The generated
// function takes `parent: Hash`, the listed parameters and a `sender`, forwards to
// `request_from_runtime` with `RuntimeApiRequest::RequestVariant(params..., tx)` and returns a
// `RuntimeApiReceiver<ReturnType>`.
macro_rules! specialize_requests {
	// Leaf arm: emit a single function whose documentation mentions `$doc_name`.
	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		#[doc = "Request `"]
		#[doc = $doc_name]
		#[doc = "` from the runtime"]
		pub async fn $func_name (
			parent: Hash,
			$(
				$param_name: $param_ty,
			)*
			sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
		) -> RuntimeApiReceiver<$return_ty>
		{
			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
				$( $param_name, )* tx
			)).await
		}
	};

	// Entry arm: one or more entries, each forwarded to the leaf arm with the request
	// variant's name used as the documented name.
	(
		$(
			fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
		)+
	) => {
		$(
			specialize_requests!{
				named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
			}
		)+
	};
}
291
292specialize_requests! {
293 fn request_runtime_api_version() -> u32; Version;
294 fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
295 fn request_validators() -> Vec<ValidatorId>; Validators;
296 fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
297 fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
298 fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
299 fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
300 fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
301 fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
302 fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
303 fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
304 fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
305 fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
306 fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
307 fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
308 -> Option<ValidationCodeHash>; ValidationCodeHash;
309 fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
310 fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
311 fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::LegacyPendingSlashes)>; UnappliedSlashes;
312 fn request_unapplied_slashes_v2() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashesV2;
313 fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
314 fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
315 fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
316 fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
317 fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
318 fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
319 fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
320 fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
321 fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
322 fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;
323
324}
325
/// Outcome of [`check_relay_parent_session`].
pub enum CheckRelayParentSessionResult {
	/// The relay parent was confirmed to belong to the expected session.
	Valid,
	/// The runtime answered, but did not confirm the relay parent for the expected session
	/// (a different child session index, or no ancestor info).
	NotFound,
	/// The runtime does not support the `AncestorRelayParentInfo` request.
	NotSupported,
	/// The request failed or its response channel was cancelled; the payload is a
	/// human-readable description.
	RuntimeError(String),
}
339
340pub async fn check_relay_parent_session(
349 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
350 query_at: Hash,
351 session_index: SessionIndex,
352 relay_parent: Hash,
353) -> CheckRelayParentSessionResult {
354 if query_at == relay_parent {
355 return match request_session_index_for_child(relay_parent, sender).await.await {
358 Ok(Ok(session)) if session == session_index => CheckRelayParentSessionResult::Valid,
359 Ok(Ok(_)) => CheckRelayParentSessionResult::NotFound,
360 Ok(Err(err)) => CheckRelayParentSessionResult::RuntimeError(format!(
361 "SessionIndexForChild error: {err}"
362 )),
363 Err(_) => CheckRelayParentSessionResult::RuntimeError(
364 "SessionIndexForChild request cancelled".into(),
365 ),
366 };
367 }
368
369 match request_from_runtime(query_at, sender, |tx| {
371 RuntimeApiRequest::AncestorRelayParentInfo(session_index, relay_parent, tx)
372 })
373 .await
374 .await
375 {
376 Ok(Ok(Some(_))) => CheckRelayParentSessionResult::Valid,
377 Ok(Ok(None)) => CheckRelayParentSessionResult::NotFound,
378 Ok(Err(RuntimeApiError::NotSupported { .. })) => {
379 CheckRelayParentSessionResult::NotSupported
380 },
381 Ok(Err(err)) => CheckRelayParentSessionResult::RuntimeError(format!(
382 "AncestorRelayParentInfo error: {err}"
383 )),
384 Err(_) => CheckRelayParentSessionResult::RuntimeError(
385 "AncestorRelayParentInfo request cancelled".into(),
386 ),
387 }
388}
389
390pub async fn fetch_relay_parent_info<Sender>(
399 sender: &mut Sender,
400 query_at: Hash,
401 session_index: SessionIndex,
402 relay_parent: Hash,
403) -> Result<Option<RelayParentInfo<Hash, BlockNumber>>, runtime::Error>
404where
405 Sender: SubsystemSender<RuntimeApiMessage> + SubsystemSender<ChainApiMessage>,
406{
407 if query_at == relay_parent {
408 return get_scheduling_parent_info(sender, session_index, relay_parent).await;
412 }
413
414 match request_from_runtime(query_at, sender, |tx| {
416 RuntimeApiRequest::AncestorRelayParentInfo(session_index, relay_parent, tx)
417 })
418 .await
419 .await?
420 {
421 Ok(info) => Ok(info),
422 Err(RuntimeApiError::NotSupported { .. }) => {
423 get_scheduling_parent_info(sender, session_index, relay_parent).await
426 },
427 Err(err) => Err(runtime::Error::RuntimeRequest(err)),
428 }
429}
430
431async fn get_scheduling_parent_info<Sender>(
432 sender: &mut Sender,
433 session_index: SessionIndex,
434 hash: Hash,
435) -> Result<Option<RelayParentInfo<Hash, BlockNumber>>, runtime::Error>
436where
437 Sender: SubsystemSender<RuntimeApiMessage> + SubsystemSender<ChainApiMessage>,
438{
439 let session_ok = request_session_index_for_child(hash, sender).await.await?? == session_index;
440 if !session_ok {
441 return Ok(None);
442 }
443
444 let (tx, rx) = oneshot::channel();
445 sender.send_message(ChainApiMessage::BlockHeader(hash, tx)).await;
446 match rx.await? {
447 Ok(Some(header)) => {
448 Ok(Some(RelayParentInfo { number: header.number, state_root: header.state_root }))
449 },
450 _ => Ok(None),
451 }
452}
453
454pub async fn executor_params_at_relay_parent(
464 relay_parent: Hash,
465 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
466) -> Result<ExecutorParams, Error> {
467 match request_session_index_for_child(relay_parent, sender).await.await {
468 Err(err) => {
469 Err(Error::Oneshot(err))
471 },
472 Ok(Err(err)) => {
473 Err(Error::RuntimeApi(err))
475 },
476 Ok(Ok(session_index)) => {
477 match request_session_executor_params(relay_parent, session_index, sender).await.await {
478 Err(err) => {
479 Err(Error::Oneshot(err))
481 },
482 Ok(Err(RuntimeApiError::NotSupported { .. })) => {
483 Ok(ExecutorParams::default())
486 },
487 Ok(Err(err)) => {
488 Err(Error::RuntimeApi(err))
490 },
491 Ok(Ok(None)) => {
492 Err(Error::DataNotAvailable)
495 },
496 Ok(Ok(Some(executor_params))) => Ok(executor_params),
497 }
498 },
499 }
500}
501
502pub fn signing_key<'a>(
504 validators: impl IntoIterator<Item = &'a ValidatorId>,
505 keystore: &KeystorePtr,
506) -> Option<ValidatorId> {
507 signing_key_and_index(validators, keystore).map(|(k, _)| k)
508}
509
510pub fn signing_key_and_index<'a>(
513 validators: impl IntoIterator<Item = &'a ValidatorId>,
514 keystore: &KeystorePtr,
515) -> Option<(ValidatorId, ValidatorIndex)> {
516 for (i, v) in validators.into_iter().enumerate() {
517 if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
518 return Some((v.clone(), ValidatorIndex(i as _)));
519 }
520 }
521 None
522}
523
524pub fn sign(
529 keystore: &KeystorePtr,
530 key: &ValidatorId,
531 data: &[u8],
532) -> Result<Option<ValidatorSignature>, KeystoreError> {
533 let signature = keystore
534 .sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
535 .map(|sig| sig.into());
536 Ok(signature)
537}
538
539pub fn find_validator_group(
541 groups: &[Vec<ValidatorIndex>],
542 index: ValidatorIndex,
543) -> Option<GroupIndex> {
544 groups.iter().enumerate().find_map(|(i, g)| {
545 if g.contains(&index) {
546 Some(GroupIndex(i as _))
547 } else {
548 None
549 }
550 })
551}
552
553pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
556 choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
557}
558
559pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
562 is_priority: F,
563 v: &mut Vec<T>,
564 rng: &mut R,
565 min: usize,
566) {
567 use rand::seq::SliceRandom as _;
568
569 let i = itertools::partition(v.iter_mut(), is_priority);
572
573 if i >= min || v.len() <= i {
574 v.truncate(i);
575 return;
576 }
577
578 v[i..].shuffle(rng);
579
580 v.truncate(min);
581}
582
583pub fn gen_ratio(a: usize, b: usize) -> bool {
585 gen_ratio_rng(a, b, &mut rand::thread_rng())
586}
587
588pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
590 rng.gen_ratio(a as u32, b as u32)
591}
592
593#[derive(Debug)]
598pub struct Validator {
599 signing_context: SigningContext,
600 key: ValidatorId,
601 index: ValidatorIndex,
602 disabled: bool,
603}
604
605impl Validator {
606 pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
609 where
610 S: SubsystemSender<RuntimeApiMessage>,
611 {
612 let (validators, disabled_validators, session_index) = futures::try_join!(
616 request_validators(parent, sender).await,
617 request_disabled_validators(parent, sender).await,
618 request_session_index_for_child(parent, sender).await,
619 )?;
620
621 let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
622
623 let validators = validators?;
624
625 let disabled_validators = disabled_validators?;
626
627 Self::construct(&validators, &disabled_validators, signing_context, keystore)
628 }
629
630 pub fn construct(
634 validators: &[ValidatorId],
635 disabled_validators: &[ValidatorIndex],
636 signing_context: SigningContext,
637 keystore: KeystorePtr,
638 ) -> Result<Self, Error> {
639 let (key, index) =
640 signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
641
642 let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
643
644 Ok(Validator { signing_context, key, index, disabled })
645 }
646
647 pub fn id(&self) -> ValidatorId {
649 self.key.clone()
650 }
651
652 pub fn index(&self) -> ValidatorIndex {
654 self.index
655 }
656
657 pub fn disabled(&self) -> bool {
659 self.disabled
660 }
661
662 pub fn signing_context(&self) -> &SigningContext {
664 &self.signing_context
665 }
666
667 pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
669 &self,
670 keystore: KeystorePtr,
671 payload: Payload,
672 ) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
673 Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
674 }
675}