//! `polkadot_collator_protocol` (`lib.rs`): crate root of the collator protocol subsystem.

#![deny(missing_docs)]
21#![deny(unused_crate_dependencies)]
22#![recursion_limit = "256"]
23
24use std::{
25 collections::{HashMap, HashSet},
26 sync::Arc,
27 time::{Duration, Instant},
28};
29
30use futures::{
31 channel::oneshot,
32 stream::{FusedStream, StreamExt},
33 FutureExt, TryFutureExt,
34};
35
36use polkadot_node_subsystem::CollatorProtocolSenderTrait;
37use polkadot_node_subsystem_util::{database::Database, reputation::ReputationAggregator};
38use sp_consensus_babe::digests::CompatibleDigestItem;
39use sp_core::H256;
40use sp_keystore::KeystorePtr;
41
42use polkadot_node_network_protocol::{
43 request_response::{v2 as protocol_v2, IncomingRequestReceiver},
44 PeerId, UnifiedReputationChange as Rep,
45};
46use polkadot_node_subsystem::{
47 errors::SubsystemError, messages::ChainApiMessage, overseer, DummySubsystem, SpawnedSubsystem,
48};
49use polkadot_primitives::{CollatorPair, Hash, RELAY_CHAIN_SLOT_DURATION_MILLIS};
50use sp_consensus_slots::SlotDuration;
51pub use validator_side_experimental::ReputationConfig;
52
53mod collator_side;
54mod validator_side;
55mod validator_side_experimental;
56
57mod validator_side_metrics;
59
// Log targets used by this crate's tracing calls. `const` items are implicitly `'static`, so the
// lifetime does not need to be spelled out.
const LOG_TARGET: &str = "parachain::collator-protocol";
const LOG_TARGET_STATS: &str = "parachain::collator-protocol::stats";
62
/// Time limits that govern when a collator peer is evicted.
///
/// The policy is handed to the validator side as part of `ProtocolSide::Validator`; the code that
/// enforces it is not in this file.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy {
    /// Time limit applied to collators that are inactive (defaults to 24 seconds).
    ///
    /// NOTE(review): the exact definition of "inactive" is decided by the validator side.
    pub inactive_collator: Duration,
    /// Time limit applied to peers that have not declared themselves (defaults to 1 second).
    ///
    /// NOTE(review): presumably counted from the moment the peer connects; confirm against the
    /// validator side.
    pub undeclared: Duration,
}

impl Default for CollatorEvictionPolicy {
    /// Returns the default policy: 24 seconds for inactive collators, 1 second for undeclared
    /// peers.
    fn default() -> Self {
        Self { inactive_collator: Duration::from_secs(24), undeclared: Duration::from_secs(1) }
    }
}
80
81pub enum ProtocolSide {
83 Validator {
85 keystore: KeystorePtr,
87 eviction_policy: CollatorEvictionPolicy,
89 metrics: validator_side::Metrics,
91 invulnerables: HashSet<PeerId>,
93 collator_protocol_hold_off: Option<Duration>,
95 },
96 ValidatorExperimental {
98 keystore: KeystorePtr,
100 metrics: validator_side_experimental::Metrics,
102 db: Arc<dyn Database>,
104 reputation_config: validator_side_experimental::ReputationConfig,
106 },
107 Collator {
109 peer_id: PeerId,
111 collator_pair: CollatorPair,
113 request_receiver_v2: IncomingRequestReceiver<protocol_v2::CollationFetchingRequest>,
115 metrics: collator_side::Metrics,
117 },
118 None,
120}
121
122pub struct CollatorProtocolSubsystem {
124 protocol_side: ProtocolSide,
125}
126
127#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
128impl CollatorProtocolSubsystem {
129 pub fn new(protocol_side: ProtocolSide) -> Self {
131 Self { protocol_side }
132 }
133}
134
135#[overseer::subsystem(CollatorProtocol, error=SubsystemError, prefix=self::overseer)]
136impl<Context> CollatorProtocolSubsystem {
137 fn start(self, ctx: Context) -> SpawnedSubsystem {
138 let future = match self.protocol_side {
139 ProtocolSide::Validator {
140 keystore,
141 eviction_policy,
142 metrics,
143 invulnerables,
144 collator_protocol_hold_off,
145 } => {
146 gum::trace!(
147 target: LOG_TARGET,
148 ?invulnerables,
149 ?collator_protocol_hold_off,
150 "AH collator protocol params",
151 );
152 validator_side::run(
153 ctx,
154 keystore,
155 eviction_policy,
156 metrics,
157 invulnerables,
158 collator_protocol_hold_off,
159 )
160 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
161 .boxed()
162 },
163 ProtocolSide::ValidatorExperimental { keystore, metrics, db, reputation_config } => {
164 validator_side_experimental::run(ctx, keystore, metrics, db, reputation_config)
165 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
166 .boxed()
167 },
168 ProtocolSide::Collator { peer_id, collator_pair, request_receiver_v2, metrics } => {
169 collator_side::run(ctx, peer_id, collator_pair, request_receiver_v2, metrics)
170 .map_err(|e| SubsystemError::with_origin("collator-protocol", e))
171 .boxed()
172 },
173 ProtocolSide::None => return DummySubsystem.start(ctx),
174 };
175
176 SpawnedSubsystem { name: "collator-protocol-subsystem", future }
177 }
178}
179
180async fn modify_reputation(
182 reputation: &mut ReputationAggregator,
183 sender: &mut impl overseer::CollatorProtocolSenderTrait,
184 peer: PeerId,
185 rep: Rep,
186) {
187 gum::trace!(
188 target: LOG_TARGET,
189 rep = ?rep,
190 peer_id = %peer,
191 "reputation change for peer",
192 );
193
194 reputation.modify(sender, peer, rep).await;
195}
196
197async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
199 let now = Instant::now();
200 let next_poll = last_poll + period;
201
202 if next_poll > now {
203 futures_timer::Delay::new(next_poll - now).await
204 }
205
206 Instant::now()
207}
208
209fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
211 futures::stream::unfold(Instant::now(), move |next_check| async move {
212 Some(((), wait_until_next_tick(next_check, period).await))
213 })
214 .fuse()
215}
216
217struct LeafSchedulingInfo {
221 parent_hash: Hash,
223 slot: sp_consensus_slots::Slot,
225}
226
227pub(crate) async fn extract_leaf_scheduling_info<Sender: CollatorProtocolSenderTrait>(
228 sender: &mut Sender,
229 leaf: H256,
230) -> Option<LeafSchedulingInfo> {
231 let (tx, rx) = oneshot::channel();
234 sender.send_message(ChainApiMessage::BlockHeader(leaf, tx)).await;
235 let header = rx.await.ok().and_then(|r| r.ok().flatten());
236 header.and_then(|header| {
237 let slot = header.digest.logs().iter().find_map(|log| log.as_babe_pre_digest())?.slot();
238 Some(LeafSchedulingInfo { parent_hash: header.parent_hash, slot })
239 })
240}
241
242pub(crate) fn is_scheduling_parent_valid(
243 scheduling_parent: &Hash,
244 leaf_scheduling_info: &HashMap<Hash, LeafSchedulingInfo>,
245) -> bool {
246 let slot_duration = SlotDuration::from_millis(RELAY_CHAIN_SLOT_DURATION_MILLIS);
247 let current_slot =
248 sp_consensus_slots::Slot::from_timestamp(sp_timestamp::Timestamp::current(), slot_duration);
249 if let Some(info) = leaf_scheduling_info.get(scheduling_parent) {
250 *current_slot == *info.slot + 1
253 } else {
254 leaf_scheduling_info
257 .iter()
258 .any(|(_, info)| *current_slot == *info.slot && *scheduling_parent == info.parent_hash)
259 }
260}