1use codec::Codec;
29use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
30use cumulus_client_consensus_common::{
31 self as consensus_common, ParachainBlockImportMarker, ParachainCandidate,
32};
33use cumulus_client_parachain_inherent::{ParachainInherentData, ParachainInherentDataProvider};
34use cumulus_primitives_core::{
35 relay_chain::Hash as PHash, DigestItem, ParachainBlockData, PersistedValidationData,
36};
37use cumulus_relay_chain_interface::RelayChainInterface;
38use sc_client_api::BackendTransaction;
39use sp_consensus::{Environment, ProposeArgs, Proposer};
40
41use polkadot_node_primitives::{Collation, MaybeCompressedPoV};
42use polkadot_primitives::{Header as PHeader, Id as ParaId};
43use sp_externalities::Extensions;
44use sp_trie::proof_size_extension::ProofSizeExt;
45
46use crate::collators::RelayParentData;
47use futures::prelude::*;
48use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction};
49use sc_consensus_aura::standalone as aura_internal;
50use sc_network_types::PeerId;
51use sp_api::{ProofRecorder, ProvideRuntimeApi, StorageProof};
52use sp_application_crypto::AppPublic;
53use sp_consensus::BlockOrigin;
54use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
55use sp_core::crypto::Pair;
56use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
57use sp_keystore::KeystorePtr;
58use sp_runtime::{
59 generic::Digest,
60 traits::{Block as BlockT, HashingFor, Header as HeaderT, Member},
61};
62use sp_state_machine::StorageChanges;
63use sp_timestamp::Timestamp;
64use std::{error::Error, time::Duration};
65
66pub struct Params<BI, CIDP, RClient, PF, CS> {
68 pub create_inherent_data_providers: CIDP,
70 pub block_import: BI,
72 pub relay_client: RClient,
74 pub keystore: KeystorePtr,
76 pub collator_peer_id: PeerId,
78 pub para_id: ParaId,
80 pub proposer: PF,
82 pub collator_service: CS,
85}
86
87pub struct BuildBlockAndImportParams<'a, Block: BlockT, P: Pair> {
89 pub parent_header: &'a Block::Header,
91 pub slot_claim: &'a SlotClaim<P::Public>,
93 pub additional_pre_digest: Vec<DigestItem>,
95 pub parachain_inherent_data: ParachainInherentData,
97 pub extra_inherent_data: InherentData,
99 pub proposal_duration: Duration,
101 pub max_pov_size: usize,
103 pub storage_proof_recorder: Option<ProofRecorder<Block>>,
108 pub extra_extensions: Extensions,
110}
111
112pub struct BuiltBlock<Block: BlockT> {
114 pub block: Block,
116 pub proof: StorageProof,
118 pub backend_transaction: BackendTransaction<HashingFor<Block>>,
122}
123
124impl<Block: BlockT> From<BuiltBlock<Block>> for ParachainCandidate<Block> {
125 fn from(built: BuiltBlock<Block>) -> Self {
126 Self { block: built.block, proof: built.proof }
127 }
128}
129
130pub struct Collator<Block, P, BI, CIDP, RClient, PF, CS> {
133 create_inherent_data_providers: CIDP,
134 block_import: BI,
135 relay_client: RClient,
136 keystore: KeystorePtr,
137 para_id: ParaId,
138 proposer: PF,
139 collator_service: CS,
140 _marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
141}
142
143impl<Block, P, BI, CIDP, RClient, PF, CS> Collator<Block, P, BI, CIDP, RClient, PF, CS>
144where
145 Block: BlockT,
146 RClient: RelayChainInterface,
147 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
148 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
149 PF: Environment<Block>,
150 CS: CollatorServiceInterface<Block>,
151 P: Pair,
152 P::Public: AppPublic + Member,
153 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
154{
155 pub fn new(params: Params<BI, CIDP, RClient, PF, CS>) -> Self {
157 Collator {
158 create_inherent_data_providers: params.create_inherent_data_providers,
159 block_import: params.block_import,
160 relay_client: params.relay_client,
161 keystore: params.keystore,
162 para_id: params.para_id,
163 proposer: params.proposer,
164 collator_service: params.collator_service,
165 _marker: std::marker::PhantomData,
166 }
167 }
168
169 pub async fn create_inherent_data_with_rp_offset(
174 &self,
175 relay_parent: PHash,
176 validation_data: &PersistedValidationData,
177 parent_hash: Block::Hash,
178 timestamp: impl Into<Option<Timestamp>>,
179 relay_parent_descendants: Option<RelayParentData>,
180 collator_peer_id: PeerId,
181 ) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
182 let paras_inherent_data = ParachainInherentDataProvider::create_at(
183 relay_parent,
184 &self.relay_client,
185 validation_data,
186 self.para_id,
187 relay_parent_descendants
188 .map(RelayParentData::into_inherent_descendant_list)
189 .unwrap_or_default(),
190 Vec::new(),
191 collator_peer_id,
192 )
193 .await;
194
195 let paras_inherent_data = match paras_inherent_data {
196 Some(p) => p,
197 None =>
198 return Err(
199 format!("Could not create paras inherent data at {:?}", relay_parent).into()
200 ),
201 };
202
203 let mut other_inherent_data = self
204 .create_inherent_data_providers
205 .create_inherent_data_providers(parent_hash, ())
206 .map_err(|e| e as Box<dyn Error + Send + Sync + 'static>)
207 .await?
208 .create_inherent_data()
209 .await
210 .map_err(Box::new)?;
211
212 if let Some(timestamp) = timestamp.into() {
213 other_inherent_data.replace_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp);
214 }
215
216 Ok((paras_inherent_data, other_inherent_data))
217 }
218
219 pub async fn create_inherent_data(
222 &self,
223 relay_parent: PHash,
224 validation_data: &PersistedValidationData,
225 parent_hash: Block::Hash,
226 timestamp: impl Into<Option<Timestamp>>,
227 collator_peer_id: PeerId,
228 ) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
229 self.create_inherent_data_with_rp_offset(
230 relay_parent,
231 validation_data,
232 parent_hash,
233 timestamp,
234 None,
235 collator_peer_id,
236 )
237 .await
238 }
239
240 pub async fn build_block_and_import(
242 &mut self,
243 mut params: BuildBlockAndImportParams<'_, Block, P>,
244 ) -> Result<Option<BuiltBlock<Block>>, Box<dyn Error + Send + 'static>> {
245 let mut digest = params.additional_pre_digest;
246 digest.push(params.slot_claim.pre_digest.clone());
247
248 let proposer = self
250 .proposer
251 .init(¶ms.parent_header)
252 .await
253 .map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
254
255 let mut inherent_data_combined = params.extra_inherent_data;
257 params
258 .parachain_inherent_data
259 .provide_inherent_data(&mut inherent_data_combined)
260 .await
261 .map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
262
263 let recorder_passed = params.storage_proof_recorder.is_some();
264 let storage_proof_recorder = params.storage_proof_recorder.unwrap_or_default();
265 let proof_size_ext_registered =
266 params.extra_extensions.is_registered(ProofSizeExt::type_id());
267
268 if !proof_size_ext_registered {
269 params
270 .extra_extensions
271 .register(ProofSizeExt::new(storage_proof_recorder.clone()));
272 } else if proof_size_ext_registered && !recorder_passed {
273 return Err(
274 Box::from("`ProofSizeExt` registered, but no `storage_proof_recorder` provided. This is a bug.")
275 as Box<dyn Error + Send + Sync>
276 )
277 }
278
279 let propose_args = ProposeArgs {
281 inherent_data: inherent_data_combined,
282 inherent_digests: Digest { logs: digest },
283 max_duration: params.proposal_duration,
284 block_size_limit: Some(params.max_pov_size),
285 extra_extensions: params.extra_extensions,
286 storage_proof_recorder: Some(storage_proof_recorder.clone()),
287 };
288
289 let proposal = proposer
291 .propose(propose_args)
292 .await
293 .map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
294
295 let backend_transaction = proposal.storage_changes.transaction.clone();
296
297 let sealed_importable = seal::<_, P>(
298 proposal.block,
299 proposal.storage_changes,
300 ¶ms.slot_claim.author_pub,
301 &self.keystore,
302 )
303 .map_err(|e| e as Box<dyn Error + Send>)?;
304
305 let block = Block::new(
306 sealed_importable.post_header(),
307 sealed_importable
308 .body
309 .as_ref()
310 .expect("body always created with this `propose` fn; qed")
311 .clone(),
312 );
313
314 self.block_import
315 .import_block(sealed_importable)
316 .map_err(|e| Box::new(e) as Box<dyn Error + Send>)
317 .await?;
318
319 let proof = storage_proof_recorder.drain_storage_proof();
320
321 Ok(Some(BuiltBlock { block, proof, backend_transaction }))
322 }
323
324 pub async fn collate(
333 &mut self,
334 parent_header: &Block::Header,
335 slot_claim: &SlotClaim<P::Public>,
336 additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
337 inherent_data: (ParachainInherentData, InherentData),
338 proposal_duration: Duration,
339 max_pov_size: usize,
340 ) -> Result<Option<(Collation, ParachainBlockData<Block>)>, Box<dyn Error + Send + 'static>> {
341 let maybe_candidate = self
342 .build_block_and_import(BuildBlockAndImportParams {
343 parent_header,
344 slot_claim,
345 additional_pre_digest: additional_pre_digest.into().unwrap_or_default(),
346 parachain_inherent_data: inherent_data.0,
347 extra_inherent_data: inherent_data.1,
348 proposal_duration,
349 max_pov_size,
350 storage_proof_recorder: None,
351 extra_extensions: Default::default(),
352 })
353 .await?;
354
355 let Some(candidate) = maybe_candidate else { return Ok(None) };
356
357 let hash = candidate.block.header().hash();
358 if let Some((collation, block_data)) =
359 self.collator_service.build_collation(parent_header, hash, candidate.into())
360 {
361 block_data.log_size_info();
362
363 if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
364 tracing::info!(
365 target: crate::LOG_TARGET,
366 "Compressed PoV size: {}kb",
367 pov.block_data.0.len() as f64 / 1024f64,
368 );
369 }
370
371 Ok(Some((collation, block_data)))
372 } else {
373 Err(Box::<dyn Error + Send + Sync>::from("Unable to produce collation"))
374 }
375 }
376
377 pub fn collator_service(&self) -> &CS {
379 &self.collator_service
380 }
381}
382
383pub struct SlotClaim<Pub> {
385 author_pub: Pub,
386 pre_digest: DigestItem,
387 slot: Slot,
388 timestamp: Timestamp,
389}
390
391impl<Pub> SlotClaim<Pub> {
392 pub fn unchecked<P>(author_pub: Pub, slot: Slot, timestamp: Timestamp) -> Self
397 where
398 P: Pair<Public = Pub>,
399 P::Public: Codec,
400 P::Signature: Codec,
401 {
402 SlotClaim { author_pub, timestamp, pre_digest: aura_internal::pre_digest::<P>(slot), slot }
403 }
404
405 pub fn author_pub(&self) -> &Pub {
407 &self.author_pub
408 }
409
410 pub fn pre_digest(&self) -> &DigestItem {
412 &self.pre_digest
413 }
414
415 pub fn slot(&self) -> Slot {
417 self.slot
418 }
419
420 pub fn timestamp(&self) -> Timestamp {
423 self.timestamp
424 }
425}
426
427pub async fn claim_slot<B, C, P>(
429 client: &C,
430 parent_hash: B::Hash,
431 relay_parent_header: &PHeader,
432 slot_duration: SlotDuration,
433 relay_chain_slot_duration: Duration,
434 keystore: &KeystorePtr,
435) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
436where
437 B: BlockT,
438 C: ProvideRuntimeApi<B> + Send + Sync + 'static,
439 C::Api: AuraApi<B, P::Public>,
440 P: Pair,
441 P::Public: Codec,
442 P::Signature: Codec,
443{
444 let authorities = client.runtime_api().authorities(parent_hash).map_err(Box::new)?;
446
447 let (slot_now, timestamp) = match consensus_common::relay_slot_and_timestamp(
449 relay_parent_header,
450 relay_chain_slot_duration,
451 ) {
452 Some((r_s, t)) => {
453 let our_slot = Slot::from_timestamp(t, slot_duration);
454 tracing::debug!(
455 target: crate::LOG_TARGET,
456 relay_slot = ?r_s,
457 para_slot = ?our_slot,
458 timestamp = ?t,
459 ?slot_duration,
460 ?relay_chain_slot_duration,
461 "Adjusted relay-chain slot to parachain slot"
462 );
463 (our_slot, t)
464 },
465 None => return Ok(None),
466 };
467
468 let author_pub = {
470 let res = aura_internal::claim_slot::<P>(slot_now, &authorities, keystore).await;
471 match res {
472 Some(p) => p,
473 None => return Ok(None),
474 }
475 };
476
477 Ok(Some(SlotClaim::unchecked::<P>(author_pub, slot_now, timestamp)))
478}
479
480pub fn seal<B: BlockT, P>(
482 pre_sealed: B,
483 storage_changes: StorageChanges<HashingFor<B>>,
484 author_pub: &P::Public,
485 keystore: &KeystorePtr,
486) -> Result<BlockImportParams<B>, Box<dyn Error + Send + Sync + 'static>>
487where
488 P: Pair,
489 P::Signature: Codec + TryFrom<Vec<u8>>,
490 P::Public: AppPublic,
491{
492 let (pre_header, body) = pre_sealed.deconstruct();
493 let pre_hash = pre_header.hash();
494 let block_number = *pre_header.number();
495
496 let block_import_params = {
498 let seal_digest =
499 aura_internal::seal::<_, P>(&pre_hash, &author_pub, keystore).map_err(Box::new)?;
500 let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
501 block_import_params.post_digests.push(seal_digest);
502 block_import_params.body = Some(body);
503 block_import_params.state_action =
504 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
505 block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
506 block_import_params
507 };
508 let post_hash = block_import_params.post_hash();
509
510 tracing::info!(
511 target: crate::LOG_TARGET,
512 "๐ Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
513 block_number,
514 post_hash,
515 pre_hash,
516 );
517
518 Ok(block_import_params)
519}