referrerpolicy=no-referrer-when-downgrade

polkadot_omni_node_lib/nodes/
manual_seal.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
17use crate::common::{
18	rpc::BuildRpcExtensions as BuildRpcExtensionsT,
19	spec::{BaseNodeSpec, BuildImportQueue, ClientBlockImport, NodeSpec as NodeSpecT},
20	types::{Hash, ParachainBlockImport, ParachainClient},
21};
22use codec::Encode;
23use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
24use cumulus_primitives_aura::AuraUnincludedSegmentApi;
25use cumulus_primitives_core::CollectCollationInfo;
26use futures::FutureExt;
27use polkadot_primitives::UpgradeGoAhead;
28use sc_client_api::Backend;
29use sc_consensus::{DefaultImportQueue, LongestChain};
30use sc_consensus_manual_seal::rpc::{ManualSeal, ManualSealApiServer};
31use sc_network::NetworkBackend;
32use sc_service::{Configuration, PartialComponents, TaskManager};
33use sc_telemetry::TelemetryHandle;
34use sc_transaction_pool_api::OffchainTransactionPoolFactory;
35use sp_api::{ApiExt, ProvideRuntimeApi};
36use sp_runtime::traits::Header;
37use std::{marker::PhantomData, sync::Arc};
38
/// Node flavour that authors blocks through the manual-seal engine.
///
/// The type holds no data of its own: `NodeSpec` is only carried as a marker so that the
/// trait implementations in this file can reach its associated types.
pub struct ManualSealNode<NodeSpec>(PhantomData<NodeSpec>);
40
41impl<NodeSpec: NodeSpecT>
42	BuildImportQueue<
43		NodeSpec::Block,
44		NodeSpec::RuntimeApi,
45		Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>,
46	> for ManualSealNode<NodeSpec>
47{
48	fn build_import_queue(
49		client: Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>,
50		_block_import: ParachainBlockImport<
51			NodeSpec::Block,
52			Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>,
53		>,
54		config: &Configuration,
55		_telemetry_handle: Option<TelemetryHandle>,
56		task_manager: &TaskManager,
57	) -> sc_service::error::Result<DefaultImportQueue<NodeSpec::Block>> {
58		Ok(sc_consensus_manual_seal::import_queue(
59			Box::new(client.clone()),
60			&task_manager.spawn_essential_handle(),
61			config.prometheus_registry(),
62		))
63	}
64}
65
66impl<NodeSpec: NodeSpecT> BaseNodeSpec for ManualSealNode<NodeSpec> {
67	type Block = NodeSpec::Block;
68	type RuntimeApi = NodeSpec::RuntimeApi;
69	type BuildImportQueue = Self;
70	type InitBlockImport = ClientBlockImport;
71}
72
73impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> {
74	pub fn new() -> Self {
75		Self(Default::default())
76	}
77
78	pub fn start_node<Net>(
79		&self,
80		mut config: Configuration,
81		block_time: u64,
82	) -> sc_service::error::Result<TaskManager>
83	where
84		Net: NetworkBackend<NodeSpec::Block, Hash>,
85	{
86		let PartialComponents {
87			client,
88			backend,
89			mut task_manager,
90			import_queue,
91			keystore_container,
92			select_chain: _,
93			transaction_pool,
94			other: (_, mut telemetry, _, _),
95		} = Self::new_partial(&config)?;
96		let select_chain = LongestChain::new(backend.clone());
97
98		let para_id =
99			Self::parachain_id(&client, &config).ok_or("Failed to retrieve the parachain id")?;
100
101		// Since this is a dev node, prevent it from connecting to peers.
102		config.network.default_peers_set.in_peers = 0;
103		config.network.default_peers_set.out_peers = 0;
104		let net_config = sc_network::config::FullNetworkConfiguration::<_, _, Net>::new(
105			&config.network,
106			config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
107		);
108		let metrics = Net::register_notification_metrics(
109			config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
110		);
111
112		let (network, system_rpc_tx, tx_handler_controller, sync_service) =
113			sc_service::build_network(sc_service::BuildNetworkParams {
114				config: &config,
115				client: client.clone(),
116				transaction_pool: transaction_pool.clone(),
117				spawn_handle: task_manager.spawn_handle(),
118				import_queue,
119				net_config,
120				block_announce_validator_builder: None,
121				warp_sync_config: None,
122				block_relay: None,
123				metrics,
124			})?;
125
126		if config.offchain_worker.enabled {
127			let offchain_workers =
128				sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
129					runtime_api_provider: client.clone(),
130					keystore: Some(keystore_container.keystore()),
131					offchain_db: backend.offchain_storage(),
132					transaction_pool: Some(OffchainTransactionPoolFactory::new(
133						transaction_pool.clone(),
134					)),
135					network_provider: Arc::new(network.clone()),
136					is_validator: config.role.is_authority(),
137					enable_http_requests: true,
138					custom_extensions: move |_| vec![],
139				})?;
140			task_manager.spawn_handle().spawn(
141				"offchain-workers-runner",
142				"offchain-work",
143				offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
144			);
145		}
146
147		let proposer = sc_basic_authorship::ProposerFactory::new(
148			task_manager.spawn_handle(),
149			client.clone(),
150			transaction_pool.clone(),
151			None,
152			None,
153		);
154
155		let (manual_seal_sink, manual_seal_stream) = futures::channel::mpsc::channel(1024);
156		let mut manual_seal_sink_clone = manual_seal_sink.clone();
157		task_manager
158			.spawn_essential_handle()
159			.spawn("block_authoring", None, async move {
160				loop {
161					futures_timer::Delay::new(std::time::Duration::from_millis(block_time)).await;
162					manual_seal_sink_clone
163						.try_send(sc_consensus_manual_seal::EngineCommand::SealNewBlock {
164							create_empty: true,
165							finalize: true,
166							parent_hash: None,
167							sender: None,
168						})
169						.unwrap();
170				}
171			});
172
173		let client_for_cidp = client.clone();
174		let params = sc_consensus_manual_seal::ManualSealParams {
175			block_import: client.clone(),
176			env: proposer,
177			client: client.clone(),
178			pool: transaction_pool.clone(),
179			select_chain,
180			commands_stream: Box::pin(manual_seal_stream),
181			consensus_data_provider: None,
182			create_inherent_data_providers: move |block: Hash, ()| {
183				let current_para_head = client_for_cidp
184					.header(block)
185					.expect("Header lookup should succeed")
186					.expect("Header passed in as parent should be present in backend.");
187
188				let should_send_go_ahead = client_for_cidp
189					.runtime_api()
190					.collect_collation_info(block, &current_para_head)
191					.map(|info| info.new_validation_code.is_some())
192					.unwrap_or_default();
193
194				// The API version is relevant here because the constraints in the runtime changed
195				// in https://github.com/paritytech/polkadot-sdk/pull/6825. In general, the logic
196				// here assumes that we are using the aura-ext consensushook in the parachain
197				// runtime.
198				let requires_relay_progress = client_for_cidp
199					.runtime_api()
200					.has_api_with::<dyn AuraUnincludedSegmentApi<NodeSpec::Block>, _>(
201						block,
202						|version| version > 1,
203					)
204					.ok()
205					.unwrap_or_default();
206
207				let current_para_block_head =
208					Some(polkadot_primitives::HeadData(current_para_head.encode()));
209				let client_for_xcm = client_for_cidp.clone();
210				async move {
211					use sp_runtime::traits::UniqueSaturatedInto;
212
213					let mocked_parachain = MockValidationDataInherentDataProvider {
214						// When using manual seal we start from block 0, and it's very unlikely to
215						// reach a block number > u32::MAX.
216						current_para_block: UniqueSaturatedInto::<u32>::unique_saturated_into(
217							*current_para_head.number(),
218						),
219						para_id,
220						current_para_block_head,
221						relay_offset: 0,
222						relay_blocks_per_para_block: requires_relay_progress
223							.then(|| 1)
224							.unwrap_or_default(),
225						para_blocks_per_relay_epoch: 10,
226						relay_randomness_config: (),
227						xcm_config: MockXcmConfig::new(&*client_for_xcm, block, Default::default()),
228						raw_downward_messages: vec![],
229						raw_horizontal_messages: vec![],
230						additional_key_values: None,
231						upgrade_go_ahead: should_send_go_ahead.then(|| {
232							log::info!(
233								"Detected pending validation code, sending go-ahead signal."
234							);
235							UpgradeGoAhead::GoAhead
236						}),
237					};
238					Ok((
239						// This is intentional, as the runtime that we expect to run against this
240						// will never receive the aura-related inherents/digests, and providing
241						// real timestamps would cause aura <> timestamp checking to fail.
242						sp_timestamp::InherentDataProvider::new(sp_timestamp::Timestamp::new(0)),
243						mocked_parachain,
244					))
245				}
246			},
247		};
248		let authorship_future = sc_consensus_manual_seal::run_manual_seal(params);
249		task_manager.spawn_essential_handle().spawn_blocking(
250			"manual-seal",
251			None,
252			authorship_future,
253		);
254		let rpc_extensions_builder = {
255			let client = client.clone();
256			let transaction_pool = transaction_pool.clone();
257			let backend_for_rpc = backend.clone();
258
259			Box::new(move |_| {
260				let mut module = NodeSpec::BuildRpcExtensions::build_rpc_extensions(
261					client.clone(),
262					backend_for_rpc.clone(),
263					transaction_pool.clone(),
264					None,
265				)?;
266				module
267					.merge(ManualSeal::new(manual_seal_sink.clone()).into_rpc())
268					.map_err(|e| sc_service::Error::Application(e.into()))?;
269				Ok(module)
270			})
271		};
272
273		let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
274			network,
275			client: client.clone(),
276			keystore: keystore_container.keystore(),
277			task_manager: &mut task_manager,
278			transaction_pool: transaction_pool.clone(),
279			rpc_builder: rpc_extensions_builder,
280			backend,
281			system_rpc_tx,
282			tx_handler_controller,
283			sync_service,
284			config,
285			telemetry: telemetry.as_mut(),
286		})?;
287
288		Ok(task_manager)
289	}
290}