1use cumulus_primitives_core::{
21 relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
22};
23
24use sc_client_api::BlockBackend;
25use sp_api::ProvideRuntimeApi;
26use sp_core::traits::SpawnNamed;
27use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
28
29use cumulus_client_consensus_common::ParachainConsensus;
30use polkadot_node_primitives::{CollationGenerationConfig, CollationResult, MaybeCompressedPoV};
31use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
32use polkadot_overseer::Handle as OverseerHandle;
33use polkadot_primitives::{CollatorPair, Id as ParaId};
34
35use codec::Decode;
36use futures::prelude::*;
37use std::sync::Arc;
38
39use crate::service::CollatorService;
40
41pub mod service;
42
43const LOG_TARGET: &str = "cumulus-collator";
45
46pub struct Collator<Block: BlockT, BS, RA> {
52 service: CollatorService<Block, BS, RA>,
53 parachain_consensus: Box<dyn ParachainConsensus<Block>>,
54}
55
56impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
57 fn clone(&self) -> Self {
58 Collator {
59 service: self.service.clone(),
60 parachain_consensus: self.parachain_consensus.clone(),
61 }
62 }
63}
64
65impl<Block, BS, RA> Collator<Block, BS, RA>
66where
67 Block: BlockT,
68 BS: BlockBackend<Block>,
69 RA: ProvideRuntimeApi<Block>,
70 RA::Api: CollectCollationInfo<Block>,
71{
72 fn new(
74 collator_service: CollatorService<Block, BS, RA>,
75 parachain_consensus: Box<dyn ParachainConsensus<Block>>,
76 ) -> Self {
77 Self { service: collator_service, parachain_consensus }
78 }
79
80 async fn produce_candidate(
81 mut self,
82 relay_parent: PHash,
83 validation_data: PersistedValidationData,
84 ) -> Option<CollationResult> {
85 tracing::trace!(
86 target: LOG_TARGET,
87 relay_parent = ?relay_parent,
88 "Producing candidate",
89 );
90
91 let last_head = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) {
92 Ok(x) => x,
93 Err(e) => {
94 tracing::error!(
95 target: LOG_TARGET,
96 error = ?e,
97 "Could not decode the head data."
98 );
99 return None
100 },
101 };
102
103 let last_head_hash = last_head.hash();
104 if !self.service.check_block_status(last_head_hash, &last_head) {
105 return None
106 }
107
108 tracing::info!(
109 target: LOG_TARGET,
110 relay_parent = ?relay_parent,
111 at = ?last_head_hash,
112 "Starting collation.",
113 );
114
115 let candidate = self
116 .parachain_consensus
117 .produce_candidate(&last_head, relay_parent, &validation_data)
118 .await?;
119
120 let block_hash = candidate.block.header().hash();
121
122 let (collation, b) = self.service.build_collation(&last_head, block_hash, candidate)?;
123
124 b.log_size_info();
125
126 if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
127 tracing::info!(
128 target: LOG_TARGET,
129 "Compressed PoV size: {}kb",
130 pov.block_data.0.len() as f64 / 1024f64,
131 );
132 }
133
134 let result_sender = self.service.announce_with_barrier(block_hash);
135
136 tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);
137
138 Some(CollationResult { collation, result_sender: Some(result_sender) })
139 }
140}
141
142pub mod relay_chain_driven {
149 use futures::{
150 channel::{mpsc, oneshot},
151 prelude::*,
152 };
153 use polkadot_node_primitives::{CollationGenerationConfig, CollationResult};
154 use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
155 use polkadot_overseer::Handle as OverseerHandle;
156 use polkadot_primitives::{CollatorPair, Id as ParaId};
157
158 use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
159
160 pub struct CollationRequest {
164 relay_parent: PHash,
165 pvd: PersistedValidationData,
166 sender: oneshot::Sender<Option<CollationResult>>,
167 }
168
169 impl CollationRequest {
170 pub fn relay_parent(&self) -> &PHash {
172 &self.relay_parent
173 }
174
175 pub fn persisted_validation_data(&self) -> &PersistedValidationData {
177 &self.pvd
178 }
179
180 pub fn complete(self, collation: Option<CollationResult>) {
182 let _ = self.sender.send(collation);
183 }
184 }
185
186 pub async fn init(
189 key: CollatorPair,
190 para_id: ParaId,
191 overseer_handle: OverseerHandle,
192 ) -> mpsc::Receiver<CollationRequest> {
193 let mut overseer_handle = overseer_handle;
194
195 let (stream_tx, stream_rx) = mpsc::channel(0);
196 let config = CollationGenerationConfig {
197 key,
198 para_id,
199 collator: Some(Box::new(move |relay_parent, validation_data| {
200 let mut stream_tx = stream_tx.clone();
205 let validation_data = validation_data.clone();
206 Box::pin(async move {
207 let (this_tx, this_rx) = oneshot::channel();
208 let request =
209 CollationRequest { relay_parent, pvd: validation_data, sender: this_tx };
210
211 if stream_tx.send(request).await.is_err() {
212 return None
213 }
214
215 this_rx.await.ok().flatten()
216 })
217 })),
218 };
219
220 overseer_handle
221 .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
222 .await;
223
224 overseer_handle
225 .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
226 .await;
227
228 stream_rx
229 }
230}
231
232pub async fn initialize_collator_subsystems(
237 overseer_handle: &mut OverseerHandle,
238 key: CollatorPair,
239 para_id: ParaId,
240 reinitialize: bool,
241) {
242 let config = CollationGenerationConfig { key, para_id, collator: None };
243
244 if reinitialize {
245 overseer_handle
246 .send_msg(CollationGenerationMessage::Reinitialize(config), "StartCollator")
247 .await;
248 } else {
249 overseer_handle
250 .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
251 .await;
252 }
253
254 overseer_handle
255 .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
256 .await;
257}
258
259pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
261 pub para_id: ParaId,
262 pub runtime_api: Arc<RA>,
263 pub block_status: Arc<BS>,
264 pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
265 pub overseer_handle: OverseerHandle,
266 pub spawner: Spawner,
267 pub key: CollatorPair,
268 pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
269}
270
271#[deprecated = "Collators should run consensus futures which handle this logic internally"]
273pub async fn start_collator<Block, RA, BS, Spawner>(
274 params: StartCollatorParams<Block, RA, BS, Spawner>,
275) where
276 Block: BlockT,
277 BS: BlockBackend<Block> + Send + Sync + 'static,
278 Spawner: SpawnNamed + Clone + Send + Sync + 'static,
279 RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
280 RA::Api: CollectCollationInfo<Block>,
281{
282 #[allow(deprecated)]
284 start_collator_sync(params);
285}
286
287#[deprecated = "Collators should run consensus futures which handle this logic internally"]
289pub fn start_collator_sync<Block, RA, BS, Spawner>(
290 StartCollatorParams {
291 para_id,
292 block_status,
293 announce_block,
294 overseer_handle,
295 spawner,
296 key,
297 parachain_consensus,
298 runtime_api,
299 }: StartCollatorParams<Block, RA, BS, Spawner>,
300) where
301 Block: BlockT,
302 BS: BlockBackend<Block> + Send + Sync + 'static,
303 Spawner: SpawnNamed + Clone + Send + Sync + 'static,
304 RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
305 RA::Api: CollectCollationInfo<Block>,
306{
307 let collator_service =
308 CollatorService::new(block_status, Arc::new(spawner.clone()), announce_block, runtime_api);
309
310 let collator = Collator::new(collator_service, parachain_consensus);
311
312 let collation_future = Box::pin(async move {
313 let mut request_stream = relay_chain_driven::init(key, para_id, overseer_handle).await;
314 while let Some(request) = request_stream.next().await {
315 let collation = collator
316 .clone()
317 .produce_candidate(
318 *request.relay_parent(),
319 request.persisted_validation_data().clone(),
320 )
321 .await;
322
323 request.complete(collation);
324 }
325 });
326
327 spawner.spawn("cumulus-relay-driven-collator", None, collation_future);
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use async_trait::async_trait;
334 use codec::Encode;
335 use cumulus_client_consensus_common::ParachainCandidate;
336 use cumulus_primitives_core::ParachainBlockData;
337 use cumulus_test_client::{
338 Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,
339 TestClientBuilder, TestClientBuilderExt,
340 };
341 use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
342 use cumulus_test_runtime::{Block, Header};
343 use futures::{channel::mpsc, executor::block_on, StreamExt};
344 use polkadot_node_primitives::CollationGenerationConfig;
345 use polkadot_node_subsystem::messages::CollationGenerationMessage;
346 use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
347 use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
348 use polkadot_primitives::HeadData;
349 use sp_consensus::BlockOrigin;
350 use sp_core::{testing::TaskExecutor, Pair};
351 use sp_runtime::traits::BlakeTwo256;
352 use sp_state_machine::Backend;
353
354 struct AlwaysSupportsParachains;
355
356 #[async_trait]
357 impl HeadSupportsParachains for AlwaysSupportsParachains {
358 async fn head_supports_parachains(&self, _head: &PHash) -> bool {
359 true
360 }
361 }
362
363 #[derive(Clone)]
364 struct DummyParachainConsensus {
365 client: Arc<Client>,
366 }
367
368 #[async_trait::async_trait]
369 impl ParachainConsensus<Block> for DummyParachainConsensus {
370 async fn produce_candidate(
371 &mut self,
372 parent: &Header,
373 _: PHash,
374 validation_data: &PersistedValidationData,
375 ) -> Option<ParachainCandidate<Block>> {
376 let mut sproof = RelayStateSproofBuilder::default();
377 sproof.included_para_head = Some(HeadData(parent.encode()));
378 sproof.para_id = cumulus_test_runtime::PARACHAIN_ID.into();
379
380 let cumulus_test_client::BlockBuilderAndSupportData { block_builder, .. } = self
381 .client
382 .init_block_builder_at(parent.hash(), Some(validation_data.clone()), sproof);
383
384 let (block, _, proof) = block_builder.build().expect("Creates block").into_inner();
385
386 self.client
387 .import(BlockOrigin::Own, block.clone())
388 .await
389 .expect("Imports the block");
390
391 Some(ParachainCandidate { block, proof: proof.expect("Proof is returned") })
392 }
393 }
394
395 #[test]
396 fn collates_produces_a_block_and_storage_proof_does_not_contains_code() {
397 sp_tracing::try_init_simple();
398
399 let spawner = TaskExecutor::new();
400 let para_id = ParaId::from(100);
401 let announce_block = |_, _| ();
402 let client = Arc::new(TestClientBuilder::new().build());
403 let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();
404
405 let (sub_tx, sub_rx) = mpsc::channel(64);
406
407 let (overseer, handle) =
408 dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)
409 .expect("Creates overseer builder")
410 .replace_collation_generation(|_| ForwardSubsystem(sub_tx))
411 .build()
412 .expect("Builds overseer");
413
414 spawner.spawn("overseer", None, overseer.run().then(|_| async {}).boxed());
415
416 #[allow(deprecated)]
417 let collator_start = start_collator(StartCollatorParams {
418 runtime_api: client.clone(),
419 block_status: client.clone(),
420 announce_block: Arc::new(announce_block),
421 overseer_handle: OverseerHandle::new(handle),
422 spawner,
423 para_id,
424 key: CollatorPair::generate().0,
425 parachain_consensus: Box::new(DummyParachainConsensus { client }),
426 });
427 block_on(collator_start);
428
429 let msg = block_on(sub_rx.into_future())
430 .0
431 .expect("message should be send by `start_collator` above.");
432
433 let collator_fn = match msg {
434 CollationGenerationMessage::Initialize(CollationGenerationConfig {
435 collator: Some(c),
436 ..
437 }) => c,
438 _ => panic!("unexpected message or no collator fn"),
439 };
440
441 let validation_data =
442 PersistedValidationData { parent_head: header.encode().into(), ..Default::default() };
443 let relay_parent = Default::default();
444
445 let collation = block_on(collator_fn(relay_parent, &validation_data))
446 .expect("Collation is build")
447 .collation;
448
449 let pov = collation.proof_of_validity.into_compressed();
450
451 let decompressed =
452 sp_maybe_compressed_blob::decompress(&pov.block_data.0, 1024 * 1024 * 10).unwrap();
453
454 let block =
455 ParachainBlockData::<Block>::decode(&mut &decompressed[..]).expect("Is a valid block");
456
457 assert_eq!(1, *block.blocks()[0].header().number());
458
459 let proof = block.proof().clone();
461
462 let backend = sp_state_machine::create_proof_check_backend::<BlakeTwo256>(
463 *header.state_root(),
464 proof.to_storage_proof::<BlakeTwo256>(None).unwrap().0,
465 )
466 .unwrap();
467
468 assert!(backend
470 .storage(sp_core::storage::well_known_keys::CODE)
471 .unwrap_err()
472 .contains("Trie lookup error: Database missing expected key"));
473 }
474}