cumulus_client_collator/
lib.rs1use polkadot_node_primitives::CollationGenerationConfig;
20use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
21use polkadot_overseer::Handle as OverseerHandle;
22use polkadot_primitives::{CollatorPair, Id as ParaId};
23pub mod service;
24
25pub mod relay_chain_driven {
32 use futures::{
33 channel::{mpsc, oneshot},
34 prelude::*,
35 };
36 use polkadot_node_primitives::{CollationGenerationConfig, CollationResult};
37 use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
38 use polkadot_overseer::Handle as OverseerHandle;
39 use polkadot_primitives::{CollatorPair, Id as ParaId};
40
41 use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
42
43 pub struct CollationRequest {
47 relay_parent: PHash,
48 pvd: PersistedValidationData,
49 sender: oneshot::Sender<Option<CollationResult>>,
50 }
51
52 impl CollationRequest {
53 pub fn relay_parent(&self) -> &PHash {
55 &self.relay_parent
56 }
57
58 pub fn persisted_validation_data(&self) -> &PersistedValidationData {
60 &self.pvd
61 }
62
63 pub fn complete(self, collation: Option<CollationResult>) {
65 let _ = self.sender.send(collation);
66 }
67 }
68
69 pub async fn init(
72 key: CollatorPair,
73 para_id: ParaId,
74 overseer_handle: OverseerHandle,
75 ) -> mpsc::Receiver<CollationRequest> {
76 let mut overseer_handle = overseer_handle;
77
78 let (stream_tx, stream_rx) = mpsc::channel(0);
79 let config = CollationGenerationConfig {
80 key,
81 para_id,
82 collator: Some(Box::new(move |relay_parent, validation_data| {
83 let mut stream_tx = stream_tx.clone();
88 let validation_data = validation_data.clone();
89 Box::pin(async move {
90 let (this_tx, this_rx) = oneshot::channel();
91 let request =
92 CollationRequest { relay_parent, pvd: validation_data, sender: this_tx };
93
94 if stream_tx.send(request).await.is_err() {
95 return None;
96 }
97
98 this_rx.await.ok().flatten()
99 })
100 })),
101 };
102
103 overseer_handle
104 .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
105 .await;
106
107 overseer_handle
108 .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
109 .await;
110
111 stream_rx
112 }
113}
114
115pub async fn initialize_collator_subsystems(
120 overseer_handle: &mut OverseerHandle,
121 key: CollatorPair,
122 para_id: ParaId,
123 reinitialize: bool,
124) {
125 let config = CollationGenerationConfig { key, para_id, collator: None };
126
127 if reinitialize {
128 overseer_handle
129 .send_msg(CollationGenerationMessage::Reinitialize(config), "StartCollator")
130 .await;
131 } else {
132 overseer_handle
133 .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
134 .await;
135 }
136
137 overseer_handle
138 .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
139 .await;
140}