1use codec::Codec;
29use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
30use cumulus_client_consensus_common::{
31 self as consensus_common, ParachainBlockImportMarker, ParachainCandidate,
32};
33use cumulus_client_consensus_proposer::ProposerInterface;
34use cumulus_client_parachain_inherent::{ParachainInherentData, ParachainInherentDataProvider};
35use cumulus_primitives_core::{
36 relay_chain::Hash as PHash, DigestItem, ParachainBlockData, PersistedValidationData,
37};
38use cumulus_relay_chain_interface::RelayChainInterface;
39
40use polkadot_node_primitives::{Collation, MaybeCompressedPoV};
41use polkadot_primitives::{Header as PHeader, Id as ParaId};
42
43use crate::collators::RelayParentData;
44use futures::prelude::*;
45use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction};
46use sc_consensus_aura::standalone as aura_internal;
47use sp_api::ProvideRuntimeApi;
48use sp_application_crypto::AppPublic;
49use sp_consensus::BlockOrigin;
50use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
51use sp_core::crypto::Pair;
52use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
53use sp_keystore::KeystorePtr;
54use sp_runtime::{
55 generic::Digest,
56 traits::{Block as BlockT, HashingFor, Header as HeaderT, Member},
57};
58use sp_state_machine::StorageChanges;
59use sp_timestamp::Timestamp;
60use std::{error::Error, time::Duration};
61
62pub struct Params<BI, CIDP, RClient, Proposer, CS> {
64 pub create_inherent_data_providers: CIDP,
66 pub block_import: BI,
68 pub relay_client: RClient,
70 pub keystore: KeystorePtr,
72 pub para_id: ParaId,
74 pub proposer: Proposer,
76 pub collator_service: CS,
79}
80
81pub struct Collator<Block, P, BI, CIDP, RClient, Proposer, CS> {
84 create_inherent_data_providers: CIDP,
85 block_import: BI,
86 relay_client: RClient,
87 keystore: KeystorePtr,
88 para_id: ParaId,
89 proposer: Proposer,
90 collator_service: CS,
91 _marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
92}
93
94impl<Block, P, BI, CIDP, RClient, Proposer, CS> Collator<Block, P, BI, CIDP, RClient, Proposer, CS>
95where
96 Block: BlockT,
97 RClient: RelayChainInterface,
98 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
99 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
100 Proposer: ProposerInterface<Block>,
101 CS: CollatorServiceInterface<Block>,
102 P: Pair,
103 P::Public: AppPublic + Member,
104 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
105{
106 pub fn new(params: Params<BI, CIDP, RClient, Proposer, CS>) -> Self {
108 Collator {
109 create_inherent_data_providers: params.create_inherent_data_providers,
110 block_import: params.block_import,
111 relay_client: params.relay_client,
112 keystore: params.keystore,
113 para_id: params.para_id,
114 proposer: params.proposer,
115 collator_service: params.collator_service,
116 _marker: std::marker::PhantomData,
117 }
118 }
119
120 pub async fn create_inherent_data_with_rp_offset(
125 &self,
126 relay_parent: PHash,
127 validation_data: &PersistedValidationData,
128 parent_hash: Block::Hash,
129 timestamp: impl Into<Option<Timestamp>>,
130 relay_parent_descendants: Option<RelayParentData>,
131 ) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
132 let paras_inherent_data = ParachainInherentDataProvider::create_at(
133 relay_parent,
134 &self.relay_client,
135 validation_data,
136 self.para_id,
137 relay_parent_descendants
138 .map(RelayParentData::into_inherent_descendant_list)
139 .unwrap_or_default(),
140 Vec::new(),
141 )
142 .await;
143
144 let paras_inherent_data = match paras_inherent_data {
145 Some(p) => p,
146 None =>
147 return Err(
148 format!("Could not create paras inherent data at {:?}", relay_parent).into()
149 ),
150 };
151
152 let mut other_inherent_data = self
153 .create_inherent_data_providers
154 .create_inherent_data_providers(parent_hash, ())
155 .map_err(|e| e as Box<dyn Error + Send + Sync + 'static>)
156 .await?
157 .create_inherent_data()
158 .await
159 .map_err(Box::new)?;
160
161 if let Some(timestamp) = timestamp.into() {
162 other_inherent_data.replace_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp);
163 }
164
165 Ok((paras_inherent_data, other_inherent_data))
166 }
167
168 pub async fn create_inherent_data(
171 &self,
172 relay_parent: PHash,
173 validation_data: &PersistedValidationData,
174 parent_hash: Block::Hash,
175 timestamp: impl Into<Option<Timestamp>>,
176 ) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
177 self.create_inherent_data_with_rp_offset(
178 relay_parent,
179 validation_data,
180 parent_hash,
181 timestamp,
182 None,
183 )
184 .await
185 }
186
187 pub async fn build_block_and_import(
189 &mut self,
190 parent_header: &Block::Header,
191 slot_claim: &SlotClaim<P::Public>,
192 additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
193 inherent_data: (ParachainInherentData, InherentData),
194 proposal_duration: Duration,
195 max_pov_size: usize,
196 ) -> Result<Option<ParachainCandidate<Block>>, Box<dyn Error + Send + 'static>> {
197 let mut digest = additional_pre_digest.into().unwrap_or_default();
198 digest.push(slot_claim.pre_digest.clone());
199
200 let maybe_proposal = self
201 .proposer
202 .propose(
203 &parent_header,
204 &inherent_data.0,
205 inherent_data.1,
206 Digest { logs: digest },
207 proposal_duration,
208 Some(max_pov_size),
209 )
210 .await
211 .map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
212
213 let proposal = match maybe_proposal {
214 None => return Ok(None),
215 Some(p) => p,
216 };
217
218 let sealed_importable = seal::<_, P>(
219 proposal.block,
220 proposal.storage_changes,
221 &slot_claim.author_pub,
222 &self.keystore,
223 )
224 .map_err(|e| e as Box<dyn Error + Send>)?;
225
226 let block = Block::new(
227 sealed_importable.post_header(),
228 sealed_importable
229 .body
230 .as_ref()
231 .expect("body always created with this `propose` fn; qed")
232 .clone(),
233 );
234
235 self.block_import
236 .import_block(sealed_importable)
237 .map_err(|e| Box::new(e) as Box<dyn Error + Send>)
238 .await?;
239
240 Ok(Some(ParachainCandidate { block, proof: proposal.proof }))
241 }
242
243 pub async fn collate(
252 &mut self,
253 parent_header: &Block::Header,
254 slot_claim: &SlotClaim<P::Public>,
255 additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
256 inherent_data: (ParachainInherentData, InherentData),
257 proposal_duration: Duration,
258 max_pov_size: usize,
259 ) -> Result<Option<(Collation, ParachainBlockData<Block>)>, Box<dyn Error + Send + 'static>> {
260 let maybe_candidate = self
261 .build_block_and_import(
262 parent_header,
263 slot_claim,
264 additional_pre_digest,
265 inherent_data,
266 proposal_duration,
267 max_pov_size,
268 )
269 .await?;
270
271 let Some(candidate) = maybe_candidate else { return Ok(None) };
272
273 let hash = candidate.block.header().hash();
274 if let Some((collation, block_data)) =
275 self.collator_service.build_collation(parent_header, hash, candidate)
276 {
277 block_data.log_size_info();
278
279 if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
280 tracing::info!(
281 target: crate::LOG_TARGET,
282 "Compressed PoV size: {}kb",
283 pov.block_data.0.len() as f64 / 1024f64,
284 );
285 }
286
287 Ok(Some((collation, block_data)))
288 } else {
289 Err(Box::<dyn Error + Send + Sync>::from("Unable to produce collation"))
290 }
291 }
292
293 pub fn collator_service(&self) -> &CS {
295 &self.collator_service
296 }
297}
298
299pub struct SlotClaim<Pub> {
301 author_pub: Pub,
302 pre_digest: DigestItem,
303 slot: Slot,
304 timestamp: Timestamp,
305}
306
307impl<Pub> SlotClaim<Pub> {
308 pub fn unchecked<P>(author_pub: Pub, slot: Slot, timestamp: Timestamp) -> Self
313 where
314 P: Pair<Public = Pub>,
315 P::Public: Codec,
316 P::Signature: Codec,
317 {
318 SlotClaim { author_pub, timestamp, pre_digest: aura_internal::pre_digest::<P>(slot), slot }
319 }
320
321 pub fn author_pub(&self) -> &Pub {
323 &self.author_pub
324 }
325
326 pub fn pre_digest(&self) -> &DigestItem {
328 &self.pre_digest
329 }
330
331 pub fn slot(&self) -> Slot {
333 self.slot
334 }
335
336 pub fn timestamp(&self) -> Timestamp {
339 self.timestamp
340 }
341}
342
343pub async fn claim_slot<B, C, P>(
345 client: &C,
346 parent_hash: B::Hash,
347 relay_parent_header: &PHeader,
348 slot_duration: SlotDuration,
349 relay_chain_slot_duration: Duration,
350 keystore: &KeystorePtr,
351) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
352where
353 B: BlockT,
354 C: ProvideRuntimeApi<B> + Send + Sync + 'static,
355 C::Api: AuraApi<B, P::Public>,
356 P: Pair,
357 P::Public: Codec,
358 P::Signature: Codec,
359{
360 let authorities = client.runtime_api().authorities(parent_hash).map_err(Box::new)?;
362
363 let (slot_now, timestamp) = match consensus_common::relay_slot_and_timestamp(
365 relay_parent_header,
366 relay_chain_slot_duration,
367 ) {
368 Some((r_s, t)) => {
369 let our_slot = Slot::from_timestamp(t, slot_duration);
370 tracing::debug!(
371 target: crate::LOG_TARGET,
372 relay_slot = ?r_s,
373 para_slot = ?our_slot,
374 timestamp = ?t,
375 ?slot_duration,
376 ?relay_chain_slot_duration,
377 "Adjusted relay-chain slot to parachain slot"
378 );
379 (our_slot, t)
380 },
381 None => return Ok(None),
382 };
383
384 let author_pub = {
386 let res = aura_internal::claim_slot::<P>(slot_now, &authorities, keystore).await;
387 match res {
388 Some(p) => p,
389 None => return Ok(None),
390 }
391 };
392
393 Ok(Some(SlotClaim::unchecked::<P>(author_pub, slot_now, timestamp)))
394}
395
396pub fn seal<B: BlockT, P>(
398 pre_sealed: B,
399 storage_changes: StorageChanges<HashingFor<B>>,
400 author_pub: &P::Public,
401 keystore: &KeystorePtr,
402) -> Result<BlockImportParams<B>, Box<dyn Error + Send + Sync + 'static>>
403where
404 P: Pair,
405 P::Signature: Codec + TryFrom<Vec<u8>>,
406 P::Public: AppPublic,
407{
408 let (pre_header, body) = pre_sealed.deconstruct();
409 let pre_hash = pre_header.hash();
410 let block_number = *pre_header.number();
411
412 let block_import_params = {
414 let seal_digest =
415 aura_internal::seal::<_, P>(&pre_hash, &author_pub, keystore).map_err(Box::new)?;
416 let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
417 block_import_params.post_digests.push(seal_digest);
418 block_import_params.body = Some(body);
419 block_import_params.state_action =
420 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
421 block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
422 block_import_params
423 };
424 let post_hash = block_import_params.post_hash();
425
426 tracing::info!(
427 target: crate::LOG_TARGET,
428 "๐ Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
429 block_number,
430 post_hash,
431 pre_hash,
432 );
433
434 Ok(block_import_params)
435}