1use std::{
19 collections::{BTreeMap, VecDeque},
20 pin::Pin,
21 sync::Arc,
22 time::Duration,
23};
24
25use async_trait::async_trait;
26use cumulus_client_bootnodes::bootnode_request_response_config;
27use cumulus_primitives_core::{
28 relay_chain::{
29 runtime_api::ParachainHost,
30 vstaging::{CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState},
31 Block as PBlock, BlockId, BlockNumber, CoreIndex, Hash as PHash, Header as PHeader,
32 InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId,
33 },
34 InboundDownwardMessage, ParaId, PersistedValidationData,
35};
36use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
37use futures::{FutureExt, Stream, StreamExt};
38use polkadot_primitives::vstaging::CandidateEvent;
39use polkadot_service::{
40 builder::PolkadotServiceBuilder, CollatorOverseerGen, CollatorPair, Configuration, FullBackend,
41 FullClient, Handle, NewFull, NewFullParams, TaskManager,
42};
43use sc_cli::{RuntimeVersion, SubstrateCli};
44use sc_client_api::{
45 blockchain::BlockStatus, Backend, BlockchainEvents, HeaderBackend, ImportNotifications,
46 StorageProof, TrieCacheContext,
47};
48use sc_network::{
49 config::NetworkBackendType,
50 request_responses::IncomingRequest,
51 service::traits::{NetworkBackend, NetworkService},
52};
53use sc_telemetry::TelemetryWorkerHandle;
54use sp_api::{CallApiAt, CallApiAtParams, CallContext, ProvideRuntimeApi};
55use sp_consensus::SyncOracle;
56use sp_core::Pair;
57use sp_state_machine::{Backend as StateBackend, StorageValue};
58
/// Number of seconds `wait_for_block` waits for a matching import notification before it gives
/// up with a timeout error.
const TIMEOUT_IN_SECONDS: u64 = 6;
61
62#[derive(Clone)]
65pub struct RelayChainInProcessInterface {
66 full_client: Arc<FullClient>,
67 backend: Arc<FullBackend>,
68 sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
69 overseer_handle: Handle,
70}
71
72impl RelayChainInProcessInterface {
73 pub fn new(
75 full_client: Arc<FullClient>,
76 backend: Arc<FullBackend>,
77 sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
78 overseer_handle: Handle,
79 ) -> Self {
80 Self { full_client, backend, sync_oracle, overseer_handle }
81 }
82}
83
84#[async_trait]
85impl RelayChainInterface for RelayChainInProcessInterface {
86 async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
87 Ok(self.full_client.runtime_version_at(relay_parent)?)
88 }
89
90 async fn retrieve_dmq_contents(
91 &self,
92 para_id: ParaId,
93 relay_parent: PHash,
94 ) -> RelayChainResult<Vec<InboundDownwardMessage>> {
95 Ok(self.full_client.runtime_api().dmq_contents(relay_parent, para_id)?)
96 }
97
98 async fn retrieve_all_inbound_hrmp_channel_contents(
99 &self,
100 para_id: ParaId,
101 relay_parent: PHash,
102 ) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
103 Ok(self
104 .full_client
105 .runtime_api()
106 .inbound_hrmp_channels_contents(relay_parent, para_id)?)
107 }
108
109 async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
110 let hash = match block_id {
111 BlockId::Hash(hash) => hash,
112 BlockId::Number(num) =>
113 if let Some(hash) = self.full_client.hash(num)? {
114 hash
115 } else {
116 return Ok(None)
117 },
118 };
119 let header = self.full_client.header(hash)?;
120
121 Ok(header)
122 }
123
124 async fn persisted_validation_data(
125 &self,
126 hash: PHash,
127 para_id: ParaId,
128 occupied_core_assumption: OccupiedCoreAssumption,
129 ) -> RelayChainResult<Option<PersistedValidationData>> {
130 Ok(self.full_client.runtime_api().persisted_validation_data(
131 hash,
132 para_id,
133 occupied_core_assumption,
134 )?)
135 }
136
137 async fn validation_code_hash(
138 &self,
139 hash: PHash,
140 para_id: ParaId,
141 occupied_core_assumption: OccupiedCoreAssumption,
142 ) -> RelayChainResult<Option<ValidationCodeHash>> {
143 Ok(self.full_client.runtime_api().validation_code_hash(
144 hash,
145 para_id,
146 occupied_core_assumption,
147 )?)
148 }
149
150 async fn candidate_pending_availability(
151 &self,
152 hash: PHash,
153 para_id: ParaId,
154 ) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
155 Ok(self
156 .full_client
157 .runtime_api()
158 .candidate_pending_availability(hash, para_id)?
159 .map(|receipt| receipt.into()))
160 }
161
162 async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult<SessionIndex> {
163 Ok(self.full_client.runtime_api().session_index_for_child(hash)?)
164 }
165
166 async fn validators(&self, hash: PHash) -> RelayChainResult<Vec<ValidatorId>> {
167 Ok(self.full_client.runtime_api().validators(hash)?)
168 }
169
170 async fn import_notification_stream(
171 &self,
172 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
173 let notification_stream = self
174 .full_client
175 .import_notification_stream()
176 .map(|notification| notification.header);
177 Ok(Box::pin(notification_stream))
178 }
179
180 async fn finality_notification_stream(
181 &self,
182 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
183 let notification_stream = self
184 .full_client
185 .finality_notification_stream()
186 .map(|notification| notification.header);
187 Ok(Box::pin(notification_stream))
188 }
189
190 async fn best_block_hash(&self) -> RelayChainResult<PHash> {
191 Ok(self.backend.blockchain().info().best_hash)
192 }
193
194 async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
195 Ok(self.backend.blockchain().info().finalized_hash)
196 }
197
198 async fn call_runtime_api(
199 &self,
200 method_name: &'static str,
201 hash: PHash,
202 payload: &[u8],
203 ) -> RelayChainResult<Vec<u8>> {
204 Ok(self.full_client.call_api_at(CallApiAtParams {
205 at: hash,
206 function: method_name,
207 arguments: payload.to_vec(),
208 overlayed_changes: &Default::default(),
209 call_context: CallContext::Offchain,
210 recorder: &None,
211 extensions: &Default::default(),
212 })?)
213 }
214
215 async fn is_major_syncing(&self) -> RelayChainResult<bool> {
216 Ok(self.sync_oracle.is_major_syncing())
217 }
218
219 fn overseer_handle(&self) -> RelayChainResult<Handle> {
220 Ok(self.overseer_handle.clone())
221 }
222
223 async fn get_storage_by_key(
224 &self,
225 relay_parent: PHash,
226 key: &[u8],
227 ) -> RelayChainResult<Option<StorageValue>> {
228 let state = self.backend.state_at(relay_parent, TrieCacheContext::Untrusted)?;
229 state.storage(key).map_err(RelayChainError::GenericError)
230 }
231
232 async fn prove_read(
233 &self,
234 relay_parent: PHash,
235 relevant_keys: &Vec<Vec<u8>>,
236 ) -> RelayChainResult<StorageProof> {
237 let state_backend = self.backend.state_at(relay_parent, TrieCacheContext::Untrusted)?;
238
239 sp_state_machine::prove_read(state_backend, relevant_keys)
240 .map_err(RelayChainError::StateMachineError)
241 }
242
243 async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
262 let mut listener =
263 match check_block_in_chain(self.backend.clone(), self.full_client.clone(), hash)? {
264 BlockCheckStatus::InChain => return Ok(()),
265 BlockCheckStatus::Unknown(listener) => listener,
266 };
267
268 let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
269
270 loop {
271 futures::select! {
272 _ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
273 evt = listener.next() => match evt {
274 Some(evt) if evt.hash == hash => return Ok(()),
275 Some(_) => continue,
277 None => return Err(RelayChainError::ImportListenerClosed(hash)),
278 }
279 }
280 }
281 }
282
283 async fn new_best_notification_stream(
284 &self,
285 ) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
286 let notifications_stream =
287 self.full_client
288 .import_notification_stream()
289 .filter_map(|notification| async move {
290 notification.is_new_best.then_some(notification.header)
291 });
292 Ok(Box::pin(notifications_stream))
293 }
294
295 async fn availability_cores(
296 &self,
297 relay_parent: PHash,
298 ) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>> {
299 Ok(self
300 .full_client
301 .runtime_api()
302 .availability_cores(relay_parent)?
303 .into_iter()
304 .map(|core_state| core_state.into())
305 .collect::<Vec<_>>())
306 }
307
308 async fn candidates_pending_availability(
309 &self,
310 hash: PHash,
311 para_id: ParaId,
312 ) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
313 Ok(self
314 .full_client
315 .runtime_api()
316 .candidates_pending_availability(hash, para_id)?
317 .into_iter()
318 .map(|receipt| receipt.into())
319 .collect::<Vec<_>>())
320 }
321
322 async fn claim_queue(
323 &self,
324 hash: PHash,
325 ) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>> {
326 Ok(self.full_client.runtime_api().claim_queue(hash)?)
327 }
328
329 async fn scheduling_lookahead(&self, hash: PHash) -> RelayChainResult<u32> {
330 Ok(self.full_client.runtime_api().scheduling_lookahead(hash)?)
331 }
332
333 async fn candidate_events(&self, hash: PHash) -> RelayChainResult<Vec<CandidateEvent>> {
334 Ok(self.full_client.runtime_api().candidate_events(hash)?)
335 }
336}
337
338pub enum BlockCheckStatus {
339 InChain,
341 Unknown(ImportNotifications<PBlock>),
343}
344
345pub fn check_block_in_chain(
347 backend: Arc<FullBackend>,
348 client: Arc<FullClient>,
349 hash: PHash,
350) -> RelayChainResult<BlockCheckStatus> {
351 let _lock = backend.get_import_lock().read();
352
353 if backend.blockchain().status(hash)? == BlockStatus::InChain {
354 return Ok(BlockCheckStatus::InChain)
355 }
356
357 let listener = client.import_notification_stream();
358
359 Ok(BlockCheckStatus::Unknown(listener))
360}
361
362fn build_polkadot_with_paranode_protocol<Network>(
364 config: Configuration,
365 params: NewFullParams<CollatorOverseerGen>,
366) -> Result<(NewFull, async_channel::Receiver<IncomingRequest>), polkadot_service::Error>
367where
368 Network: NetworkBackend<PBlock, PHash>,
369{
370 let fork_id = config.chain_spec.fork_id().map(ToString::to_string);
371 let mut polkadot_builder = PolkadotServiceBuilder::<_, Network>::new(config, params)?;
372 let (config, request_receiver) = bootnode_request_response_config::<_, _, Network>(
373 polkadot_builder.genesis_hash(),
374 fork_id.as_deref(),
375 );
376 polkadot_builder.add_extra_request_response_protocol(config);
377
378 Ok((polkadot_builder.build()?, request_receiver))
379}
380
381#[sc_tracing::logging::prefix_logs_with("Relaychain")]
383fn build_polkadot_full_node(
384 config: Configuration,
385 parachain_config: &Configuration,
386 telemetry_worker_handle: Option<TelemetryWorkerHandle>,
387 hwbench: Option<sc_sysinfo::HwBench>,
388) -> Result<
389 (NewFull, Option<CollatorPair>, async_channel::Receiver<IncomingRequest>),
390 polkadot_service::Error,
391> {
392 let (is_parachain_node, maybe_collator_key) = if parachain_config.role.is_authority() {
393 let collator_key = CollatorPair::generate().0;
394 (polkadot_service::IsParachainNode::Collator(collator_key.clone()), Some(collator_key))
395 } else {
396 (polkadot_service::IsParachainNode::FullNode, None)
397 };
398
399 let new_full_params = polkadot_service::NewFullParams {
400 is_parachain_node,
401 enable_beefy: false,
403 force_authoring_backoff: false,
404 telemetry_worker_handle,
405
406 node_version: None,
408 secure_validator_mode: false,
409 workers_path: None,
410 workers_names: None,
411
412 overseer_gen: CollatorOverseerGen,
413 overseer_message_channel_capacity_override: None,
414 malus_finality_delay: None,
415 hwbench,
416 execute_workers_max_num: None,
417 prepare_workers_hard_max_num: None,
418 prepare_workers_soft_max_num: None,
419 keep_finalized_for: None,
420 };
421
422 let (relay_chain_full_node, paranode_req_receiver) = match config.network.network_backend {
423 NetworkBackendType::Libp2p => build_polkadot_with_paranode_protocol::<
424 sc_network::NetworkWorker<_, _>,
425 >(config, new_full_params)?,
426 NetworkBackendType::Litep2p => build_polkadot_with_paranode_protocol::<
427 sc_network::Litep2pNetworkBackend,
428 >(config, new_full_params)?,
429 };
430
431 Ok((relay_chain_full_node, maybe_collator_key, paranode_req_receiver))
432}
433
434pub fn build_inprocess_relay_chain(
436 mut polkadot_config: Configuration,
437 parachain_config: &Configuration,
438 telemetry_worker_handle: Option<TelemetryWorkerHandle>,
439 task_manager: &mut TaskManager,
440 hwbench: Option<sc_sysinfo::HwBench>,
441) -> RelayChainResult<(
442 Arc<(dyn RelayChainInterface + 'static)>,
443 Option<CollatorPair>,
444 Arc<dyn NetworkService>,
445 async_channel::Receiver<IncomingRequest>,
446)> {
447 polkadot_config.impl_version = polkadot_cli::Cli::impl_version();
450 polkadot_config.impl_name = polkadot_cli::Cli::impl_name();
451
452 let (full_node, collator_key, paranode_req_receiver) = build_polkadot_full_node(
453 polkadot_config,
454 parachain_config,
455 telemetry_worker_handle,
456 hwbench,
457 )
458 .map_err(|e| RelayChainError::Application(Box::new(e) as Box<_>))?;
459
460 let relay_chain_interface = Arc::new(RelayChainInProcessInterface::new(
461 full_node.client,
462 full_node.backend,
463 full_node.sync_service,
464 full_node.overseer_handle.clone().ok_or(RelayChainError::GenericError(
465 "Overseer not running in full node.".to_string(),
466 ))?,
467 ));
468
469 task_manager.add_child(full_node.task_manager);
470
471 Ok((relay_chain_interface, collator_key, full_node.network, paranode_req_receiver))
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    use polkadot_primitives::Block as PBlock;
    use polkadot_test_client::{
        construct_transfer_extrinsic, BlockBuilderExt, Client, ClientBlockImportExt,
        DefaultTestClientBuilderExt, InitPolkadotBlockBuilder, TestClientBuilder,
        TestClientBuilderExt,
    };
    use sp_consensus::{BlockOrigin, SyncOracle};
    use sp_runtime::traits::Block as BlockT;
    use std::sync::Arc;

    use futures::{executor::block_on, poll, task::Poll};

    /// Sync oracle stand-in; the tests below never query it.
    struct DummyNetwork;

    impl SyncOracle for DummyNetwork {
        fn is_major_syncing(&self) -> bool {
            unimplemented!("Not needed for test")
        }

        fn is_offline(&self) -> bool {
            unimplemented!("Not needed for test")
        }
    }

    /// Creates a test client, a block built on top of it (not yet imported) and an interface
    /// that shares the client and its backend.
    fn build_client_backend_and_block() -> (Arc<Client>, PBlock, RelayChainInProcessInterface) {
        let client_builder = TestClientBuilder::new();
        let backend = client_builder.backend();
        let client = Arc::new(client_builder.build());

        let block =
            client.init_polkadot_block_builder().build().expect("Finalizes the block").block;

        let sync_oracle: Arc<dyn SyncOracle + Sync + Send> = Arc::new(DummyNetwork);
        let (sender, _receiver) = metered::channel(30);
        let overseer_handle = Handle::new(sender);

        let interface = RelayChainInProcessInterface::new(
            client.clone(),
            backend,
            sync_oracle,
            overseer_handle,
        );

        (client, block, interface)
    }

    #[test]
    fn returns_directly_for_available_block() {
        let (client, block, relay_chain_interface) = build_client_backend_and_block();
        let hash = block.hash();

        block_on(client.import(BlockOrigin::Own, block)).expect("Imports the block");

        block_on(async move {
            // The block is already imported, so a single poll must complete the future.
            let first_poll = poll!(relay_chain_interface.wait_for_block(hash));
            assert!(matches!(first_poll, Poll::Ready(Ok(()))));
        });
    }

    #[test]
    fn resolve_after_block_import_notification_was_received() {
        let (client, block, relay_chain_interface) = build_client_backend_and_block();
        let hash = block.hash();

        block_on(async move {
            let mut wait = relay_chain_interface.wait_for_block(hash);
            // Nothing imported yet: the first poll registers the listener and stays pending.
            assert!(poll!(&mut wait).is_pending());

            client.import(BlockOrigin::Own, block).await.expect("Imports the block");

            // The import notification must now complete the future.
            let after_import = poll!(wait);
            assert!(matches!(after_import, Poll::Ready(Ok(()))));
        });
    }

    #[test]
    fn wait_for_block_time_out_when_block_is_not_imported() {
        let (_, block, relay_chain_interface) = build_client_backend_and_block();
        let hash = block.hash();

        // The block is never imported, so the wait has to end with a timeout.
        let outcome = block_on(relay_chain_interface.wait_for_block(hash));
        assert!(matches!(outcome, Err(RelayChainError::WaitTimeout(_))));
    }

    #[test]
    fn do_not_resolve_after_different_block_import_notification_was_received() {
        let (client, block, relay_chain_interface) = build_client_backend_and_block();
        let hash = block.hash();

        // Build a second block that differs from the first one by containing a transfer.
        let transfer = construct_transfer_extrinsic(
            &client,
            sp_keyring::Sr25519Keyring::Alice,
            sp_keyring::Sr25519Keyring::Bob,
            1000,
        );
        let mut second_builder = client.init_polkadot_block_builder();
        second_builder.push_polkadot_extrinsic(transfer).expect("Push extrinsic");
        let block2 = second_builder.build().expect("Build second block").block;
        let hash2 = block2.hash();

        block_on(async move {
            let mut wait_first = relay_chain_interface.wait_for_block(hash);
            let mut wait_second = relay_chain_interface.wait_for_block(hash2);
            // Both futures register their listeners and stay pending.
            assert!(poll!(&mut wait_first).is_pending());
            assert!(poll!(&mut wait_second).is_pending());

            client.import(BlockOrigin::Own, block2).await.expect("Imports the second block");

            // Only the wait for the second block may resolve; the first one keeps waiting.
            assert!(poll!(&mut wait_first).is_pending());
            assert!(matches!(poll!(wait_second), Poll::Ready(Ok(()))));

            client.import(BlockOrigin::Own, block).await.expect("Imports the first block");

            assert!(matches!(poll!(wait_first), Poll::Ready(Ok(()))));
        });
    }
}