referrerpolicy=no-referrer-when-downgrade

cumulus_client_collator/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Cumulus Collator implementation for Substrate.
19
20use cumulus_primitives_core::{
21	relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
22};
23
24use sc_client_api::BlockBackend;
25use sp_api::ProvideRuntimeApi;
26use sp_core::traits::SpawnNamed;
27use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
28
29use cumulus_client_consensus_common::ParachainConsensus;
30use polkadot_node_primitives::{CollationGenerationConfig, CollationResult, MaybeCompressedPoV};
31use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
32use polkadot_overseer::Handle as OverseerHandle;
33use polkadot_primitives::{CollatorPair, Id as ParaId};
34
35use codec::Decode;
36use futures::prelude::*;
37use std::sync::Arc;
38
39use crate::service::CollatorService;
40
41pub mod service;
42
43/// The logging target.
44const LOG_TARGET: &str = "cumulus-collator";
45
46/// The implementation of the Cumulus `Collator`.
47///
48/// Note that this implementation is soon to be deprecated and removed, and it is suggested to
49/// directly use the [`CollatorService`] instead, so consensus engine implementations
50/// live at the top level.
51pub struct Collator<Block: BlockT, BS, RA> {
52	service: CollatorService<Block, BS, RA>,
53	parachain_consensus: Box<dyn ParachainConsensus<Block>>,
54}
55
56impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
57	fn clone(&self) -> Self {
58		Collator {
59			service: self.service.clone(),
60			parachain_consensus: self.parachain_consensus.clone(),
61		}
62	}
63}
64
65impl<Block, BS, RA> Collator<Block, BS, RA>
66where
67	Block: BlockT,
68	BS: BlockBackend<Block>,
69	RA: ProvideRuntimeApi<Block>,
70	RA::Api: CollectCollationInfo<Block>,
71{
72	/// Create a new instance.
73	fn new(
74		collator_service: CollatorService<Block, BS, RA>,
75		parachain_consensus: Box<dyn ParachainConsensus<Block>>,
76	) -> Self {
77		Self { service: collator_service, parachain_consensus }
78	}
79
80	async fn produce_candidate(
81		mut self,
82		relay_parent: PHash,
83		validation_data: PersistedValidationData,
84	) -> Option<CollationResult> {
85		tracing::trace!(
86			target: LOG_TARGET,
87			relay_parent = ?relay_parent,
88			"Producing candidate",
89		);
90
91		let last_head = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) {
92			Ok(x) => x,
93			Err(e) => {
94				tracing::error!(
95					target: LOG_TARGET,
96					error = ?e,
97					"Could not decode the head data."
98				);
99				return None
100			},
101		};
102
103		let last_head_hash = last_head.hash();
104		if !self.service.check_block_status(last_head_hash, &last_head) {
105			return None
106		}
107
108		tracing::info!(
109			target: LOG_TARGET,
110			relay_parent = ?relay_parent,
111			at = ?last_head_hash,
112			"Starting collation.",
113		);
114
115		let candidate = self
116			.parachain_consensus
117			.produce_candidate(&last_head, relay_parent, &validation_data)
118			.await?;
119
120		let block_hash = candidate.block.header().hash();
121
122		let (collation, b) = self.service.build_collation(&last_head, block_hash, candidate)?;
123
124		b.log_size_info();
125
126		if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
127			tracing::info!(
128				target: LOG_TARGET,
129				"Compressed PoV size: {}kb",
130				pov.block_data.0.len() as f64 / 1024f64,
131			);
132		}
133
134		let result_sender = self.service.announce_with_barrier(block_hash);
135
136		tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);
137
138		Some(CollationResult { collation, result_sender: Some(result_sender) })
139	}
140}
141
142/// Relay-chain-driven collators are those whose block production is driven purely
143/// by new relay chain blocks and the most recently included parachain blocks
144/// within them.
145///
146/// This method of driving collators is not suited to anything but the most simple parachain
147/// consensus mechanisms, and this module may soon be deprecated.
148pub mod relay_chain_driven {
149	use futures::{
150		channel::{mpsc, oneshot},
151		prelude::*,
152	};
153	use polkadot_node_primitives::{CollationGenerationConfig, CollationResult};
154	use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
155	use polkadot_overseer::Handle as OverseerHandle;
156	use polkadot_primitives::{CollatorPair, Id as ParaId};
157
158	use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
159
160	/// A request to author a collation, based on the advancement of the relay chain.
161	///
162	/// See the module docs for more info on relay-chain-driven collators.
163	pub struct CollationRequest {
164		relay_parent: PHash,
165		pvd: PersistedValidationData,
166		sender: oneshot::Sender<Option<CollationResult>>,
167	}
168
169	impl CollationRequest {
170		/// Get the relay parent of the collation request.
171		pub fn relay_parent(&self) -> &PHash {
172			&self.relay_parent
173		}
174
175		/// Get the [`PersistedValidationData`] for the request.
176		pub fn persisted_validation_data(&self) -> &PersistedValidationData {
177			&self.pvd
178		}
179
180		/// Complete the request with a collation, if any.
181		pub fn complete(self, collation: Option<CollationResult>) {
182			let _ = self.sender.send(collation);
183		}
184	}
185
186	/// Initialize the collator with Polkadot's collation-generation
187	/// subsystem, returning a stream of collation requests to handle.
188	pub async fn init(
189		key: CollatorPair,
190		para_id: ParaId,
191		overseer_handle: OverseerHandle,
192	) -> mpsc::Receiver<CollationRequest> {
193		let mut overseer_handle = overseer_handle;
194
195		let (stream_tx, stream_rx) = mpsc::channel(0);
196		let config = CollationGenerationConfig {
197			key,
198			para_id,
199			collator: Some(Box::new(move |relay_parent, validation_data| {
200				// Cloning the channel on each usage effectively makes the channel
201				// unbounded. The channel is actually bounded by the block production
202				// and consensus systems of Polkadot, which limits the amount of possible
203				// blocks.
204				let mut stream_tx = stream_tx.clone();
205				let validation_data = validation_data.clone();
206				Box::pin(async move {
207					let (this_tx, this_rx) = oneshot::channel();
208					let request =
209						CollationRequest { relay_parent, pvd: validation_data, sender: this_tx };
210
211					if stream_tx.send(request).await.is_err() {
212						return None
213					}
214
215					this_rx.await.ok().flatten()
216				})
217			})),
218		};
219
220		overseer_handle
221			.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
222			.await;
223
224		overseer_handle
225			.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
226			.await;
227
228		stream_rx
229	}
230}
231
232/// Initialize the collation-related subsystems on the relay-chain side.
233///
234/// This must be done prior to collation, and does not set up any callback for collation.
235/// For callback-driven collators, use the [`relay_chain_driven`] module.
236pub async fn initialize_collator_subsystems(
237	overseer_handle: &mut OverseerHandle,
238	key: CollatorPair,
239	para_id: ParaId,
240	reinitialize: bool,
241) {
242	let config = CollationGenerationConfig { key, para_id, collator: None };
243
244	if reinitialize {
245		overseer_handle
246			.send_msg(CollationGenerationMessage::Reinitialize(config), "StartCollator")
247			.await;
248	} else {
249		overseer_handle
250			.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
251			.await;
252	}
253
254	overseer_handle
255		.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
256		.await;
257}
258
259/// Parameters for [`start_collator`].
260pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
261	pub para_id: ParaId,
262	pub runtime_api: Arc<RA>,
263	pub block_status: Arc<BS>,
264	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
265	pub overseer_handle: OverseerHandle,
266	pub spawner: Spawner,
267	pub key: CollatorPair,
268	pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
269}
270
271/// Start the collator.
272#[deprecated = "Collators should run consensus futures which handle this logic internally"]
273pub async fn start_collator<Block, RA, BS, Spawner>(
274	params: StartCollatorParams<Block, RA, BS, Spawner>,
275) where
276	Block: BlockT,
277	BS: BlockBackend<Block> + Send + Sync + 'static,
278	Spawner: SpawnNamed + Clone + Send + Sync + 'static,
279	RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
280	RA::Api: CollectCollationInfo<Block>,
281{
282	// This never needed to be asynchronous, but shouldn't be changed due to backcompat.
283	#[allow(deprecated)]
284	start_collator_sync(params);
285}
286
287/// Start the collator in a synchronous function.
288#[deprecated = "Collators should run consensus futures which handle this logic internally"]
289pub fn start_collator_sync<Block, RA, BS, Spawner>(
290	StartCollatorParams {
291		para_id,
292		block_status,
293		announce_block,
294		overseer_handle,
295		spawner,
296		key,
297		parachain_consensus,
298		runtime_api,
299	}: StartCollatorParams<Block, RA, BS, Spawner>,
300) where
301	Block: BlockT,
302	BS: BlockBackend<Block> + Send + Sync + 'static,
303	Spawner: SpawnNamed + Clone + Send + Sync + 'static,
304	RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
305	RA::Api: CollectCollationInfo<Block>,
306{
307	let collator_service =
308		CollatorService::new(block_status, Arc::new(spawner.clone()), announce_block, runtime_api);
309
310	let collator = Collator::new(collator_service, parachain_consensus);
311
312	let collation_future = Box::pin(async move {
313		let mut request_stream = relay_chain_driven::init(key, para_id, overseer_handle).await;
314		while let Some(request) = request_stream.next().await {
315			let collation = collator
316				.clone()
317				.produce_candidate(
318					*request.relay_parent(),
319					request.persisted_validation_data().clone(),
320				)
321				.await;
322
323			request.complete(collation);
324		}
325	});
326
327	spawner.spawn("cumulus-relay-driven-collator", None, collation_future);
328}
329
330#[cfg(test)]
331mod tests {
332	use super::*;
333	use async_trait::async_trait;
334	use codec::Encode;
335	use cumulus_client_consensus_common::ParachainCandidate;
336	use cumulus_primitives_core::ParachainBlockData;
337	use cumulus_test_client::{
338		Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,
339		TestClientBuilder, TestClientBuilderExt,
340	};
341	use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
342	use cumulus_test_runtime::{Block, Header};
343	use futures::{channel::mpsc, executor::block_on, StreamExt};
344	use polkadot_node_primitives::CollationGenerationConfig;
345	use polkadot_node_subsystem::messages::CollationGenerationMessage;
346	use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
347	use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
348	use polkadot_primitives::HeadData;
349	use sp_consensus::BlockOrigin;
350	use sp_core::{testing::TaskExecutor, Pair};
351	use sp_runtime::traits::BlakeTwo256;
352	use sp_state_machine::Backend;
353
354	struct AlwaysSupportsParachains;
355
356	#[async_trait]
357	impl HeadSupportsParachains for AlwaysSupportsParachains {
358		async fn head_supports_parachains(&self, _head: &PHash) -> bool {
359			true
360		}
361	}
362
363	#[derive(Clone)]
364	struct DummyParachainConsensus {
365		client: Arc<Client>,
366	}
367
368	#[async_trait::async_trait]
369	impl ParachainConsensus<Block> for DummyParachainConsensus {
370		async fn produce_candidate(
371			&mut self,
372			parent: &Header,
373			_: PHash,
374			validation_data: &PersistedValidationData,
375		) -> Option<ParachainCandidate<Block>> {
376			let mut sproof = RelayStateSproofBuilder::default();
377			sproof.included_para_head = Some(HeadData(parent.encode()));
378			sproof.para_id = cumulus_test_runtime::PARACHAIN_ID.into();
379
380			let cumulus_test_client::BlockBuilderAndSupportData { block_builder, .. } = self
381				.client
382				.init_block_builder_at(parent.hash(), Some(validation_data.clone()), sproof);
383
384			let (block, _, proof) = block_builder.build().expect("Creates block").into_inner();
385
386			self.client
387				.import(BlockOrigin::Own, block.clone())
388				.await
389				.expect("Imports the block");
390
391			Some(ParachainCandidate { block, proof: proof.expect("Proof is returned") })
392		}
393	}
394
395	#[test]
396	fn collates_produces_a_block_and_storage_proof_does_not_contains_code() {
397		sp_tracing::try_init_simple();
398
399		let spawner = TaskExecutor::new();
400		let para_id = ParaId::from(100);
401		let announce_block = |_, _| ();
402		let client = Arc::new(TestClientBuilder::new().build());
403		let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();
404
405		let (sub_tx, sub_rx) = mpsc::channel(64);
406
407		let (overseer, handle) =
408			dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)
409				.expect("Creates overseer builder")
410				.replace_collation_generation(|_| ForwardSubsystem(sub_tx))
411				.build()
412				.expect("Builds overseer");
413
414		spawner.spawn("overseer", None, overseer.run().then(|_| async {}).boxed());
415
416		#[allow(deprecated)]
417		let collator_start = start_collator(StartCollatorParams {
418			runtime_api: client.clone(),
419			block_status: client.clone(),
420			announce_block: Arc::new(announce_block),
421			overseer_handle: OverseerHandle::new(handle),
422			spawner,
423			para_id,
424			key: CollatorPair::generate().0,
425			parachain_consensus: Box::new(DummyParachainConsensus { client }),
426		});
427		block_on(collator_start);
428
429		let msg = block_on(sub_rx.into_future())
430			.0
431			.expect("message should be send by `start_collator` above.");
432
433		let collator_fn = match msg {
434			CollationGenerationMessage::Initialize(CollationGenerationConfig {
435				collator: Some(c),
436				..
437			}) => c,
438			_ => panic!("unexpected message or no collator fn"),
439		};
440
441		let validation_data =
442			PersistedValidationData { parent_head: header.encode().into(), ..Default::default() };
443		let relay_parent = Default::default();
444
445		let collation = block_on(collator_fn(relay_parent, &validation_data))
446			.expect("Collation is build")
447			.collation;
448
449		let pov = collation.proof_of_validity.into_compressed();
450
451		let decompressed =
452			sp_maybe_compressed_blob::decompress(&pov.block_data.0, 1024 * 1024 * 10).unwrap();
453
454		let block =
455			ParachainBlockData::<Block>::decode(&mut &decompressed[..]).expect("Is a valid block");
456
457		assert_eq!(1, *block.blocks()[0].header().number());
458
459		// Ensure that we did not include `:code` in the proof.
460		let proof = block.proof().clone();
461
462		let backend = sp_state_machine::create_proof_check_backend::<BlakeTwo256>(
463			*header.state_root(),
464			proof.to_storage_proof::<BlakeTwo256>(None).unwrap().0,
465		)
466		.unwrap();
467
468		// Should return an error, as it was not included while building the proof.
469		assert!(backend
470			.storage(sp_core::storage::well_known_keys::CODE)
471			.unwrap_err()
472			.contains("Trie lookup error: Database missing expected key"));
473	}
474}