polkadot_collator_protocol/validator_side_experimental/
mod.rs1#![allow(unused)]
18
19use tokio as _;
21
22mod common;
23mod error;
24mod metrics;
25mod peer_manager;
26mod state;
27
28use std::collections::VecDeque;
29
30use common::MAX_STORED_SCORES_PER_PARA;
31use error::{log_error, FatalError, FatalResult, Result};
32use fatality::Split;
33use peer_manager::{Db, PeerManager};
34use polkadot_node_subsystem::{
35 overseer, ActivatedLeaf, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal,
36};
37use polkadot_node_subsystem_util::{
38 find_validator_group, request_claim_queue, request_validator_groups, request_validators,
39 runtime::recv_runtime, signing_key_and_index,
40};
41use polkadot_primitives::{Hash, Id as ParaId};
42use sp_keystore::KeystorePtr;
43use state::State;
44
45pub use metrics::Metrics;
46
47use crate::LOG_TARGET;
48
49#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
51pub(crate) async fn run<Context>(
52 mut ctx: Context,
53 keystore: KeystorePtr,
54 metrics: Metrics,
55) -> FatalResult<()> {
56 if let Some(_state) = initialize(&mut ctx, keystore, metrics).await? {
57 }
59
60 Ok(())
61}
62
63#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
64async fn initialize<Context>(
65 ctx: &mut Context,
66 keystore: KeystorePtr,
67 metrics: Metrics,
68) -> FatalResult<Option<State<Db>>> {
69 loop {
70 let first_leaf = match wait_for_first_leaf(ctx).await? {
71 Some(activated_leaf) => activated_leaf,
72 None => return Ok(None),
73 };
74
75 let scheduled_paras = match scheduled_paras(ctx.sender(), first_leaf.hash, &keystore).await
76 {
77 Ok(paras) => paras,
78 Err(err) => {
79 log_error(Err(err))?;
80 continue
81 },
82 };
83
84 let backend = Db::new(MAX_STORED_SCORES_PER_PARA).await;
85
86 match PeerManager::startup(backend, ctx.sender(), scheduled_paras.into_iter().collect())
87 .await
88 {
89 Ok(peer_manager) => return Ok(Some(State::new(peer_manager, keystore, metrics))),
90 Err(err) => {
91 log_error(Err(err))?;
92 continue
93 },
94 }
95 }
96}
97
98#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
100async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> FatalResult<Option<ActivatedLeaf>> {
101 loop {
102 match ctx.recv().await.map_err(FatalError::SubsystemReceive)? {
103 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(None),
104 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
105 if let Some(activated) = update.activated {
106 return Ok(Some(activated))
107 }
108 },
109 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
110 FromOrchestra::Communication { msg } => {
111 gum::warn!(
114 target: LOG_TARGET,
115 ?msg,
116 "Received msg before first active leaves update. This is not expected - message will be dropped."
117 )
118 },
119 }
120 }
121}
122
123async fn scheduled_paras<Sender: CollatorProtocolSenderTrait>(
124 sender: &mut Sender,
125 hash: Hash,
126 keystore: &KeystorePtr,
127) -> Result<VecDeque<ParaId>> {
128 let validators = recv_runtime(request_validators(hash, sender).await).await?;
129
130 let (groups, rotation_info) =
131 recv_runtime(request_validator_groups(hash, sender).await).await?;
132
133 let core_now = if let Some(group) = signing_key_and_index(&validators, keystore)
134 .and_then(|(_, index)| find_validator_group(&groups, index))
135 {
136 rotation_info.core_for_group(group, groups.len())
137 } else {
138 gum::trace!(target: LOG_TARGET, ?hash, "Not a validator");
139 return Ok(VecDeque::new())
140 };
141
142 let mut claim_queue = recv_runtime(request_claim_queue(hash, sender).await).await?;
143 Ok(claim_queue.remove(&core_now).unwrap_or_else(|| VecDeque::new()))
144}