cumulus_client_consensus_aura/collators/
basic.rs1use codec::{Codec, Decode};
27use cumulus_client_collator::{
28 relay_chain_driven::CollationRequest, service::ServiceInterface as CollatorServiceInterface,
29};
30use cumulus_client_consensus_common::ParachainBlockImportMarker;
31use cumulus_client_consensus_proposer::ProposerInterface;
32use cumulus_primitives_core::{relay_chain::BlockId as RBlockId, CollectCollationInfo};
33use cumulus_relay_chain_interface::RelayChainInterface;
34
35use polkadot_node_primitives::CollationResult;
36use polkadot_overseer::Handle as OverseerHandle;
37use polkadot_primitives::{CollatorPair, Id as ParaId, ValidationCode};
38
39use futures::{channel::mpsc::Receiver, prelude::*};
40use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
41use sc_consensus::BlockImport;
42use sp_api::{CallApiAt, ProvideRuntimeApi};
43use sp_application_crypto::AppPublic;
44use sp_blockchain::HeaderBackend;
45use sp_consensus_aura::AuraApi;
46use sp_core::crypto::Pair;
47use sp_inherents::CreateInherentDataProviders;
48use sp_keystore::KeystorePtr;
49use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
50use sp_state_machine::Backend as _;
51use std::{sync::Arc, time::Duration};
52
53use crate::collator as collator_util;
54
55pub struct Params<BI, CIDP, Client, RClient, Proposer, CS> {
57 pub create_inherent_data_providers: CIDP,
61 pub block_import: BI,
63 pub para_client: Arc<Client>,
65 pub relay_client: RClient,
67 pub keystore: KeystorePtr,
69 pub collator_key: CollatorPair,
71 pub para_id: ParaId,
73 pub overseer_handle: OverseerHandle,
75 pub relay_chain_slot_duration: Duration,
77 pub proposer: Proposer,
79 pub collator_service: CS,
81 pub authoring_duration: Duration,
83 pub collation_request_receiver: Option<Receiver<CollationRequest>>,
87}
88
89pub fn run<Block, P, BI, CIDP, Client, RClient, Proposer, CS>(
91 params: Params<BI, CIDP, Client, RClient, Proposer, CS>,
92) -> impl Future<Output = ()> + Send + 'static
93where
94 Block: BlockT + Send,
95 Client: ProvideRuntimeApi<Block>
96 + BlockOf
97 + AuxStore
98 + HeaderBackend<Block>
99 + BlockBackend<Block>
100 + CallApiAt<Block>
101 + Send
102 + Sync
103 + 'static,
104 Client::Api: AuraApi<Block, P::Public> + CollectCollationInfo<Block>,
105 RClient: RelayChainInterface + Send + Clone + 'static,
106 CIDP: CreateInherentDataProviders<Block, ()> + Send + 'static,
107 CIDP::InherentDataProviders: Send,
108 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
109 Proposer: ProposerInterface<Block> + Send + Sync + 'static,
110 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
111 P: Pair,
112 P::Public: AppPublic + Member + Codec,
113 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
114{
115 async move {
116 let mut collation_requests = match params.collation_request_receiver {
117 Some(receiver) => receiver,
118 None =>
119 cumulus_client_collator::relay_chain_driven::init(
120 params.collator_key,
121 params.para_id,
122 params.overseer_handle,
123 )
124 .await,
125 };
126
127 let mut collator = {
128 let params = collator_util::Params {
129 create_inherent_data_providers: params.create_inherent_data_providers,
130 block_import: params.block_import,
131 relay_client: params.relay_client.clone(),
132 keystore: params.keystore.clone(),
133 para_id: params.para_id,
134 proposer: params.proposer,
135 collator_service: params.collator_service,
136 };
137
138 collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
139 };
140
141 let mut last_processed_slot = 0;
142 let mut last_relay_chain_block = Default::default();
143
144 while let Some(request) = collation_requests.next().await {
145 macro_rules! reject_with_error {
146 ($err:expr) => {{
147 request.complete(None);
148 tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
149 continue;
150 }};
151 }
152
153 macro_rules! try_request {
154 ($x:expr) => {{
155 match $x {
156 Ok(x) => x,
157 Err(e) => reject_with_error!(e),
158 }
159 }};
160 }
161
162 let validation_data = request.persisted_validation_data();
163
164 let parent_header =
165 try_request!(Block::Header::decode(&mut &validation_data.parent_head.0[..]));
166
167 let parent_hash = parent_header.hash();
168
169 if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
170 continue
171 }
172
173 let Ok(Some(code)) =
174 params.para_client.state_at(parent_hash).map_err(drop).and_then(|s| {
175 s.storage(&sp_core::storage::well_known_keys::CODE).map_err(drop)
176 })
177 else {
178 continue;
179 };
180
181 super::check_validation_code_or_log(
182 &ValidationCode::from(code).hash(),
183 params.para_id,
184 ¶ms.relay_client,
185 *request.relay_parent(),
186 )
187 .await;
188
189 let relay_parent_header =
190 match params.relay_client.header(RBlockId::hash(*request.relay_parent())).await {
191 Err(e) => reject_with_error!(e),
192 Ok(None) => continue, Ok(Some(h)) => h,
194 };
195
196 let slot_duration = match params.para_client.runtime_api().slot_duration(parent_hash) {
197 Ok(d) => d,
198 Err(e) => reject_with_error!(e),
199 };
200
201 let claim = match collator_util::claim_slot::<_, _, P>(
202 &*params.para_client,
203 parent_hash,
204 &relay_parent_header,
205 slot_duration,
206 params.relay_chain_slot_duration,
207 ¶ms.keystore,
208 )
209 .await
210 {
211 Ok(None) => continue,
212 Ok(Some(c)) => c,
213 Err(e) => reject_with_error!(e),
214 };
215
216 if last_processed_slot >= *claim.slot() &&
225 last_relay_chain_block < *relay_parent_header.number()
226 {
227 continue
228 }
229
230 let (parachain_inherent_data, other_inherent_data) = try_request!(
231 collator
232 .create_inherent_data(
233 *request.relay_parent(),
234 &validation_data,
235 parent_hash,
236 claim.timestamp(),
237 )
238 .await
239 );
240
241 let allowed_pov_size = (validation_data.max_pov_size / 2) as usize;
242
243 let maybe_collation = try_request!(
244 collator
245 .collate(
246 &parent_header,
247 &claim,
248 None,
249 (parachain_inherent_data, other_inherent_data),
250 params.authoring_duration,
251 allowed_pov_size,
252 )
253 .await
254 );
255
256 if let Some((collation, block_data)) = maybe_collation {
257 let Some(block_hash) = block_data.blocks().first().map(|b| b.hash()) else {
258 continue
259 };
260 let result_sender =
261 Some(collator.collator_service().announce_with_barrier(block_hash));
262 request.complete(Some(CollationResult { collation, result_sender }));
263 } else {
264 request.complete(None);
265 tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
266 }
267
268 last_processed_slot = *claim.slot();
269 last_relay_chain_block = *relay_parent_header.number();
270 }
271 }
272}