1#![warn(missing_docs)]
27
28pub use overseer::{
29 gen::{OrchestraError as OverseerError, Timeout},
30 Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33 errors::{RuntimeApiError, SubsystemError},
34 messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35 overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44 async_backing::{BackingState, Constraints},
45 slashing, AsyncBackingParams, AuthorityDiscoveryId, CandidateEvent, CandidateHash,
46 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, EncodeAs,
47 ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures,
48 OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex,
49 SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId,
50 ValidatorIndex, ValidatorSignature,
51};
52pub use rand;
53use sp_application_crypto::AppCrypto;
54use sp_core::ByteArray;
55use sp_keystore::{Error as KeystoreError, KeystorePtr};
56use std::{
57 collections::{BTreeMap, VecDeque},
58 time::Duration,
59};
60use thiserror::Error;
61
62pub use determine_new_blocks::determine_new_blocks;
63pub use metered;
64pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
65
66pub mod reexports {
69 pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
70}
71
72pub mod availability_chunks;
74pub mod backing_implicit_view;
78pub mod database;
80pub mod inclusion_emulator;
83pub mod runtime;
85
86pub mod vstaging;
88
89pub mod nesting_sender;
94
95pub mod reputation;
96
97mod determine_new_blocks;
98
99mod controlled_validator_indices;
100pub use controlled_validator_indices::ControlledValidatorIndices;
101
102#[cfg(test)]
103mod tests;
104
105const LOG_TARGET: &'static str = "parachain::subsystem-util";
106
107pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
109pub const JOB_CHANNEL_CAPACITY: usize = 64;
111
112#[derive(Debug, Error)]
114pub enum Error {
115 #[error(transparent)]
117 Oneshot(#[from] oneshot::Canceled),
118 #[error(transparent)]
120 Mpsc(#[from] mpsc::SendError),
121 #[error(transparent)]
123 Subsystem(#[from] SubsystemError),
124 #[error(transparent)]
126 RuntimeApi(#[from] RuntimeApiError),
127 #[error(transparent)]
129 Infallible(#[from] std::convert::Infallible),
130 #[error("AllMessage not relevant to Job")]
132 SenderConversion(String),
133 #[error("Node is not a validator")]
135 NotAValidator,
136 #[error("AlreadyForwarding")]
138 AlreadyForwarding,
139 #[error("Data are not available")]
141 DataNotAvailable,
142}
143
144impl From<OverseerError> for Error {
145 fn from(e: OverseerError) -> Self {
146 Self::from(SubsystemError::from(e))
147 }
148}
149
150impl TryFrom<crate::runtime::Error> for Error {
151 type Error = ();
152
153 fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
154 use crate::runtime::Error;
155
156 match e {
157 Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
158 Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
159 Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()),
160 }
161 }
162}
163
164pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
166
167pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
169 parent: Hash,
170 sender: &mut Sender,
171 request_builder: RequestBuilder,
172) -> RuntimeApiReceiver<Response>
173where
174 RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
175 Sender: SubsystemSender<RuntimeApiMessage>,
176{
177 let (tx, rx) = oneshot::channel();
178
179 sender
180 .send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
181 .await;
182
183 rx
184}
185
186pub async fn has_required_runtime<Sender>(
189 sender: &mut Sender,
190 relay_parent: Hash,
191 required_runtime_version: u32,
192) -> bool
193where
194 Sender: SubsystemSender<RuntimeApiMessage>,
195{
196 gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
197
198 let (tx, rx) = oneshot::channel();
199 sender
200 .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
201 .await;
202
203 match rx.await {
204 Result::Ok(Ok(runtime_version)) => {
205 gum::trace!(
206 target: LOG_TARGET,
207 ?relay_parent,
208 ?runtime_version,
209 ?required_runtime_version,
210 "Fetched ParachainHost runtime api version"
211 );
212 runtime_version >= required_runtime_version
213 },
214 Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
215 gum::trace!(
216 target: LOG_TARGET,
217 ?relay_parent,
218 ?error,
219 "Execution error while fetching ParachainHost runtime api version"
220 );
221 false
222 },
223 Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
224 gum::trace!(
225 target: LOG_TARGET,
226 ?relay_parent,
227 "NotSupported error while fetching ParachainHost runtime api version"
228 );
229 false
230 },
231 Result::Err(_) => {
232 gum::trace!(
233 target: LOG_TARGET,
234 ?relay_parent,
235 "Cancelled error while fetching ParachainHost runtime api version"
236 );
237 false
238 },
239 }
240}
241
242macro_rules! specialize_requests {
246 (fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
248 specialize_requests!{
249 named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
250 }
251 };
252
253 (named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
255 #[doc = "Request `"]
256 #[doc = $doc_name]
257 #[doc = "` from the runtime"]
258 pub async fn $func_name (
259 parent: Hash,
260 $(
261 $param_name: $param_ty,
262 )*
263 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
264 ) -> RuntimeApiReceiver<$return_ty>
265 {
266 request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
267 $( $param_name, )* tx
268 )).await
269 }
270 };
271
272 (
274 fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
275 $(
276 fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
277 )+
278 ) => {
279 specialize_requests!{
280 fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
281 }
282 specialize_requests!{
283 $(
284 fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
285 )+
286 }
287 };
288}
289
290specialize_requests! {
291 fn request_runtime_api_version() -> u32; Version;
292 fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
293 fn request_validators() -> Vec<ValidatorId>; Validators;
294 fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
295 fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
296 fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
297 fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
298 fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
299 fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
300 fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
301 fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
302 fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
303 fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
304 fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
305 fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
306 -> Option<ValidationCodeHash>; ValidationCodeHash;
307 fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
308 fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
309 fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashes;
310 fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
311 fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
312 fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
313 fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
314 fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
315 fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
316 fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
317 fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
318 fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
319 fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;
320
321}
322
323pub async fn executor_params_at_relay_parent(
333 relay_parent: Hash,
334 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
335) -> Result<ExecutorParams, Error> {
336 match request_session_index_for_child(relay_parent, sender).await.await {
337 Err(err) => {
338 Err(Error::Oneshot(err))
340 },
341 Ok(Err(err)) => {
342 Err(Error::RuntimeApi(err))
344 },
345 Ok(Ok(session_index)) => {
346 match request_session_executor_params(relay_parent, session_index, sender).await.await {
347 Err(err) => {
348 Err(Error::Oneshot(err))
350 },
351 Ok(Err(RuntimeApiError::NotSupported { .. })) => {
352 Ok(ExecutorParams::default())
355 },
356 Ok(Err(err)) => {
357 Err(Error::RuntimeApi(err))
359 },
360 Ok(Ok(None)) => {
361 Err(Error::DataNotAvailable)
364 },
365 Ok(Ok(Some(executor_params))) => Ok(executor_params),
366 }
367 },
368 }
369}
370
371pub fn signing_key<'a>(
373 validators: impl IntoIterator<Item = &'a ValidatorId>,
374 keystore: &KeystorePtr,
375) -> Option<ValidatorId> {
376 signing_key_and_index(validators, keystore).map(|(k, _)| k)
377}
378
379pub fn signing_key_and_index<'a>(
382 validators: impl IntoIterator<Item = &'a ValidatorId>,
383 keystore: &KeystorePtr,
384) -> Option<(ValidatorId, ValidatorIndex)> {
385 for (i, v) in validators.into_iter().enumerate() {
386 if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
387 return Some((v.clone(), ValidatorIndex(i as _)))
388 }
389 }
390 None
391}
392
393pub fn sign(
398 keystore: &KeystorePtr,
399 key: &ValidatorId,
400 data: &[u8],
401) -> Result<Option<ValidatorSignature>, KeystoreError> {
402 let signature = keystore
403 .sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
404 .map(|sig| sig.into());
405 Ok(signature)
406}
407
408pub fn find_validator_group(
410 groups: &[Vec<ValidatorIndex>],
411 index: ValidatorIndex,
412) -> Option<GroupIndex> {
413 groups.iter().enumerate().find_map(|(i, g)| {
414 if g.contains(&index) {
415 Some(GroupIndex(i as _))
416 } else {
417 None
418 }
419 })
420}
421
422pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
425 choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
426}
427
428pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
431 is_priority: F,
432 v: &mut Vec<T>,
433 rng: &mut R,
434 min: usize,
435) {
436 use rand::seq::SliceRandom as _;
437
438 let i = itertools::partition(v.iter_mut(), is_priority);
441
442 if i >= min || v.len() <= i {
443 v.truncate(i);
444 return
445 }
446
447 v[i..].shuffle(rng);
448
449 v.truncate(min);
450}
451
452pub fn gen_ratio(a: usize, b: usize) -> bool {
454 gen_ratio_rng(a, b, &mut rand::thread_rng())
455}
456
457pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
459 rng.gen_ratio(a as u32, b as u32)
460}
461
462#[derive(Debug)]
467pub struct Validator {
468 signing_context: SigningContext,
469 key: ValidatorId,
470 index: ValidatorIndex,
471 disabled: bool,
472}
473
474impl Validator {
475 pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
478 where
479 S: SubsystemSender<RuntimeApiMessage>,
480 {
481 let (validators, disabled_validators, session_index) = futures::try_join!(
485 request_validators(parent, sender).await,
486 request_disabled_validators(parent, sender).await,
487 request_session_index_for_child(parent, sender).await,
488 )?;
489
490 let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
491
492 let validators = validators?;
493
494 let disabled_validators = disabled_validators?;
495
496 Self::construct(&validators, &disabled_validators, signing_context, keystore)
497 }
498
499 pub fn construct(
503 validators: &[ValidatorId],
504 disabled_validators: &[ValidatorIndex],
505 signing_context: SigningContext,
506 keystore: KeystorePtr,
507 ) -> Result<Self, Error> {
508 let (key, index) =
509 signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
510
511 let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
512
513 Ok(Validator { signing_context, key, index, disabled })
514 }
515
516 pub fn id(&self) -> ValidatorId {
518 self.key.clone()
519 }
520
521 pub fn index(&self) -> ValidatorIndex {
523 self.index
524 }
525
526 pub fn disabled(&self) -> bool {
528 self.disabled
529 }
530
531 pub fn signing_context(&self) -> &SigningContext {
533 &self.signing_context
534 }
535
536 pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
538 &self,
539 keystore: KeystorePtr,
540 payload: Payload,
541 ) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
542 Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
543 }
544}