1use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered};
23
24use polkadot_node_subsystem::{
25 messages::{CandidateValidationMessage, PreCheckOutcome, PvfCheckerMessage, RuntimeApiMessage},
26 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
27 SubsystemResult, SubsystemSender,
28};
29use polkadot_primitives::{
30 BlockNumber, Hash, PvfCheckStatement, SessionIndex, ValidationCodeHash, ValidatorId,
31 ValidatorIndex,
32};
33use sp_keystore::KeystorePtr;
34use std::collections::HashSet;
35
36const LOG_TARGET: &str = "parachain::pvf-checker";
37
38mod interest_view;
39mod metrics;
40mod runtime_api;
41
42#[cfg(test)]
43mod tests;
44
45use self::{
46 interest_view::{InterestView, Judgement},
47 metrics::Metrics,
48};
49
50pub struct PvfCheckerSubsystem {
52 keystore: KeystorePtr,
53 metrics: Metrics,
54}
55
56impl PvfCheckerSubsystem {
57 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
58 PvfCheckerSubsystem { keystore, metrics }
59 }
60}
61
62#[overseer::subsystem(PvfChecker, error=SubsystemError, prefix = self::overseer)]
63impl<Context> PvfCheckerSubsystem {
64 fn start(self, ctx: Context) -> SpawnedSubsystem {
65 let future = run(ctx, self.keystore, self.metrics)
66 .map_err(|e| SubsystemError::with_origin("pvf-checker", e))
67 .boxed();
68
69 SpawnedSubsystem { name: "pvf-checker-subsystem", future }
70 }
71}
72
73struct SigningCredentials {
76 validator_key: ValidatorId,
78 validator_index: ValidatorIndex,
80}
81
82struct State {
83 credentials: Option<SigningCredentials>,
87
88 recent_block: Option<(BlockNumber, Hash)>,
92
93 latest_session: Option<SessionIndex>,
97
98 voted: HashSet<ValidationCodeHash>,
100
101 view: InterestView,
103
104 currently_checking:
115 FuturesUnordered<BoxFuture<'static, Option<(PreCheckOutcome, ValidationCodeHash)>>>,
116}
117
118#[overseer::contextbounds(PvfChecker, prefix = self::overseer)]
119async fn run<Context>(
120 mut ctx: Context,
121 keystore: KeystorePtr,
122 metrics: Metrics,
123) -> SubsystemResult<()> {
124 let mut state = State {
125 credentials: None,
126 recent_block: None,
127 latest_session: None,
128 voted: HashSet::with_capacity(16),
129 view: InterestView::new(),
130 currently_checking: FuturesUnordered::new(),
131 };
132
133 loop {
134 let mut sender = ctx.sender().clone();
135 futures::select! {
136 precheck_response = state.currently_checking.select_next_some() => {
137 if let Some((outcome, validation_code_hash)) = precheck_response {
138 handle_pvf_check(
139 &mut state,
140 &mut sender,
141 &keystore,
142 &metrics,
143 outcome,
144 validation_code_hash,
145 ).await;
146 } else {
147 }
150 }
151 from_overseer = ctx.recv().fuse() => {
152 let outcome = handle_from_overseer(
153 &mut state,
154 &mut sender,
155 &keystore,
156 &metrics,
157 from_overseer?,
158 )
159 .await;
160 if let Some(Conclude) = outcome {
161 return Ok(());
162 }
163 }
164 }
165 }
166}
167
168async fn handle_pvf_check(
170 state: &mut State,
171 sender: &mut impl overseer::PvfCheckerSenderTrait,
172 keystore: &KeystorePtr,
173 metrics: &Metrics,
174 outcome: PreCheckOutcome,
175 validation_code_hash: ValidationCodeHash,
176) {
177 gum::debug!(
178 target: LOG_TARGET,
179 ?validation_code_hash,
180 "Received pre-check result: {:?}",
181 outcome,
182 );
183
184 let judgement = match outcome {
185 PreCheckOutcome::Valid => Judgement::Valid,
186 PreCheckOutcome::Invalid => Judgement::Invalid,
187 PreCheckOutcome::Failed => {
188 gum::info!(
195 target: LOG_TARGET,
196 ?validation_code_hash,
197 "Pre-check failed, voting against",
198 );
199 Judgement::Invalid
200 },
201 };
202
203 match state.view.on_judgement(validation_code_hash, judgement) {
204 Ok(()) => (),
205 Err(()) => {
206 gum::debug!(
207 target: LOG_TARGET,
208 ?validation_code_hash,
209 "received judgement for an unknown (or removed) PVF hash",
210 );
211 return
212 },
213 }
214
215 match (state.credentials.as_ref(), state.recent_block, state.latest_session) {
216 (Some(credentials), Some(recent_block), Some(session_index)) => {
219 sign_and_submit_pvf_check_statement(
220 sender,
221 keystore,
222 &mut state.voted,
223 credentials,
224 metrics,
225 recent_block.1,
226 session_index,
227 judgement,
228 validation_code_hash,
229 )
230 .await;
231 },
232 _ => (),
233 }
234}
235
236struct Conclude;
238
239async fn handle_from_overseer(
240 state: &mut State,
241 sender: &mut impl overseer::PvfCheckerSenderTrait,
242 keystore: &KeystorePtr,
243 metrics: &Metrics,
244 from_overseer: FromOrchestra<PvfCheckerMessage>,
245) -> Option<Conclude> {
246 match from_overseer {
247 FromOrchestra::Signal(OverseerSignal::Conclude) => {
248 gum::info!(target: LOG_TARGET, "Received `Conclude` signal, exiting");
249 Some(Conclude)
250 },
251 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {
252 None
254 },
255 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
256 handle_leaves_update(state, sender, keystore, metrics, update).await;
257 None
258 },
259 FromOrchestra::Communication { msg } => match msg {
260 },
262 }
263}
264
265async fn handle_leaves_update(
266 state: &mut State,
267 sender: &mut impl overseer::PvfCheckerSenderTrait,
268 keystore: &KeystorePtr,
269 metrics: &Metrics,
270 update: ActiveLeavesUpdate,
271) {
272 if let Some(activated) = update.activated {
273 let ActivationEffect { new_session_index, recent_block, pending_pvfs } =
274 match examine_activation(state, sender, keystore, activated.hash, activated.number)
275 .await
276 {
277 None => {
278 return
280 },
281 Some(e) => e,
282 };
283
284 let recent_block_hash = recent_block.1;
286 state.recent_block = Some(recent_block);
287
288 let outcome = state
290 .view
291 .on_leaves_update(Some((activated.hash, pending_pvfs)), &update.deactivated);
292 metrics.on_pvf_observed(outcome.newcomers.len());
293 metrics.on_pvf_left(outcome.left_num);
294 for newcomer in outcome.newcomers {
295 initiate_precheck(state, sender, activated.hash, newcomer, metrics).await;
296 }
297
298 if let Some((new_session_index, credentials)) = new_session_index {
299 state.latest_session = Some(new_session_index);
304 state.voted.clear();
305 state.credentials = credentials;
306
307 if let Some(ref credentials) = state.credentials {
310 for (code_hash, judgement) in state.view.judgements() {
311 sign_and_submit_pvf_check_statement(
312 sender,
313 keystore,
314 &mut state.voted,
315 credentials,
316 metrics,
317 recent_block_hash,
318 new_session_index,
319 judgement,
320 code_hash,
321 )
322 .await;
323 }
324 }
325 }
326 } else {
327 state.view.on_leaves_update(None, &update.deactivated);
328 }
329}
330
331struct ActivationEffect {
332 new_session_index: Option<(SessionIndex, Option<SigningCredentials>)>,
335 recent_block: (BlockNumber, Hash),
339 pending_pvfs: Vec<ValidationCodeHash>,
342}
343
344async fn examine_activation(
348 state: &mut State,
349 sender: &mut impl overseer::PvfCheckerSenderTrait,
350 keystore: &KeystorePtr,
351 leaf_hash: Hash,
352 leaf_number: BlockNumber,
353) -> Option<ActivationEffect> {
354 gum::debug!(
355 target: LOG_TARGET,
356 "Examining activation of leaf {:?} ({})",
357 leaf_hash,
358 leaf_number,
359 );
360
361 let pending_pvfs = match runtime_api::pvfs_require_precheck(sender, leaf_hash).await {
362 Err(runtime_api::RuntimeRequestError::NotSupported) => return None,
363 Err(_) => {
364 gum::debug!(
365 target: LOG_TARGET,
366 relay_parent = ?leaf_hash,
367 "cannot fetch PVFs that require pre-checking from runtime API",
368 );
369 Vec::new()
370 },
371 Ok(v) => v,
372 };
373
374 let recent_block = match state.recent_block {
375 Some((recent_block_num, recent_block_hash)) if leaf_number < recent_block_num => {
376 (recent_block_num, recent_block_hash)
378 },
379 _ => (leaf_number, leaf_hash),
380 };
381
382 let new_session_index = match runtime_api::session_index_for_child(sender, leaf_hash).await {
383 Ok(session_index) =>
384 if state.latest_session.map_or(true, |l| l < session_index) {
385 let signing_credentials =
386 check_signing_credentials(sender, keystore, leaf_hash).await;
387 Some((session_index, signing_credentials))
388 } else {
389 None
390 },
391 Err(e) => {
392 gum::warn!(
393 target: LOG_TARGET,
394 relay_parent = ?leaf_hash,
395 "cannot fetch session index from runtime API: {:?}",
396 e,
397 );
398 None
399 },
400 };
401
402 Some(ActivationEffect { new_session_index, recent_block, pending_pvfs })
403}
404
405async fn check_signing_credentials(
408 sender: &mut impl SubsystemSender<RuntimeApiMessage>,
409 keystore: &KeystorePtr,
410 leaf: Hash,
411) -> Option<SigningCredentials> {
412 let validators = match runtime_api::validators(sender, leaf).await {
413 Ok(v) => v,
414 Err(e) => {
415 gum::warn!(
416 target: LOG_TARGET,
417 relay_parent = ?leaf,
418 "error occurred during requesting validators: {:?}",
419 e
420 );
421 return None
422 },
423 };
424
425 polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore).map(
426 |(validator_key, validator_index)| SigningCredentials { validator_key, validator_index },
427 )
428}
429
430async fn sign_and_submit_pvf_check_statement(
434 sender: &mut impl overseer::PvfCheckerSenderTrait,
435 keystore: &KeystorePtr,
436 voted: &mut HashSet<ValidationCodeHash>,
437 credentials: &SigningCredentials,
438 metrics: &Metrics,
439 relay_parent: Hash,
440 session_index: SessionIndex,
441 judgement: Judgement,
442 validation_code_hash: ValidationCodeHash,
443) {
444 gum::debug!(
445 target: LOG_TARGET,
446 ?validation_code_hash,
447 ?relay_parent,
448 "submitting a PVF check statement for validation code = {:?}",
449 judgement,
450 );
451
452 metrics.on_vote_submission_started();
453
454 if voted.contains(&validation_code_hash) {
455 gum::trace!(
456 target: LOG_TARGET,
457 relay_parent = ?relay_parent,
458 ?validation_code_hash,
459 "already voted for this validation code",
460 );
461 metrics.on_vote_duplicate();
462 return
463 }
464
465 voted.insert(validation_code_hash);
466
467 let stmt = PvfCheckStatement {
468 accept: judgement.is_valid(),
469 session_index,
470 subject: validation_code_hash,
471 validator_index: credentials.validator_index,
472 };
473 let signature = match polkadot_node_subsystem_util::sign(
474 keystore,
475 &credentials.validator_key,
476 &stmt.signing_payload(),
477 ) {
478 Ok(Some(signature)) => signature,
479 Ok(None) => {
480 gum::warn!(
481 target: LOG_TARGET,
482 ?relay_parent,
483 validator_index = ?credentials.validator_index,
484 ?validation_code_hash,
485 "private key for signing is not available",
486 );
487 return
488 },
489 Err(e) => {
490 gum::warn!(
491 target: LOG_TARGET,
492 ?relay_parent,
493 validator_index = ?credentials.validator_index,
494 ?validation_code_hash,
495 "error signing the statement: {:?}",
496 e,
497 );
498 return
499 },
500 };
501
502 match runtime_api::submit_pvf_check_statement(sender, relay_parent, stmt, signature).await {
503 Ok(()) => {
504 metrics.on_vote_submitted();
505 },
506 Err(e) => {
507 gum::warn!(
508 target: LOG_TARGET,
509 ?relay_parent,
510 ?validation_code_hash,
511 "error occurred during submitting a vote: {:?}",
512 e,
513 );
514 },
515 }
516}
517
518async fn initiate_precheck(
523 state: &mut State,
524 sender: &mut impl overseer::PvfCheckerSenderTrait,
525 relay_parent: Hash,
526 validation_code_hash: ValidationCodeHash,
527 metrics: &Metrics,
528) {
529 gum::debug!(target: LOG_TARGET, ?validation_code_hash, ?relay_parent, "initiating a precheck",);
530
531 let (tx, rx) = oneshot::channel();
532 sender
533 .send_message(CandidateValidationMessage::PreCheck {
534 relay_parent,
535 validation_code_hash,
536 response_sender: tx,
537 })
538 .await;
539
540 let timer = metrics.time_pre_check_judgement();
541 state.currently_checking.push(Box::pin(async move {
542 let _timer = timer;
543 match rx.await {
544 Ok(accept) => Some((accept, validation_code_hash)),
545 Err(oneshot::Canceled) => {
546 gum::debug!(
550 target: LOG_TARGET,
551 ?validation_code_hash,
552 ?relay_parent,
553 "precheck request was canceled",
554 );
555 None
556 },
557 }
558 }));
559}