1use crate::{
18 cli::AuthoringPolicy,
19 common::{
20 aura::{AuraIdT, AuraRuntimeApi},
21 rpc::BuildParachainRpcExtensions,
22 spec::{
23 BaseNodeSpec, BuildImportQueue, ClientBlockImport, InitBlockImport, NodeSpec,
24 StartConsensus,
25 },
26 types::{
27 AccountId, Balance, Hash, Nonce, ParachainBackend, ParachainBlockImport,
28 ParachainClient,
29 },
30 ConstructNodeRuntimeApi, NodeBlock, NodeExtraArgs,
31 },
32 nodes::DynNodeSpecExt,
33};
34use cumulus_client_collator::service::{
35 CollatorService, ServiceInterface as CollatorServiceInterface,
36};
37#[docify::export(slot_based_colator_import)]
38use cumulus_client_consensus_aura::collators::slot_based::{
39 self as slot_based, Params as SlotBasedParams,
40};
41use cumulus_client_consensus_aura::{
42 collators::{
43 lookahead::{self as aura, Params as AuraParams},
44 slot_based::{SlotBasedBlockImport, SlotBasedBlockImportHandle},
45 },
46 equivocation_import_queue::Verifier as EquivocationVerifier,
47};
48use cumulus_client_consensus_proposer::ProposerInterface;
49use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier;
50#[allow(deprecated)]
51use cumulus_client_service::CollatorSybilResistance;
52use cumulus_primitives_core::{relay_chain::ValidationCode, GetParachainInfo, ParaId};
53use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
54use futures::prelude::*;
55use polkadot_primitives::CollatorPair;
56use prometheus_endpoint::Registry;
57use sc_client_api::BlockchainEvents;
58use sc_client_db::DbHash;
59use sc_consensus::{
60 import_queue::{BasicQueue, Verifier as VerifierT},
61 BlockImportParams, DefaultImportQueue,
62};
63use sc_service::{Configuration, Error, TaskManager};
64use sc_telemetry::TelemetryHandle;
65use sc_transaction_pool::TransactionPoolHandle;
66use sp_api::ProvideRuntimeApi;
67use sp_core::traits::SpawnNamed;
68use sp_inherents::CreateInherentDataProviders;
69use sp_keystore::KeystorePtr;
70use sp_runtime::{
71 app_crypto::AppCrypto,
72 traits::{Block as BlockT, Header as HeaderT},
73};
74use std::{marker::PhantomData, sync::Arc, time::Duration};
75
76struct Verifier<Block, Client, AuraId> {
77 client: Arc<Client>,
78 aura_verifier: Box<dyn VerifierT<Block>>,
79 relay_chain_verifier: Box<dyn VerifierT<Block>>,
80 _phantom: PhantomData<AuraId>,
81}
82
83#[async_trait::async_trait]
84impl<Block: BlockT, Client, AuraId> VerifierT<Block> for Verifier<Block, Client, AuraId>
85where
86 Client: ProvideRuntimeApi<Block> + Send + Sync,
87 Client::Api: AuraRuntimeApi<Block, AuraId>,
88 AuraId: AuraIdT + Sync,
89{
90 async fn verify(
91 &self,
92 block_import: BlockImportParams<Block>,
93 ) -> Result<BlockImportParams<Block>, String> {
94 if self.client.runtime_api().has_aura_api(*block_import.header.parent_hash()) {
95 self.aura_verifier.verify(block_import).await
96 } else {
97 self.relay_chain_verifier.verify(block_import).await
98 }
99 }
100}
101
102pub(crate) struct BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId, BlockImport>(
105 PhantomData<(Block, RuntimeApi, AuraId, BlockImport)>,
106);
107
108impl<Block: BlockT, RuntimeApi, AuraId, BlockImport>
109 BuildImportQueue<Block, RuntimeApi, BlockImport>
110 for BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId, BlockImport>
111where
112 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
113 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>,
114 AuraId: AuraIdT + Sync,
115 BlockImport:
116 sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + Send + Sync + 'static,
117{
118 fn build_import_queue(
119 client: Arc<ParachainClient<Block, RuntimeApi>>,
120 block_import: ParachainBlockImport<Block, BlockImport>,
121 config: &Configuration,
122 telemetry_handle: Option<TelemetryHandle>,
123 task_manager: &TaskManager,
124 ) -> sc_service::error::Result<DefaultImportQueue<Block>> {
125 let inherent_data_providers =
126 move |_, _| async move { Ok(sp_timestamp::InherentDataProvider::from_system_time()) };
127 let registry = config.prometheus_registry();
128 let spawner = task_manager.spawn_essential_handle();
129
130 let relay_chain_verifier =
131 Box::new(RelayChainVerifier::new(client.clone(), inherent_data_providers));
132
133 let equivocation_aura_verifier =
134 EquivocationVerifier::<<AuraId as AppCrypto>::Pair, _, _, _>::new(
135 client.clone(),
136 inherent_data_providers,
137 telemetry_handle,
138 );
139
140 let verifier = Verifier {
141 client,
142 aura_verifier: Box::new(equivocation_aura_verifier),
143 relay_chain_verifier,
144 _phantom: Default::default(),
145 };
146
147 Ok(BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry))
148 }
149}
150
151pub(crate) struct AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport>(
155 pub PhantomData<(Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport)>,
156);
157
158impl<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> Default
159 for AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport>
160{
161 fn default() -> Self {
162 Self(Default::default())
163 }
164}
165
166impl<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> BaseNodeSpec
167 for AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport>
168where
169 Block: NodeBlock,
170 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
171 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>
172 + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>
173 + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
174 AuraId: AuraIdT + Sync,
175 InitBlockImport: self::InitBlockImport<Block, RuntimeApi> + Send,
176 InitBlockImport::BlockImport:
177 sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + 'static,
178{
179 type Block = Block;
180 type RuntimeApi = RuntimeApi;
181 type BuildImportQueue =
182 BuildRelayToAuraImportQueue<Block, RuntimeApi, AuraId, InitBlockImport::BlockImport>;
183 type InitBlockImport = InitBlockImport;
184}
185
186impl<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport> NodeSpec
187 for AuraNode<Block, RuntimeApi, AuraId, StartConsensus, InitBlockImport>
188where
189 Block: NodeBlock,
190 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
191 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>
192 + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>
193 + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
194 AuraId: AuraIdT + Sync,
195 StartConsensus: self::StartConsensus<
196 Block,
197 RuntimeApi,
198 InitBlockImport::BlockImport,
199 InitBlockImport::BlockImportAuxiliaryData,
200 > + 'static,
201 InitBlockImport: self::InitBlockImport<Block, RuntimeApi> + Send,
202 InitBlockImport::BlockImport:
203 sc_consensus::BlockImport<Block, Error = sp_consensus::Error> + 'static,
204{
205 type BuildRpcExtensions = BuildParachainRpcExtensions<Block, RuntimeApi>;
206 type StartConsensus = StartConsensus;
207 const SYBIL_RESISTANCE: CollatorSybilResistance = CollatorSybilResistance::Resistant;
208}
209
210pub fn new_aura_node_spec<Block, RuntimeApi, AuraId>(
211 extra_args: &NodeExtraArgs,
212) -> Box<dyn DynNodeSpecExt>
213where
214 Block: NodeBlock,
215 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
216 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>
217 + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>
218 + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>
219 + GetParachainInfo<Block>,
220 AuraId: AuraIdT + Sync,
221{
222 if extra_args.authoring_policy == AuthoringPolicy::SlotBased {
223 Box::new(AuraNode::<
224 Block,
225 RuntimeApi,
226 AuraId,
227 StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>,
228 StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>,
229 >::default())
230 } else {
231 Box::new(AuraNode::<
232 Block,
233 RuntimeApi,
234 AuraId,
235 StartLookaheadAuraConsensus<Block, RuntimeApi, AuraId>,
236 ClientBlockImport,
237 >::default())
238 }
239}
240
241pub(crate) struct StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>(
243 PhantomData<(Block, RuntimeApi, AuraId)>,
244);
245
246impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId>
247 StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>
248where
249 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
250 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>,
251 AuraId: AuraIdT + Sync,
252{
253 #[docify::export_content]
254 fn launch_slot_based_collator<CIDP, CHP, Proposer, CS, Spawner>(
255 params_with_export: SlotBasedParams<
256 Block,
257 ParachainBlockImport<
258 Block,
259 SlotBasedBlockImport<
260 Block,
261 Arc<ParachainClient<Block, RuntimeApi>>,
262 ParachainClient<Block, RuntimeApi>,
263 >,
264 >,
265 CIDP,
266 ParachainClient<Block, RuntimeApi>,
267 ParachainBackend<Block>,
268 Arc<dyn RelayChainInterface>,
269 CHP,
270 Proposer,
271 CS,
272 Spawner,
273 >,
274 ) where
275 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
276 CIDP::InherentDataProviders: Send,
277 CHP: cumulus_client_consensus_common::ValidationCodeHashProvider<Hash> + Send + 'static,
278 Proposer: ProposerInterface<Block> + Send + Sync + 'static,
279 CS: CollatorServiceInterface<Block> + Send + Sync + Clone + 'static,
280 Spawner: SpawnNamed,
281 {
282 slot_based::run::<Block, <AuraId as AppCrypto>::Pair, _, _, _, _, _, _, _, _, _>(
283 params_with_export,
284 );
285 }
286}
287
288impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId>
289 StartConsensus<
290 Block,
291 RuntimeApi,
292 SlotBasedBlockImport<
293 Block,
294 Arc<ParachainClient<Block, RuntimeApi>>,
295 ParachainClient<Block, RuntimeApi>,
296 >,
297 SlotBasedBlockImportHandle<Block>,
298 > for StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>
299where
300 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
301 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>,
302 AuraId: AuraIdT + Sync,
303{
304 fn start_consensus(
305 client: Arc<ParachainClient<Block, RuntimeApi>>,
306 block_import: ParachainBlockImport<
307 Block,
308 SlotBasedBlockImport<
309 Block,
310 Arc<ParachainClient<Block, RuntimeApi>>,
311 ParachainClient<Block, RuntimeApi>,
312 >,
313 >,
314 prometheus_registry: Option<&Registry>,
315 telemetry: Option<TelemetryHandle>,
316 task_manager: &TaskManager,
317 relay_chain_interface: Arc<dyn RelayChainInterface>,
318 transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
319 keystore: KeystorePtr,
320 relay_chain_slot_duration: Duration,
321 para_id: ParaId,
322 collator_key: CollatorPair,
323 _overseer_handle: OverseerHandle,
324 announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
325 backend: Arc<ParachainBackend<Block>>,
326 node_extra_args: NodeExtraArgs,
327 block_import_handle: SlotBasedBlockImportHandle<Block>,
328 ) -> Result<(), Error> {
329 let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
330 task_manager.spawn_handle(),
331 client.clone(),
332 transaction_pool,
333 prometheus_registry,
334 telemetry.clone(),
335 );
336
337 let collator_service = CollatorService::new(
338 client.clone(),
339 Arc::new(task_manager.spawn_handle()),
340 announce_block,
341 client.clone(),
342 );
343
344 let client_for_aura = client.clone();
345 let params = SlotBasedParams {
346 create_inherent_data_providers: move |_, ()| async move { Ok(()) },
347 block_import,
348 para_client: client.clone(),
349 para_backend: backend.clone(),
350 relay_client: relay_chain_interface,
351 relay_chain_slot_duration,
352 code_hash_provider: move |block_hash| {
353 client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
354 },
355 keystore,
356 collator_key,
357 para_id,
358 proposer,
359 collator_service,
360 authoring_duration: Duration::from_millis(2000),
361 reinitialize: false,
362 slot_offset: Duration::from_secs(1),
363 block_import_handle,
364 spawner: task_manager.spawn_handle(),
365 export_pov: node_extra_args.export_pov,
366 max_pov_percentage: node_extra_args.max_pov_percentage,
367 };
368
369 Self::launch_slot_based_collator(params);
373
374 Ok(())
375 }
376}
377
378impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId> InitBlockImport<Block, RuntimeApi>
379 for StartSlotBasedAuraConsensus<Block, RuntimeApi, AuraId>
380where
381 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
382 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>,
383 AuraId: AuraIdT + Sync,
384{
385 type BlockImport = SlotBasedBlockImport<
386 Block,
387 Arc<ParachainClient<Block, RuntimeApi>>,
388 ParachainClient<Block, RuntimeApi>,
389 >;
390 type BlockImportAuxiliaryData = SlotBasedBlockImportHandle<Block>;
391
392 fn init_block_import(
393 client: Arc<ParachainClient<Block, RuntimeApi>>,
394 ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> {
395 Ok(SlotBasedBlockImport::new(client.clone(), client))
396 }
397}
398
399async fn wait_for_aura<Block: BlockT, RuntimeApi, AuraId>(
403 client: Arc<ParachainClient<Block, RuntimeApi>>,
404) where
405 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
406 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>,
407 AuraId: AuraIdT + Sync,
408{
409 let finalized_hash = client.chain_info().finalized_hash;
410 if client.runtime_api().has_aura_api(finalized_hash) {
411 return;
412 };
413
414 let mut stream = client.finality_notification_stream();
415 while let Some(notification) = stream.next().await {
416 if client.runtime_api().has_aura_api(notification.hash) {
417 return;
418 }
419 }
420}
421
422pub(crate) struct StartLookaheadAuraConsensus<Block, RuntimeApi, AuraId>(
424 PhantomData<(Block, RuntimeApi, AuraId)>,
425);
426
427impl<Block: BlockT<Hash = DbHash>, RuntimeApi, AuraId>
428 StartConsensus<Block, RuntimeApi, Arc<ParachainClient<Block, RuntimeApi>>, ()>
429 for StartLookaheadAuraConsensus<Block, RuntimeApi, AuraId>
430where
431 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
432 RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>,
433 AuraId: AuraIdT + Sync,
434{
435 fn start_consensus(
436 client: Arc<ParachainClient<Block, RuntimeApi>>,
437 block_import: ParachainBlockImport<Block, Arc<ParachainClient<Block, RuntimeApi>>>,
438 prometheus_registry: Option<&Registry>,
439 telemetry: Option<TelemetryHandle>,
440 task_manager: &TaskManager,
441 relay_chain_interface: Arc<dyn RelayChainInterface>,
442 transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
443 keystore: KeystorePtr,
444 relay_chain_slot_duration: Duration,
445 para_id: ParaId,
446 collator_key: CollatorPair,
447 overseer_handle: OverseerHandle,
448 announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
449 backend: Arc<ParachainBackend<Block>>,
450 node_extra_args: NodeExtraArgs,
451 _: (),
452 ) -> Result<(), Error> {
453 let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
454 task_manager.spawn_handle(),
455 client.clone(),
456 transaction_pool,
457 prometheus_registry,
458 telemetry.clone(),
459 );
460
461 let collator_service = CollatorService::new(
462 client.clone(),
463 Arc::new(task_manager.spawn_handle()),
464 announce_block,
465 client.clone(),
466 );
467
468 let params = aura::ParamsWithExport {
469 export_pov: node_extra_args.export_pov,
470 params: AuraParams {
471 create_inherent_data_providers: move |_, ()| async move { Ok(()) },
472 block_import,
473 para_client: client.clone(),
474 para_backend: backend,
475 relay_client: relay_chain_interface,
476 code_hash_provider: {
477 let client = client.clone();
478 move |block_hash| {
479 client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
480 }
481 },
482 keystore,
483 collator_key,
484 para_id,
485 overseer_handle,
486 relay_chain_slot_duration,
487 proposer,
488 collator_service,
489 authoring_duration: Duration::from_millis(2000),
490 reinitialize: false,
491 max_pov_percentage: node_extra_args.max_pov_percentage,
492 },
493 };
494
495 let fut = async move {
496 wait_for_aura(client).await;
497 aura::run_with_export::<Block, <AuraId as AppCrypto>::Pair, _, _, _, _, _, _, _, _>(
498 params,
499 )
500 .await;
501 };
502 task_manager.spawn_essential_handle().spawn("aura", None, fut);
503
504 Ok(())
505 }
506}