cumulus_relay_chain_minimal_node/
collator_overseer.rs1use futures::{select, StreamExt};
19use std::sync::Arc;
20
21use polkadot_overseer::{
22 BlockInfo, Handle, Overseer, OverseerConnector, OverseerHandle, SpawnGlue, UnpinHandle,
23};
24use polkadot_service::overseer::{collator_overseer_builder, OverseerGenArgs};
25
26use sc_network::{request_responses::IncomingRequest, service::traits::NetworkService};
27use sc_service::TaskManager;
28use sc_utils::mpsc::tracing_unbounded;
29
30use cumulus_relay_chain_interface::RelayChainError;
31
32use crate::BlockChainRpcClient;
33
34fn build_overseer(
35 connector: OverseerConnector,
36 args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
37) -> Result<
38 (Overseer<SpawnGlue<sc_service::SpawnTaskHandle>, Arc<BlockChainRpcClient>>, OverseerHandle),
39 RelayChainError,
40> {
41 let builder =
42 collator_overseer_builder(args).map_err(|e| RelayChainError::Application(e.into()))?;
43
44 builder
45 .build_with_connector(connector)
46 .map_err(|e| RelayChainError::Application(e.into()))
47}
48
49pub(crate) fn spawn_overseer(
50 overseer_args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
51 task_manager: &TaskManager,
52 relay_chain_rpc_client: Arc<BlockChainRpcClient>,
53) -> Result<polkadot_overseer::Handle, RelayChainError> {
54 let (overseer, overseer_handle) = build_overseer(OverseerConnector::default(), overseer_args)
55 .map_err(|e| {
56 tracing::error!("Failed to initialize overseer: {}", e);
57 e
58 })?;
59
60 let overseer_handle = Handle::new(overseer_handle);
61 {
62 let handle = overseer_handle.clone();
63 task_manager.spawn_essential_handle().spawn_blocking(
64 "overseer",
65 None,
66 Box::pin(async move {
67 use futures::{pin_mut, FutureExt};
68
69 let forward = forward_collator_events(relay_chain_rpc_client, handle).fuse();
70
71 let overseer_fut = overseer.run().fuse();
72
73 pin_mut!(overseer_fut);
74 pin_mut!(forward);
75
76 select! {
77 _ = forward => (),
78 _ = overseer_fut => (),
79 }
80 }),
81 );
82 }
83 Ok(overseer_handle)
84}
85
86pub struct NewMinimalNode {
88 pub task_manager: TaskManager,
90 pub overseer_handle: Handle,
92 pub network_service: Arc<dyn NetworkService>,
94 pub paranode_rx: async_channel::Receiver<IncomingRequest>,
96}
97
98async fn forward_collator_events(
101 client: Arc<BlockChainRpcClient>,
102 mut handle: Handle,
103) -> Result<(), RelayChainError> {
104 let mut finality = client.finality_notification_stream().await?.fuse();
105 let mut imports = client.import_notification_stream().await?.fuse();
106 let (dummy_sink, _) = tracing_unbounded("does-not-matter", 42);
108 let dummy_unpin_handle = UnpinHandle::new(Default::default(), dummy_sink);
109
110 loop {
111 select! {
112 f = finality.next() => {
113 match f {
114 Some(header) => {
115 let hash = header.hash();
116 tracing::info!(
117 target: "minimal-polkadot-node",
118 "Received finalized block via RPC: #{} ({} -> {})",
119 header.number,
120 header.parent_hash,
121 hash,
122 );
123 let unpin_handle = dummy_unpin_handle.clone();
124 let block_info = BlockInfo { hash, parent_hash: header.parent_hash, number: header.number, unpin_handle };
125 handle.block_finalized(block_info).await;
126 }
127 None => return Err(RelayChainError::GenericError("Relay chain finality stream ended.".to_string())),
128 }
129 },
130 i = imports.next() => {
131 match i {
132 Some(header) => {
133 let hash = header.hash();
134 tracing::info!(
135 target: "minimal-polkadot-node",
136 "Received imported block via RPC: #{} ({} -> {})",
137 header.number,
138 header.parent_hash,
139 hash,
140 );
141 let unpin_handle = dummy_unpin_handle.clone();
142 let block_info = BlockInfo { hash, parent_hash: header.parent_hash, number: header.number, unpin_handle };
143 handle.block_imported(block_info).await;
144 }
145 None => return Err(RelayChainError::GenericError("Relay chain import stream ended.".to_string())),
146 }
147 }
148 }
149 }
150}