cumulus_client_consensus_aura/collators/
basic.rs1use codec::{Codec, Decode};
27use cumulus_client_collator::{
28 relay_chain_driven::CollationRequest, service::ServiceInterface as CollatorServiceInterface,
29};
30use cumulus_client_consensus_common::ParachainBlockImportMarker;
31use cumulus_primitives_core::{relay_chain::BlockId as RBlockId, CollectCollationInfo};
32use cumulus_relay_chain_interface::RelayChainInterface;
33use sp_consensus::Environment;
34
35use polkadot_node_primitives::CollationResult;
36use polkadot_overseer::Handle as OverseerHandle;
37use polkadot_primitives::{CollatorPair, Id as ParaId, ValidationCode};
38
39use futures::{channel::mpsc::Receiver, prelude::*};
40use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
41use sc_consensus::BlockImport;
42use sc_network_types::PeerId;
43use sp_api::{CallApiAt, ProvideRuntimeApi};
44use sp_application_crypto::AppPublic;
45use sp_blockchain::HeaderBackend;
46use sp_consensus_aura::AuraApi;
47use sp_core::crypto::Pair;
48use sp_inherents::CreateInherentDataProviders;
49use sp_keystore::KeystorePtr;
50use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
51use sp_state_machine::Backend as _;
52use std::{sync::Arc, time::Duration};
53
54use crate::collator as collator_util;
55
56pub struct Params<BI, CIDP, Client, RClient, Proposer, CS> {
58 pub create_inherent_data_providers: CIDP,
62 pub block_import: BI,
64 pub para_client: Arc<Client>,
66 pub relay_client: RClient,
68 pub keystore: KeystorePtr,
70 pub collator_key: CollatorPair,
72 pub collator_peer_id: PeerId,
74 pub para_id: ParaId,
76 pub overseer_handle: OverseerHandle,
78 pub relay_chain_slot_duration: Duration,
80 pub proposer: Proposer,
82 pub collator_service: CS,
84 pub authoring_duration: Duration,
86 pub collation_request_receiver: Option<Receiver<CollationRequest>>,
90}
91
92pub fn run<Block, P, BI, CIDP, Client, RClient, Proposer, CS>(
94 params: Params<BI, CIDP, Client, RClient, Proposer, CS>,
95) -> impl Future<Output = ()> + Send + 'static
96where
97 Block: BlockT + Send,
98 Client: ProvideRuntimeApi<Block>
99 + BlockOf
100 + AuxStore
101 + HeaderBackend<Block>
102 + BlockBackend<Block>
103 + CallApiAt<Block>
104 + Send
105 + Sync
106 + 'static,
107 Client::Api: AuraApi<Block, P::Public> + CollectCollationInfo<Block>,
108 RClient: RelayChainInterface + Send + Clone + 'static,
109 CIDP: CreateInherentDataProviders<Block, ()> + Send + 'static,
110 CIDP::InherentDataProviders: Send,
111 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
112 Proposer: Environment<Block> + Clone + Send + Sync + 'static,
113 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
114 P: Pair,
115 P::Public: AppPublic + Member + Codec,
116 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
117{
118 async move {
119 let mut collation_requests = match params.collation_request_receiver {
120 Some(receiver) => receiver,
121 None =>
122 cumulus_client_collator::relay_chain_driven::init(
123 params.collator_key,
124 params.para_id,
125 params.overseer_handle,
126 )
127 .await,
128 };
129
130 let mut collator = {
131 let params = collator_util::Params {
132 create_inherent_data_providers: params.create_inherent_data_providers,
133 block_import: params.block_import,
134 relay_client: params.relay_client.clone(),
135 keystore: params.keystore.clone(),
136 collator_peer_id: params.collator_peer_id,
137 para_id: params.para_id,
138 proposer: params.proposer,
139 collator_service: params.collator_service,
140 };
141
142 collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
143 };
144
145 let mut last_processed_slot = 0;
146 let mut last_relay_chain_block = Default::default();
147
148 while let Some(request) = collation_requests.next().await {
149 macro_rules! reject_with_error {
150 ($err:expr) => {{
151 request.complete(None);
152 tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
153 continue;
154 }};
155 }
156
157 macro_rules! try_request {
158 ($x:expr) => {{
159 match $x {
160 Ok(x) => x,
161 Err(e) => reject_with_error!(e),
162 }
163 }};
164 }
165
166 let validation_data = request.persisted_validation_data();
167
168 let parent_header =
169 try_request!(Block::Header::decode(&mut &validation_data.parent_head.0[..]));
170
171 let parent_hash = parent_header.hash();
172
173 if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
174 continue
175 }
176
177 let Ok(Some(code)) =
178 params.para_client.state_at(parent_hash).map_err(drop).and_then(|s| {
179 s.storage(&sp_core::storage::well_known_keys::CODE).map_err(drop)
180 })
181 else {
182 continue;
183 };
184
185 super::check_validation_code_or_log(
186 &ValidationCode::from(code).hash(),
187 params.para_id,
188 ¶ms.relay_client,
189 *request.relay_parent(),
190 )
191 .await;
192
193 let relay_parent_header =
194 match params.relay_client.header(RBlockId::hash(*request.relay_parent())).await {
195 Err(e) => reject_with_error!(e),
196 Ok(None) => continue, Ok(Some(h)) => h,
198 };
199
200 let slot_duration = match params.para_client.runtime_api().slot_duration(parent_hash) {
201 Ok(d) => d,
202 Err(e) => reject_with_error!(e),
203 };
204
205 let claim = match collator_util::claim_slot::<_, _, P>(
206 &*params.para_client,
207 parent_hash,
208 &relay_parent_header,
209 slot_duration,
210 params.relay_chain_slot_duration,
211 ¶ms.keystore,
212 )
213 .await
214 {
215 Ok(None) => continue,
216 Ok(Some(c)) => c,
217 Err(e) => reject_with_error!(e),
218 };
219
220 if last_processed_slot >= *claim.slot() &&
229 last_relay_chain_block < *relay_parent_header.number()
230 {
231 continue
232 }
233
234 let (parachain_inherent_data, other_inherent_data) = try_request!(
235 collator
236 .create_inherent_data(
237 *request.relay_parent(),
238 &validation_data,
239 parent_hash,
240 claim.timestamp(),
241 params.collator_peer_id,
242 )
243 .await
244 );
245
246 let allowed_pov_size = (validation_data.max_pov_size / 2) as usize;
247
248 let maybe_collation = try_request!(
249 collator
250 .collate(
251 &parent_header,
252 &claim,
253 None,
254 (parachain_inherent_data, other_inherent_data),
255 params.authoring_duration,
256 allowed_pov_size,
257 )
258 .await
259 );
260
261 if let Some((collation, block_data)) = maybe_collation {
262 let Some(block_hash) = block_data.blocks().first().map(|b| b.hash()) else {
263 continue
264 };
265 let result_sender =
266 Some(collator.collator_service().announce_with_barrier(block_hash));
267 request.complete(Some(CollationResult { collation, result_sender }));
268 } else {
269 request.complete(None);
270 tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
271 }
272
273 last_processed_slot = *claim.slot();
274 last_relay_chain_block = *relay_parent_header.number();
275 }
276 }
277}