polkadot_omni_node_lib/nodes/
manual_seal.rs1use crate::common::{
18 rpc::BuildRpcExtensions as BuildRpcExtensionsT,
19 spec::{BaseNodeSpec, BuildImportQueue, ClientBlockImport, NodeSpec as NodeSpecT},
20 types::{Hash, ParachainBlockImport, ParachainClient},
21};
22use codec::Encode;
23use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
24use cumulus_primitives_aura::AuraUnincludedSegmentApi;
25use cumulus_primitives_core::CollectCollationInfo;
26use futures::FutureExt;
27use polkadot_primitives::UpgradeGoAhead;
28use sc_client_api::Backend;
29use sc_consensus::{DefaultImportQueue, LongestChain};
30use sc_consensus_manual_seal::rpc::{ManualSeal, ManualSealApiServer};
31use sc_network::NetworkBackend;
32use sc_service::{Configuration, PartialComponents, TaskManager};
33use sc_telemetry::TelemetryHandle;
34use sc_transaction_pool_api::OffchainTransactionPoolFactory;
35use sp_api::{ApiExt, ProvideRuntimeApi};
36use sp_runtime::traits::Header;
37use std::{marker::PhantomData, sync::Arc};
38
/// Node flavour that authors blocks through the manual-seal consensus engine.
///
/// The type is a zero-sized marker: `NodeSpec` is only carried at the type level (via
/// [`PhantomData`]) so that the trait implementations on this type can forward to the
/// associated types of the wrapped node specification.
pub struct ManualSealNode<NodeSpec>(PhantomData<NodeSpec>);
40
41impl<NodeSpec: NodeSpecT>
42 BuildImportQueue<
43 NodeSpec::Block,
44 NodeSpec::RuntimeApi,
45 Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>,
46 > for ManualSealNode<NodeSpec>
47{
48 fn build_import_queue(
49 client: Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>,
50 _block_import: ParachainBlockImport<
51 NodeSpec::Block,
52 Arc<ParachainClient<NodeSpec::Block, NodeSpec::RuntimeApi>>,
53 >,
54 config: &Configuration,
55 _telemetry_handle: Option<TelemetryHandle>,
56 task_manager: &TaskManager,
57 ) -> sc_service::error::Result<DefaultImportQueue<NodeSpec::Block>> {
58 Ok(sc_consensus_manual_seal::import_queue(
59 Box::new(client.clone()),
60 &task_manager.spawn_essential_handle(),
61 config.prometheus_registry(),
62 ))
63 }
64}
65
66impl<NodeSpec: NodeSpecT> BaseNodeSpec for ManualSealNode<NodeSpec> {
67 type Block = NodeSpec::Block;
68 type RuntimeApi = NodeSpec::RuntimeApi;
69 type BuildImportQueue = Self;
70 type InitBlockImport = ClientBlockImport;
71}
72
73impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> {
74 pub fn new() -> Self {
75 Self(Default::default())
76 }
77
78 pub fn start_node<Net>(
79 &self,
80 mut config: Configuration,
81 block_time: u64,
82 ) -> sc_service::error::Result<TaskManager>
83 where
84 Net: NetworkBackend<NodeSpec::Block, Hash>,
85 {
86 let PartialComponents {
87 client,
88 backend,
89 mut task_manager,
90 import_queue,
91 keystore_container,
92 select_chain: _,
93 transaction_pool,
94 other: (_, mut telemetry, _, _),
95 } = Self::new_partial(&config)?;
96 let select_chain = LongestChain::new(backend.clone());
97
98 let para_id =
99 Self::parachain_id(&client, &config).ok_or("Failed to retrieve the parachain id")?;
100
101 config.network.default_peers_set.in_peers = 0;
103 config.network.default_peers_set.out_peers = 0;
104 let net_config = sc_network::config::FullNetworkConfiguration::<_, _, Net>::new(
105 &config.network,
106 config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
107 );
108 let metrics = Net::register_notification_metrics(
109 config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
110 );
111
112 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
113 sc_service::build_network(sc_service::BuildNetworkParams {
114 config: &config,
115 client: client.clone(),
116 transaction_pool: transaction_pool.clone(),
117 spawn_handle: task_manager.spawn_handle(),
118 import_queue,
119 net_config,
120 block_announce_validator_builder: None,
121 warp_sync_config: None,
122 block_relay: None,
123 metrics,
124 })?;
125
126 if config.offchain_worker.enabled {
127 let offchain_workers =
128 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
129 runtime_api_provider: client.clone(),
130 keystore: Some(keystore_container.keystore()),
131 offchain_db: backend.offchain_storage(),
132 transaction_pool: Some(OffchainTransactionPoolFactory::new(
133 transaction_pool.clone(),
134 )),
135 network_provider: Arc::new(network.clone()),
136 is_validator: config.role.is_authority(),
137 enable_http_requests: true,
138 custom_extensions: move |_| vec![],
139 })?;
140 task_manager.spawn_handle().spawn(
141 "offchain-workers-runner",
142 "offchain-work",
143 offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
144 );
145 }
146
147 let proposer = sc_basic_authorship::ProposerFactory::new(
148 task_manager.spawn_handle(),
149 client.clone(),
150 transaction_pool.clone(),
151 None,
152 None,
153 );
154
155 let (manual_seal_sink, manual_seal_stream) = futures::channel::mpsc::channel(1024);
156 let mut manual_seal_sink_clone = manual_seal_sink.clone();
157 task_manager
158 .spawn_essential_handle()
159 .spawn("block_authoring", None, async move {
160 loop {
161 futures_timer::Delay::new(std::time::Duration::from_millis(block_time)).await;
162 manual_seal_sink_clone
163 .try_send(sc_consensus_manual_seal::EngineCommand::SealNewBlock {
164 create_empty: true,
165 finalize: true,
166 parent_hash: None,
167 sender: None,
168 })
169 .unwrap();
170 }
171 });
172
173 let client_for_cidp = client.clone();
174 let params = sc_consensus_manual_seal::ManualSealParams {
175 block_import: client.clone(),
176 env: proposer,
177 client: client.clone(),
178 pool: transaction_pool.clone(),
179 select_chain,
180 commands_stream: Box::pin(manual_seal_stream),
181 consensus_data_provider: None,
182 create_inherent_data_providers: move |block: Hash, ()| {
183 let current_para_head = client_for_cidp
184 .header(block)
185 .expect("Header lookup should succeed")
186 .expect("Header passed in as parent should be present in backend.");
187
188 let should_send_go_ahead = client_for_cidp
189 .runtime_api()
190 .collect_collation_info(block, ¤t_para_head)
191 .map(|info| info.new_validation_code.is_some())
192 .unwrap_or_default();
193
194 let requires_relay_progress = client_for_cidp
199 .runtime_api()
200 .has_api_with::<dyn AuraUnincludedSegmentApi<NodeSpec::Block>, _>(
201 block,
202 |version| version > 1,
203 )
204 .ok()
205 .unwrap_or_default();
206
207 let current_para_block_head =
208 Some(polkadot_primitives::HeadData(current_para_head.encode()));
209 let client_for_xcm = client_for_cidp.clone();
210 async move {
211 use sp_runtime::traits::UniqueSaturatedInto;
212
213 let mocked_parachain = MockValidationDataInherentDataProvider {
214 current_para_block: UniqueSaturatedInto::<u32>::unique_saturated_into(
217 *current_para_head.number(),
218 ),
219 para_id,
220 current_para_block_head,
221 relay_offset: 0,
222 relay_blocks_per_para_block: requires_relay_progress
223 .then(|| 1)
224 .unwrap_or_default(),
225 para_blocks_per_relay_epoch: 10,
226 relay_randomness_config: (),
227 xcm_config: MockXcmConfig::new(&*client_for_xcm, block, Default::default()),
228 raw_downward_messages: vec![],
229 raw_horizontal_messages: vec![],
230 additional_key_values: None,
231 upgrade_go_ahead: should_send_go_ahead.then(|| {
232 log::info!(
233 "Detected pending validation code, sending go-ahead signal."
234 );
235 UpgradeGoAhead::GoAhead
236 }),
237 };
238 Ok((
239 sp_timestamp::InherentDataProvider::new(sp_timestamp::Timestamp::new(0)),
243 mocked_parachain,
244 ))
245 }
246 },
247 };
248 let authorship_future = sc_consensus_manual_seal::run_manual_seal(params);
249 task_manager.spawn_essential_handle().spawn_blocking(
250 "manual-seal",
251 None,
252 authorship_future,
253 );
254 let rpc_extensions_builder = {
255 let client = client.clone();
256 let transaction_pool = transaction_pool.clone();
257 let backend_for_rpc = backend.clone();
258
259 Box::new(move |_| {
260 let mut module = NodeSpec::BuildRpcExtensions::build_rpc_extensions(
261 client.clone(),
262 backend_for_rpc.clone(),
263 transaction_pool.clone(),
264 None,
265 )?;
266 module
267 .merge(ManualSeal::new(manual_seal_sink.clone()).into_rpc())
268 .map_err(|e| sc_service::Error::Application(e.into()))?;
269 Ok(module)
270 })
271 };
272
273 let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
274 network,
275 client: client.clone(),
276 keystore: keystore_container.keystore(),
277 task_manager: &mut task_manager,
278 transaction_pool: transaction_pool.clone(),
279 rpc_builder: rpc_extensions_builder,
280 backend,
281 system_rpc_tx,
282 tx_handler_controller,
283 sync_service,
284 config,
285 telemetry: telemetry.as_mut(),
286 })?;
287
288 Ok(task_manager)
289 }
290}