referrerpolicy=no-referrer-when-downgrade

polkadot_node_subsystem_util/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Utility module for subsystems
18//!
19//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
20//! or determining what their validator ID is. These common interests are factored into
21//! this module.
22//!
23//! This crate also reexports Prometheus metric types which are expected to be implemented by
24//! subsystems.
25
26#![warn(missing_docs)]
27
28pub use overseer::{
29	gen::{OrchestraError as OverseerError, Timeout},
30	Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33	errors::{RuntimeApiError, SubsystemError},
34	messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35	overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44	async_backing::{BackingState, Constraints},
45	slashing, AsyncBackingParams, AuthorityDiscoveryId, CandidateEvent, CandidateHash,
46	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, EncodeAs,
47	ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures,
48	OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex,
49	SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId,
50	ValidatorIndex, ValidatorSignature,
51};
52pub use rand;
53use sp_application_crypto::AppCrypto;
54use sp_core::ByteArray;
55use sp_keystore::{Error as KeystoreError, KeystorePtr};
56use std::{
57	collections::{BTreeMap, VecDeque},
58	time::Duration,
59};
60use thiserror::Error;
61
62pub use determine_new_blocks::determine_new_blocks;
63pub use metered;
64pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
65
66/// These reexports are required so that external crates can use the `delegated_subsystem` macro
67/// properly.
68pub mod reexports {
69	pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
70}
71
72/// Helpers for the validator->chunk index mapping.
73pub mod availability_chunks;
74/// A utility for managing the implicit view of the relay-chain derived from active
75/// leaves and the minimum allowed relay-parents that parachain candidates can have
76/// and be backed in those leaves' children.
77pub mod backing_implicit_view;
78/// Database trait for subsystem.
79pub mod database;
80/// An emulator for node-side code to predict the results of on-chain parachain inclusion
81/// and predict future constraints.
82pub mod inclusion_emulator;
83/// Convenient and efficient runtime info access.
84pub mod runtime;
85
86/// Helpers for working with unreleased runtime calls
87pub mod vstaging;
88
89/// Nested message sending
90///
91/// Useful for having mostly synchronous code, with submodules spawning short lived asynchronous
92/// tasks, sending messages back.
93pub mod nesting_sender;
94
95pub mod reputation;
96
97mod determine_new_blocks;
98
99mod controlled_validator_indices;
100pub use controlled_validator_indices::ControlledValidatorIndices;
101
102#[cfg(test)]
103mod tests;
104
105const LOG_TARGET: &'static str = "parachain::subsystem-util";
106
107/// Duration a job will wait after sending a stop signal before hard-aborting.
108pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
109/// Capacity of channels to and from individual jobs
110pub const JOB_CHANNEL_CAPACITY: usize = 64;
111
112/// Utility errors
113#[derive(Debug, Error)]
114pub enum Error {
115	/// Attempted to send or receive on a oneshot channel which had been canceled
116	#[error(transparent)]
117	Oneshot(#[from] oneshot::Canceled),
118	/// Attempted to send on a MPSC channel which has been canceled
119	#[error(transparent)]
120	Mpsc(#[from] mpsc::SendError),
121	/// A subsystem error
122	#[error(transparent)]
123	Subsystem(#[from] SubsystemError),
124	/// An error in the Runtime API.
125	#[error(transparent)]
126	RuntimeApi(#[from] RuntimeApiError),
127	/// The type system wants this even though it doesn't make sense
128	#[error(transparent)]
129	Infallible(#[from] std::convert::Infallible),
130	/// Attempted to convert from an `AllMessages` to a `FromJob`, and failed.
131	#[error("AllMessage not relevant to Job")]
132	SenderConversion(String),
133	/// The local node is not a validator.
134	#[error("Node is not a validator")]
135	NotAValidator,
136	/// Already forwarding errors to another sender
137	#[error("AlreadyForwarding")]
138	AlreadyForwarding,
139	/// Data that are supposed to be there a not there
140	#[error("Data are not available")]
141	DataNotAvailable,
142}
143
144impl From<OverseerError> for Error {
145	fn from(e: OverseerError) -> Self {
146		Self::from(SubsystemError::from(e))
147	}
148}
149
150impl TryFrom<crate::runtime::Error> for Error {
151	type Error = ();
152
153	fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
154		use crate::runtime::Error;
155
156		match e {
157			Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
158			Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
159			Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()),
160		}
161	}
162}
163
164/// A type alias for Runtime API receivers.
165pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
166
167/// Request some data from the `RuntimeApi`.
168pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
169	parent: Hash,
170	sender: &mut Sender,
171	request_builder: RequestBuilder,
172) -> RuntimeApiReceiver<Response>
173where
174	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
175	Sender: SubsystemSender<RuntimeApiMessage>,
176{
177	let (tx, rx) = oneshot::channel();
178
179	sender
180		.send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
181		.await;
182
183	rx
184}
185
186/// Verifies if `ParachainHost` runtime api is at least at version `required_runtime_version`. This
187/// method is used to determine if a given runtime call is supported by the runtime.
188pub async fn has_required_runtime<Sender>(
189	sender: &mut Sender,
190	relay_parent: Hash,
191	required_runtime_version: u32,
192) -> bool
193where
194	Sender: SubsystemSender<RuntimeApiMessage>,
195{
196	gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
197
198	let (tx, rx) = oneshot::channel();
199	sender
200		.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
201		.await;
202
203	match rx.await {
204		Result::Ok(Ok(runtime_version)) => {
205			gum::trace!(
206				target: LOG_TARGET,
207				?relay_parent,
208				?runtime_version,
209				?required_runtime_version,
210				"Fetched  ParachainHost runtime api version"
211			);
212			runtime_version >= required_runtime_version
213		},
214		Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
215			gum::trace!(
216				target: LOG_TARGET,
217				?relay_parent,
218				?error,
219				"Execution error while fetching ParachainHost runtime api version"
220			);
221			false
222		},
223		Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
224			gum::trace!(
225				target: LOG_TARGET,
226				?relay_parent,
227				"NotSupported error while fetching ParachainHost runtime api version"
228			);
229			false
230		},
231		Result::Err(_) => {
232			gum::trace!(
233				target: LOG_TARGET,
234				?relay_parent,
235				"Cancelled error while fetching ParachainHost runtime api version"
236			);
237			false
238		},
239	}
240}
241
242/// Construct specialized request functions for the runtime.
243///
244/// These would otherwise get pretty repetitive.
245macro_rules! specialize_requests {
246	// expand return type name for documentation purposes
247	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
248		specialize_requests!{
249			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
250		}
251	};
252
253	// create a single specialized request function
254	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
255		#[doc = "Request `"]
256		#[doc = $doc_name]
257		#[doc = "` from the runtime"]
258		pub async fn $func_name (
259			parent: Hash,
260			$(
261				$param_name: $param_ty,
262			)*
263			sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
264		) -> RuntimeApiReceiver<$return_ty>
265		{
266			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
267				$( $param_name, )* tx
268			)).await
269		}
270	};
271
272	// recursive decompose
273	(
274		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
275		$(
276			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
277		)+
278	) => {
279		specialize_requests!{
280			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
281		}
282		specialize_requests!{
283			$(
284				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
285			)+
286		}
287	};
288}
289
290specialize_requests! {
291	fn request_runtime_api_version() -> u32; Version;
292	fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
293	fn request_validators() -> Vec<ValidatorId>; Validators;
294	fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
295	fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
296	fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
297	fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
298	fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
299	fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
300	fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
301	fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
302	fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
303	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
304	fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
305	fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
306		-> Option<ValidationCodeHash>; ValidationCodeHash;
307	fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
308	fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
309	fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashes;
310	fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
311	fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
312	fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
313	fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
314	fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
315	fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
316	fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
317	fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
318	fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
319	fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;
320
321}
322
323/// Requests executor parameters from the runtime effective at given relay-parent. First obtains
324/// session index at the relay-parent, relying on the fact that it should be cached by the runtime
325/// API caching layer even if the block itself has already been pruned. Then requests executor
326/// parameters by session index.
327/// Returns an error if failed to communicate to the runtime, or the parameters are not in the
328/// storage, which should never happen.
329/// Returns default execution parameters if the runtime doesn't yet support `SessionExecutorParams`
330/// API call.
331/// Otherwise, returns execution parameters returned by the runtime.
332pub async fn executor_params_at_relay_parent(
333	relay_parent: Hash,
334	sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
335) -> Result<ExecutorParams, Error> {
336	match request_session_index_for_child(relay_parent, sender).await.await {
337		Err(err) => {
338			// Failed to communicate with the runtime
339			Err(Error::Oneshot(err))
340		},
341		Ok(Err(err)) => {
342			// Runtime has failed to obtain a session index at the relay-parent.
343			Err(Error::RuntimeApi(err))
344		},
345		Ok(Ok(session_index)) => {
346			match request_session_executor_params(relay_parent, session_index, sender).await.await {
347				Err(err) => {
348					// Failed to communicate with the runtime
349					Err(Error::Oneshot(err))
350				},
351				Ok(Err(RuntimeApiError::NotSupported { .. })) => {
352					// Runtime doesn't yet support the api requested, should execute anyway
353					// with default set of parameters
354					Ok(ExecutorParams::default())
355				},
356				Ok(Err(err)) => {
357					// Runtime failed to execute the request
358					Err(Error::RuntimeApi(err))
359				},
360				Ok(Ok(None)) => {
361					// Storage doesn't contain a parameter set for the given session; should
362					// never happen
363					Err(Error::DataNotAvailable)
364				},
365				Ok(Ok(Some(executor_params))) => Ok(executor_params),
366			}
367		},
368	}
369}
370
371/// From the given set of validators, find the first key we can sign with, if any.
372pub fn signing_key<'a>(
373	validators: impl IntoIterator<Item = &'a ValidatorId>,
374	keystore: &KeystorePtr,
375) -> Option<ValidatorId> {
376	signing_key_and_index(validators, keystore).map(|(k, _)| k)
377}
378
379/// From the given set of validators, find the first key we can sign with, if any, and return it
380/// along with the validator index.
381pub fn signing_key_and_index<'a>(
382	validators: impl IntoIterator<Item = &'a ValidatorId>,
383	keystore: &KeystorePtr,
384) -> Option<(ValidatorId, ValidatorIndex)> {
385	for (i, v) in validators.into_iter().enumerate() {
386		if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
387			return Some((v.clone(), ValidatorIndex(i as _)))
388		}
389	}
390	None
391}
392
393/// Sign the given data with the given validator ID.
394///
395/// Returns `Ok(None)` if the private key that corresponds to that validator ID is not found in the
396/// given keystore. Returns an error if the key could not be used for signing.
397pub fn sign(
398	keystore: &KeystorePtr,
399	key: &ValidatorId,
400	data: &[u8],
401) -> Result<Option<ValidatorSignature>, KeystoreError> {
402	let signature = keystore
403		.sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
404		.map(|sig| sig.into());
405	Ok(signature)
406}
407
408/// Find the validator group the given validator index belongs to.
409pub fn find_validator_group(
410	groups: &[Vec<ValidatorIndex>],
411	index: ValidatorIndex,
412) -> Option<GroupIndex> {
413	groups.iter().enumerate().find_map(|(i, g)| {
414		if g.contains(&index) {
415			Some(GroupIndex(i as _))
416		} else {
417			None
418		}
419	})
420}
421
422/// Choose a random subset of `min` elements.
423/// But always include `is_priority` elements.
424pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
425	choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
426}
427
428/// Choose a random subset of `min` elements using a specific Random Generator `Rng`
429/// But always include `is_priority` elements.
430pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
431	is_priority: F,
432	v: &mut Vec<T>,
433	rng: &mut R,
434	min: usize,
435) {
436	use rand::seq::SliceRandom as _;
437
438	// partition the elements into priority first
439	// the returned index is when non_priority elements start
440	let i = itertools::partition(v.iter_mut(), is_priority);
441
442	if i >= min || v.len() <= i {
443		v.truncate(i);
444		return
445	}
446
447	v[i..].shuffle(rng);
448
449	v.truncate(min);
450}
451
452/// Returns a `bool` with a probability of `a / b` of being true.
453pub fn gen_ratio(a: usize, b: usize) -> bool {
454	gen_ratio_rng(a, b, &mut rand::thread_rng())
455}
456
457/// Returns a `bool` with a probability of `a / b` of being true.
458pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
459	rng.gen_ratio(a as u32, b as u32)
460}
461
462/// Local validator information
463///
464/// It can be created if the local node is a validator in the context of a particular
465/// relay chain block.
466#[derive(Debug)]
467pub struct Validator {
468	signing_context: SigningContext,
469	key: ValidatorId,
470	index: ValidatorIndex,
471	disabled: bool,
472}
473
474impl Validator {
475	/// Get a struct representing this node's validator if this node is in fact a validator in the
476	/// context of the given block.
477	pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
478	where
479		S: SubsystemSender<RuntimeApiMessage>,
480	{
481		// Note: request_validators, request_disabled_validators and request_session_index_for_child
482		// do not and cannot run concurrently: they both have a mutable handle to the same sender.
483		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
484		let (validators, disabled_validators, session_index) = futures::try_join!(
485			request_validators(parent, sender).await,
486			request_disabled_validators(parent, sender).await,
487			request_session_index_for_child(parent, sender).await,
488		)?;
489
490		let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
491
492		let validators = validators?;
493
494		let disabled_validators = disabled_validators?;
495
496		Self::construct(&validators, &disabled_validators, signing_context, keystore)
497	}
498
499	/// Construct a validator instance without performing runtime fetches.
500	///
501	/// This can be useful if external code also needs the same data.
502	pub fn construct(
503		validators: &[ValidatorId],
504		disabled_validators: &[ValidatorIndex],
505		signing_context: SigningContext,
506		keystore: KeystorePtr,
507	) -> Result<Self, Error> {
508		let (key, index) =
509			signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
510
511		let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
512
513		Ok(Validator { signing_context, key, index, disabled })
514	}
515
516	/// Get this validator's id.
517	pub fn id(&self) -> ValidatorId {
518		self.key.clone()
519	}
520
521	/// Get this validator's local index.
522	pub fn index(&self) -> ValidatorIndex {
523		self.index
524	}
525
526	/// Get the enabled/disabled state of this validator
527	pub fn disabled(&self) -> bool {
528		self.disabled
529	}
530
531	/// Get the current signing context.
532	pub fn signing_context(&self) -> &SigningContext {
533		&self.signing_context
534	}
535
536	/// Sign a payload with this validator
537	pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
538		&self,
539		keystore: KeystorePtr,
540		payload: Payload,
541	) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
542		Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
543	}
544}