referrerpolicy=no-referrer-when-downgrade

polkadot_collator_protocol/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Collator Protocol allows collators and validators to talk to each other.
18//! This subsystem implements both sides of the collator protocol.
19
20#![deny(missing_docs)]
21#![deny(unused_crate_dependencies)]
22#![recursion_limit = "256"]
23
24use std::{
25	collections::{HashMap, HashSet},
26	sync::Arc,
27	time::{Duration, Instant},
28};
29
30use futures::{
31	channel::oneshot,
32	stream::{FusedStream, StreamExt},
33	FutureExt, TryFutureExt,
34};
35
36use polkadot_node_subsystem::CollatorProtocolSenderTrait;
37use polkadot_node_subsystem_util::{database::Database, reputation::ReputationAggregator};
38use sp_consensus_babe::digests::CompatibleDigestItem;
39use sp_core::H256;
40use sp_keystore::KeystorePtr;
41
42use polkadot_node_network_protocol::{
43	request_response::{v2 as protocol_v2, IncomingRequestReceiver},
44	PeerId, UnifiedReputationChange as Rep,
45};
46use polkadot_node_subsystem::{
47	errors::SubsystemError, messages::ChainApiMessage, overseer, DummySubsystem, SpawnedSubsystem,
48};
49use polkadot_primitives::{CollatorPair, Hash, RELAY_CHAIN_SLOT_DURATION_MILLIS};
50use sp_consensus_slots::SlotDuration;
51pub use validator_side_experimental::ReputationConfig;
52
53mod collator_side;
54mod validator_side;
55mod validator_side_experimental;
56
57// TODO: move into validator_side_experimental once `validator_side` is retired
58mod validator_side_metrics;
59
/// Log target used by the log statements of this subsystem.
const LOG_TARGET: &str = "parachain::collator-protocol";
/// Separate log target for statistics output.
const LOG_TARGET_STATS: &str = "parachain::collator-protocol::stats";
62
/// Timeouts that decide how quickly misbehaving or idle collator peers are evicted.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy {
	/// Time after which a collator that stays inactive is evicted.
	pub inactive_collator: Duration,
	/// Time after which a peer that has not declared its para is evicted.
	pub undeclared: Duration,
}

impl Default for CollatorEvictionPolicy {
	/// Defaults: 24 seconds for inactive collators, 1 second for undeclared peers.
	fn default() -> Self {
		let inactive_collator = Duration::from_secs(24);
		let undeclared = Duration::from_secs(1);

		Self { inactive_collator, undeclared }
	}
}
80
81/// What side of the collator protocol is being engaged
82pub enum ProtocolSide {
83	/// Validators operate on the relay chain.
84	Validator {
85		/// The keystore holding validator keys.
86		keystore: KeystorePtr,
87		/// An eviction policy for inactive peers or validators.
88		eviction_policy: CollatorEvictionPolicy,
89		/// Prometheus metrics for validators.
90		metrics: validator_side::Metrics,
91		/// List of invulnerable collators which is handled with a priority.
92		invulnerables: HashSet<PeerId>,
93		/// Override for `HOLD_OFF_DURATION` constant .
94		collator_protocol_hold_off: Option<Duration>,
95	},
96	/// Experimental variant of the validator side. Do not use in production.
97	ValidatorExperimental {
98		/// The keystore holding validator keys.
99		keystore: KeystorePtr,
100		/// Prometheus metrics for validators.
101		metrics: validator_side_experimental::Metrics,
102		/// Database used for reputation house keeping.
103		db: Arc<dyn Database>,
104		/// Reputation configuration (column number).
105		reputation_config: validator_side_experimental::ReputationConfig,
106	},
107	/// Collators operate on a parachain.
108	Collator {
109		/// Local peer id.
110		peer_id: PeerId,
111		/// Parachain collator pair.
112		collator_pair: CollatorPair,
113		/// Receiver for v2 collation fetching requests.
114		request_receiver_v2: IncomingRequestReceiver<protocol_v2::CollationFetchingRequest>,
115		/// Metrics.
116		metrics: collator_side::Metrics,
117	},
118	/// No protocol side, just disable it.
119	None,
120}
121
122/// The collator protocol subsystem.
123pub struct CollatorProtocolSubsystem {
124	protocol_side: ProtocolSide,
125}
126
127#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
128impl CollatorProtocolSubsystem {
129	/// Start the collator protocol.
130	pub fn new(protocol_side: ProtocolSide) -> Self {
131		Self { protocol_side }
132	}
133}
134
135#[overseer::subsystem(CollatorProtocol, error=SubsystemError, prefix=self::overseer)]
136impl<Context> CollatorProtocolSubsystem {
137	fn start(self, ctx: Context) -> SpawnedSubsystem {
138		let future = match self.protocol_side {
139			ProtocolSide::Validator {
140				keystore,
141				eviction_policy,
142				metrics,
143				invulnerables,
144				collator_protocol_hold_off,
145			} => {
146				gum::trace!(
147					target: LOG_TARGET,
148					?invulnerables,
149					?collator_protocol_hold_off,
150					"AH collator protocol params",
151				);
152				validator_side::run(
153					ctx,
154					keystore,
155					eviction_policy,
156					metrics,
157					invulnerables,
158					collator_protocol_hold_off,
159				)
160				.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
161				.boxed()
162			},
163			ProtocolSide::ValidatorExperimental { keystore, metrics, db, reputation_config } => {
164				validator_side_experimental::run(ctx, keystore, metrics, db, reputation_config)
165					.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
166					.boxed()
167			},
168			ProtocolSide::Collator { peer_id, collator_pair, request_receiver_v2, metrics } => {
169				collator_side::run(ctx, peer_id, collator_pair, request_receiver_v2, metrics)
170					.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
171					.boxed()
172			},
173			ProtocolSide::None => return DummySubsystem.start(ctx),
174		};
175
176		SpawnedSubsystem { name: "collator-protocol-subsystem", future }
177	}
178}
179
180/// Modify the reputation of a peer based on its behavior.
181async fn modify_reputation(
182	reputation: &mut ReputationAggregator,
183	sender: &mut impl overseer::CollatorProtocolSenderTrait,
184	peer: PeerId,
185	rep: Rep,
186) {
187	gum::trace!(
188		target: LOG_TARGET,
189		rep = ?rep,
190		peer_id = %peer,
191		"reputation change for peer",
192	);
193
194	reputation.modify(sender, peer, rep).await;
195}
196
197/// Wait until tick and return the timestamp for the following one.
198async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
199	let now = Instant::now();
200	let next_poll = last_poll + period;
201
202	if next_poll > now {
203		futures_timer::Delay::new(next_poll - now).await
204	}
205
206	Instant::now()
207}
208
209/// Returns an infinite stream that yields with an interval of `period`.
210fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
211	futures::stream::unfold(Instant::now(), move |next_check| async move {
212		Some(((), wait_until_next_tick(next_check, period).await))
213	})
214	.fuse()
215}
216
217/// Scheduling info tracked per active leaf, used for V3 scheduling parent validation.
218/// Stores the leaf's BABE slot and parent hash so the validator can determine whether
219/// the scheduling parent corresponds to the last finished relay chain slot.
220struct LeafSchedulingInfo {
221	/// The parent hash of the leaf block.
222	parent_hash: Hash,
223	/// The BABE slot of the leaf block.
224	slot: sp_consensus_slots::Slot,
225}
226
227pub(crate) async fn extract_leaf_scheduling_info<Sender: CollatorProtocolSenderTrait>(
228	sender: &mut Sender,
229	leaf: H256,
230) -> Option<LeafSchedulingInfo> {
231	// Fetch leaf header to extract BABE slot for V3 scheduling parent validation.
232	// Without this info, V3 advertisements referencing this leaf will be rejected.
233	let (tx, rx) = oneshot::channel();
234	sender.send_message(ChainApiMessage::BlockHeader(leaf, tx)).await;
235	let header = rx.await.ok().and_then(|r| r.ok().flatten());
236	header.and_then(|header| {
237		let slot = header.digest.logs().iter().find_map(|log| log.as_babe_pre_digest())?.slot();
238		Some(LeafSchedulingInfo { parent_hash: header.parent_hash, slot })
239	})
240}
241
242pub(crate) fn is_scheduling_parent_valid(
243	scheduling_parent: &Hash,
244	leaf_scheduling_info: &HashMap<Hash, LeafSchedulingInfo>,
245) -> bool {
246	let slot_duration = SlotDuration::from_millis(RELAY_CHAIN_SLOT_DURATION_MILLIS);
247	let current_slot =
248		sp_consensus_slots::Slot::from_timestamp(sp_timestamp::Timestamp::current(), slot_duration);
249	if let Some(info) = leaf_scheduling_info.get(scheduling_parent) {
250		// scheduling_parent is a leaf. This is allowed only when the leaf's slot is
251		// the previous slot.
252		*current_slot == *info.slot + 1
253	} else {
254		// scheduling_parent is not a leaf. This is allowed only if the sp is the parent of
255		// any leaf whose slot is still in progress.
256		leaf_scheduling_info
257			.iter()
258			.any(|(_, info)| *current_slot == *info.slot && *scheduling_parent == info.parent_hash)
259	}
260}