
polkadot_node_core_pvf_checker/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the PVF pre-checking subsystem.
//!
//! This subsystem is responsible for scanning the chain for PVFs that are pending approval, as
//! well as for submitting statements on whether those PVFs pass pre-checking or not.

use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered};

use polkadot_node_subsystem::{
	messages::{CandidateValidationMessage, PreCheckOutcome, PvfCheckerMessage, RuntimeApiMessage},
	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
	SubsystemResult, SubsystemSender,
};
use polkadot_primitives::{
	BlockNumber, Hash, PvfCheckStatement, SessionIndex, ValidationCodeHash, ValidatorId,
	ValidatorIndex,
};
use sp_keystore::KeystorePtr;
use std::collections::HashSet;

const LOG_TARGET: &str = "parachain::pvf-checker";

mod interest_view;
mod metrics;
mod runtime_api;

#[cfg(test)]
mod tests;

use self::{
	interest_view::{InterestView, Judgement},
	metrics::Metrics,
};

/// PVF pre-checking subsystem.
pub struct PvfCheckerSubsystem {
	keystore: KeystorePtr,
	metrics: Metrics,
}

impl PvfCheckerSubsystem {
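	/// Creates a new instance of the PVF pre-checking subsystem.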
	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
		PvfCheckerSubsystem { keystore, metrics }
	}
}

#[overseer::subsystem(PvfChecker, error=SubsystemError, prefix = self::overseer)]
impl<Context> PvfCheckerSubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = run(ctx, self.keystore, self.metrics)
			.map_err(|e| SubsystemError::with_origin("pvf-checker", e))
			.boxed();

		SpawnedSubsystem { name: "pvf-checker-subsystem", future }
	}
}

/// A struct that holds the credentials required to sign the PVF check statements. These
/// credentials are implicitly pinned to a session where our node acts as a validator.
struct SigningCredentials {
	/// The validator public key.
	validator_key: ValidatorId,
	/// The validator index in the current session.
	validator_index: ValidatorIndex,
}

struct State {
	/// If `Some` then our node is in the active validator set during the current session.
	///
	/// Updated when a new session index is detected in one of the heads.
	credentials: Option<SigningCredentials>,

	/// The number and the hash of the most recent block that we have seen.
	///
	/// This is only updated when the PVF pre-checking API is detected in a new leaf block.
	recent_block: Option<(BlockNumber, Hash)>,

	/// The session index of the most recent session that we have seen.
	///
	/// This is only updated when the PVF pre-checking API is detected in a new leaf block.
	latest_session: Option<SessionIndex>,

	/// The set of PVF hashes that we cast a vote for within the current session.
	voted: HashSet<ValidationCodeHash>,

	/// The collection of PVFs that are observed throughout the active heads.
	view: InterestView,

	/// The container for the futures that are waiting for the outcome of the pre-checking.
	///
	/// Here are some fun facts about these futures:
	///
	/// - Pre-checking can take quite some time, on the order of tens of seconds, so the futures
	///   here can soak for quite some time.
	/// - Pre-checking of one PVF can take drastically more time than pre-checking of another PVF.
	///   This leads to results coming out of order.
	///
	/// Resolving to `None` means that the request was dropped before replying.
	currently_checking:
		FuturesUnordered<BoxFuture<'static, Option<(PreCheckOutcome, ValidationCodeHash)>>>,
}

#[overseer::contextbounds(PvfChecker, prefix = self::overseer)]
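/// The main loop of the subsystem: concurrently polls the outcomes of pre-checks already in
/// flight and the incoming signals and messages from the overseer.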
async fn run<Context>(
	mut ctx: Context,
	keystore: KeystorePtr,
	metrics: Metrics,
) -> SubsystemResult<()> {
	let mut state = State {
		credentials: None,
		recent_block: None,
		latest_session: None,
		voted: HashSet::with_capacity(16),
		view: InterestView::new(),
		currently_checking: FuturesUnordered::new(),
	};

	loop {
		let mut sender = ctx.sender().clone();
		futures::select! {
			precheck_response = state.currently_checking.select_next_some() => {
				if let Some((outcome, validation_code_hash)) = precheck_response {
					handle_pvf_check(
						&mut state,
						&mut sender,
						&keystore,
						&metrics,
						outcome,
						validation_code_hash,
					).await;
				} else {
					// See note in `initiate_precheck` for why this is possible and why we do not
					// care here.
				}
			}
			from_overseer = ctx.recv().fuse() => {
				let outcome = handle_from_overseer(
					&mut state,
					&mut sender,
					&keystore,
					&metrics,
					from_overseer?,
				)
				.await;
				if let Some(Conclude) = outcome {
					return Ok(());
				}
			}
		}
	}
}

/// Handle an incoming PVF pre-check result from the candidate-validation subsystem.
async fn handle_pvf_check(
	state: &mut State,
	sender: &mut impl overseer::PvfCheckerSenderTrait,
	keystore: &KeystorePtr,
	metrics: &Metrics,
	outcome: PreCheckOutcome,
	validation_code_hash: ValidationCodeHash,
) {
	gum::debug!(
		target: LOG_TARGET,
		?validation_code_hash,
		"Received pre-check result: {:?}",
		outcome,
	);

	let judgement = match outcome {
		PreCheckOutcome::Valid => Judgement::Valid,
		PreCheckOutcome::Invalid => Judgement::Invalid,
		PreCheckOutcome::Failed => {
			// Always vote against in case of failures. Voting against a PVF when encountering a
			// timeout (or an unlikely node-specific issue) can be considered safe, since
			// there is no slashing for being on the wrong side of a pre-check vote.
			//
			// Also, by being more strict here, we can safely be more lenient during preparation and
			// avoid the risk of getting slashed there.
			gum::info!(
				target: LOG_TARGET,
				?validation_code_hash,
				"Pre-check failed, voting against",
			);
			Judgement::Invalid
		},
	};

	match state.view.on_judgement(validation_code_hash, judgement) {
		Ok(()) => (),
		Err(()) => {
			gum::debug!(
				target: LOG_TARGET,
				?validation_code_hash,
				"received judgement for an unknown (or removed) PVF hash",
			);
			return
		},
	}

	match (state.credentials.as_ref(), state.recent_block, state.latest_session) {
		// Note, the availability of credentials implies the availability of the recent block and
		// the session index.
		(Some(credentials), Some(recent_block), Some(session_index)) => {
			sign_and_submit_pvf_check_statement(
				sender,
				keystore,
				&mut state.voted,
				credentials,
				metrics,
				recent_block.1,
				session_index,
				judgement,
				validation_code_hash,
			)
			.await;
		},
		_ => (),
	}
}

/// A marker type signalling to the outer loop that the subsystem should stop.
struct Conclude;

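/// Handles a single signal or message coming from the overseer. Returns `Some(Conclude)` if the
/// subsystem should shut down.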
async fn handle_from_overseer(
	state: &mut State,
	sender: &mut impl overseer::PvfCheckerSenderTrait,
	keystore: &KeystorePtr,
	metrics: &Metrics,
	from_overseer: FromOrchestra<PvfCheckerMessage>,
) -> Option<Conclude> {
	match from_overseer {
		FromOrchestra::Signal(OverseerSignal::Conclude) => {
			gum::info!(target: LOG_TARGET, "Received `Conclude` signal, exiting");
			Some(Conclude)
		},
		FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {
			// ignore
			None
		},
		FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
			handle_leaves_update(state, sender, keystore, metrics, update).await;
			None
		},
		FromOrchestra::Communication { msg } => match msg {
			// uninhabited type, thus statically unreachable.
		},
	}
}

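/// Processes an active leaves update: examines the newly activated leaf (if any), initiates
/// pre-checks for previously unseen PVFs and re-submits judgements upon a session change.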
async fn handle_leaves_update(
	state: &mut State,
	sender: &mut impl overseer::PvfCheckerSenderTrait,
	keystore: &KeystorePtr,
	metrics: &Metrics,
	update: ActiveLeavesUpdate,
) {
	if let Some(activated) = update.activated {
		let ActivationEffect { new_session_index, recent_block, pending_pvfs } =
			match examine_activation(state, sender, keystore, activated.hash, activated.number)
				.await
			{
				None => {
					// None indicates that the pre-checking runtime API is not supported.
					return
				},
				Some(e) => e,
			};

		// Note that this is not necessarily the newly activated leaf.
		let recent_block_hash = recent_block.1;
		state.recent_block = Some(recent_block);

		// Update the PVF view, get the previously unseen PVFs, and start working on them.
		let outcome = state
			.view
			.on_leaves_update(Some((activated.hash, pending_pvfs)), &update.deactivated);
		metrics.on_pvf_observed(outcome.newcomers.len());
		metrics.on_pvf_left(outcome.left_num);
		for newcomer in outcome.newcomers {
			initiate_precheck(state, sender, activated.hash, newcomer, metrics).await;
		}

		if let Some((new_session_index, credentials)) = new_session_index {
			// New session change:
			// - update the session index
			// - reset the set of all PVFs we voted for
			// - set (or reset) the credentials.
			state.latest_session = Some(new_session_index);
			state.voted.clear();
			state.credentials = credentials;

			// If our node is a validator in the new session, we need to re-sign and submit all
			// previously obtained judgements.
			if let Some(ref credentials) = state.credentials {
				for (code_hash, judgement) in state.view.judgements() {
					sign_and_submit_pvf_check_statement(
						sender,
						keystore,
						&mut state.voted,
						credentials,
						metrics,
						recent_block_hash,
						new_session_index,
						judgement,
						code_hash,
					)
					.await;
				}
			}
		}
	} else {
		state.view.on_leaves_update(None, &update.deactivated);
	}
}

struct ActivationEffect {
	/// If the activated leaf is in a new session, the index of the new session. If the new session
	/// has a validator in the set our node happens to have the private key for, the signing
	/// credentials are included as well.
	new_session_index: Option<(SessionIndex, Option<SigningCredentials>)>,
	/// The block hash and number of the newly activated block if it's "better" than the last one
	/// we've seen. The block is better if its number is higher or if there are no blocks observed
	/// whatsoever. If the leaf is not better, this holds the existing recent block.
	recent_block: (BlockNumber, Hash),
	/// The full list of PVFs that are pending pre-checking according to the runtime API. In case
	/// the API returned an error this list is empty.
	pending_pvfs: Vec<ValidationCodeHash>,
}

/// Examines the new leaf and returns the effects of the examination.
///
/// Returns `None` if the PVF pre-checking runtime API is not supported for the given leaf hash.
async fn examine_activation(
	state: &mut State,
	sender: &mut impl overseer::PvfCheckerSenderTrait,
	keystore: &KeystorePtr,
	leaf_hash: Hash,
	leaf_number: BlockNumber,
) -> Option<ActivationEffect> {
	gum::debug!(
		target: LOG_TARGET,
		"Examining activation of leaf {:?} ({})",
		leaf_hash,
		leaf_number,
	);

	let pending_pvfs = match runtime_api::pvfs_require_precheck(sender, leaf_hash).await {
		Err(runtime_api::RuntimeRequestError::NotSupported) => return None,
		Err(_) => {
			gum::debug!(
				target: LOG_TARGET,
				relay_parent = ?leaf_hash,
				"cannot fetch PVFs that require pre-checking from runtime API",
			);
			Vec::new()
		},
		Ok(v) => v,
	};

	let recent_block = match state.recent_block {
		Some((recent_block_num, recent_block_hash)) if leaf_number < recent_block_num => {
			// the existing recent block has a higher number than the new activation, so keep it.
			(recent_block_num, recent_block_hash)
		},
		_ => (leaf_number, leaf_hash),
	};

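	// Note: `session_index_for_child` returns the session index expected at a *child* of the
	// given block, so an upcoming session change is picked up while still processing the last
	// block of the previous session.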
	let new_session_index = match runtime_api::session_index_for_child(sender, leaf_hash).await {
		Ok(session_index) =>
			if state.latest_session.map_or(true, |l| l < session_index) {
				let signing_credentials =
					check_signing_credentials(sender, keystore, leaf_hash).await;
				Some((session_index, signing_credentials))
			} else {
				None
			},
		Err(e) => {
			gum::warn!(
				target: LOG_TARGET,
				relay_parent = ?leaf_hash,
				"cannot fetch session index from runtime API: {:?}",
				e,
			);
			None
		},
	};

	Some(ActivationEffect { new_session_index, recent_block, pending_pvfs })
}

/// Checks the active validators for the given leaf. If we have a signing key for one of them,
/// returns the [`SigningCredentials`].
async fn check_signing_credentials(
	sender: &mut impl SubsystemSender<RuntimeApiMessage>,
	keystore: &KeystorePtr,
	leaf: Hash,
) -> Option<SigningCredentials> {
	let validators = match runtime_api::validators(sender, leaf).await {
		Ok(v) => v,
		Err(e) => {
			gum::warn!(
				target: LOG_TARGET,
				relay_parent = ?leaf,
				"error occurred while requesting validators: {:?}",
				e
			);
			return None
		},
	};

	polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore).map(
		|(validator_key, validator_index)| SigningCredentials { validator_key, validator_index },
	)
}

/// Signs and submits a vote for or against a given validation code.
///
/// If the validator already voted for the given code, this function does nothing.
async fn sign_and_submit_pvf_check_statement(
	sender: &mut impl overseer::PvfCheckerSenderTrait,
	keystore: &KeystorePtr,
	voted: &mut HashSet<ValidationCodeHash>,
	credentials: &SigningCredentials,
	metrics: &Metrics,
	relay_parent: Hash,
	session_index: SessionIndex,
	judgement: Judgement,
	validation_code_hash: ValidationCodeHash,
) {
	gum::debug!(
		target: LOG_TARGET,
		?validation_code_hash,
		?relay_parent,
		"submitting a PVF check statement, judgement = {:?}",
		judgement,
	);

	metrics.on_vote_submission_started();

	if voted.contains(&validation_code_hash) {
		gum::trace!(
			target: LOG_TARGET,
			relay_parent = ?relay_parent,
			?validation_code_hash,
			"already voted for this validation code",
		);
		metrics.on_vote_duplicate();
		return
	}

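	// Note that the code hash is recorded before signing and submission are attempted: if either
	// fails below, the vote is not retried within this session.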
	voted.insert(validation_code_hash);

	let stmt = PvfCheckStatement {
		accept: judgement.is_valid(),
		session_index,
		subject: validation_code_hash,
		validator_index: credentials.validator_index,
	};
	let signature = match polkadot_node_subsystem_util::sign(
		keystore,
		&credentials.validator_key,
		&stmt.signing_payload(),
	) {
		Ok(Some(signature)) => signature,
		Ok(None) => {
			gum::warn!(
				target: LOG_TARGET,
				?relay_parent,
				validator_index = ?credentials.validator_index,
				?validation_code_hash,
				"private key for signing is not available",
			);
			return
		},
		Err(e) => {
			gum::warn!(
				target: LOG_TARGET,
				?relay_parent,
				validator_index = ?credentials.validator_index,
				?validation_code_hash,
				"error signing the statement: {:?}",
				e,
			);
			return
		},
	};

	match runtime_api::submit_pvf_check_statement(sender, relay_parent, stmt, signature).await {
		Ok(()) => {
			metrics.on_vote_submitted();
		},
		Err(e) => {
			gum::warn!(
				target: LOG_TARGET,
				?relay_parent,
				?validation_code_hash,
				"error occurred while submitting a vote: {:?}",
				e,
			);
		},
	}
}

/// Sends a request to the candidate-validation subsystem to validate the given PVF.
///
/// The relay-parent is used as an anchor from where to fetch the PVF code. The request will be put
/// into the `currently_checking` set.
async fn initiate_precheck(
	state: &mut State,
	sender: &mut impl overseer::PvfCheckerSenderTrait,
	relay_parent: Hash,
	validation_code_hash: ValidationCodeHash,
	metrics: &Metrics,
) {
	gum::debug!(target: LOG_TARGET, ?validation_code_hash, ?relay_parent, "initiating a precheck");

	let (tx, rx) = oneshot::channel();
	sender
		.send_message(CandidateValidationMessage::PreCheck {
			relay_parent,
			validation_code_hash,
			response_sender: tx,
		})
		.await;

	let timer = metrics.time_pre_check_judgement();
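	// The timer is moved into the future below, so it measures the whole span until the
	// pre-check outcome arrives (or until the request is dropped).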
	state.currently_checking.push(Box::pin(async move {
		let _timer = timer;
		match rx.await {
			Ok(accept) => Some((accept, validation_code_hash)),
			Err(oneshot::Canceled) => {
				// Pre-checking request dropped before replying. That can happen in case the
				// overseer is shutting down. Our part of shutdown will be handled by the
				// overseer conclude signal. Log it here just in case.
				gum::debug!(
					target: LOG_TARGET,
					?validation_code_hash,
					?relay_parent,
					"precheck request was canceled",
				);
				None
			},
		}
	}));
}