referrerpolicy=no-referrer-when-downgrade

polkadot_collator_protocol/validator_side_experimental/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17#![allow(unused)]
18
19// See reasoning in Cargo.toml why this temporary useless import is needed.
20use tokio as _;
21
22mod common;
23mod error;
24mod metrics;
25mod peer_manager;
26mod state;
27
28use std::collections::VecDeque;
29
30use common::MAX_STORED_SCORES_PER_PARA;
31use error::{log_error, FatalError, FatalResult, Result};
32use fatality::Split;
33use peer_manager::{Db, PeerManager};
34use polkadot_node_subsystem::{
35	overseer, ActivatedLeaf, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal,
36};
37use polkadot_node_subsystem_util::{
38	find_validator_group, request_claim_queue, request_validator_groups, request_validators,
39	runtime::recv_runtime, signing_key_and_index,
40};
41use polkadot_primitives::{Hash, Id as ParaId};
42use sp_keystore::KeystorePtr;
43use state::State;
44
45pub use metrics::Metrics;
46
47use crate::LOG_TARGET;
48
49/// The main run loop.
50#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
51pub(crate) async fn run<Context>(
52	mut ctx: Context,
53	keystore: KeystorePtr,
54	metrics: Metrics,
55) -> FatalResult<()> {
56	if let Some(_state) = initialize(&mut ctx, keystore, metrics).await? {
57		// run_inner(state);
58	}
59
60	Ok(())
61}
62
63#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
64async fn initialize<Context>(
65	ctx: &mut Context,
66	keystore: KeystorePtr,
67	metrics: Metrics,
68) -> FatalResult<Option<State<Db>>> {
69	loop {
70		let first_leaf = match wait_for_first_leaf(ctx).await? {
71			Some(activated_leaf) => activated_leaf,
72			None => return Ok(None),
73		};
74
75		let scheduled_paras = match scheduled_paras(ctx.sender(), first_leaf.hash, &keystore).await
76		{
77			Ok(paras) => paras,
78			Err(err) => {
79				log_error(Err(err))?;
80				continue
81			},
82		};
83
84		let backend = Db::new(MAX_STORED_SCORES_PER_PARA).await;
85
86		match PeerManager::startup(backend, ctx.sender(), scheduled_paras.into_iter().collect())
87			.await
88		{
89			Ok(peer_manager) => return Ok(Some(State::new(peer_manager, keystore, metrics))),
90			Err(err) => {
91				log_error(Err(err))?;
92				continue
93			},
94		}
95	}
96}
97
98/// Wait for `ActiveLeavesUpdate`, returns `None` if `Conclude` signal came first.
99#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
100async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> FatalResult<Option<ActivatedLeaf>> {
101	loop {
102		match ctx.recv().await.map_err(FatalError::SubsystemReceive)? {
103			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(None),
104			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
105				if let Some(activated) = update.activated {
106					return Ok(Some(activated))
107				}
108			},
109			FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
110			FromOrchestra::Communication { msg } => {
111				// TODO: we should actually disconnect peers connected on collation protocol while
112				// we're still bootstrapping. OR buffer these messages until we've bootstrapped.
113				gum::warn!(
114					target: LOG_TARGET,
115					?msg,
116					"Received msg before first active leaves update. This is not expected - message will be dropped."
117				)
118			},
119		}
120	}
121}
122
123async fn scheduled_paras<Sender: CollatorProtocolSenderTrait>(
124	sender: &mut Sender,
125	hash: Hash,
126	keystore: &KeystorePtr,
127) -> Result<VecDeque<ParaId>> {
128	let validators = recv_runtime(request_validators(hash, sender).await).await?;
129
130	let (groups, rotation_info) =
131		recv_runtime(request_validator_groups(hash, sender).await).await?;
132
133	let core_now = if let Some(group) = signing_key_and_index(&validators, keystore)
134		.and_then(|(_, index)| find_validator_group(&groups, index))
135	{
136		rotation_info.core_for_group(group, groups.len())
137	} else {
138		gum::trace!(target: LOG_TARGET, ?hash, "Not a validator");
139		return Ok(VecDeque::new())
140	};
141
142	let mut claim_queue = recv_runtime(request_claim_queue(hash, sender).await).await?;
143	Ok(claim_queue.remove(&core_now).unwrap_or_else(|| VecDeque::new()))
144}