cumulus_client_collator/
lib.rs1use polkadot_node_primitives::CollationGenerationConfig;
21use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
22use polkadot_overseer::Handle as OverseerHandle;
23use polkadot_primitives::{CollatorPair, Id as ParaId};
24pub mod service;
25
26pub mod relay_chain_driven {
33 use futures::{
34 channel::{mpsc, oneshot},
35 prelude::*,
36 };
37 use polkadot_node_primitives::{CollationGenerationConfig, CollationResult};
38 use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
39 use polkadot_overseer::Handle as OverseerHandle;
40 use polkadot_primitives::{CollatorPair, Id as ParaId};
41
42 use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
43
44 pub struct CollationRequest {
48 relay_parent: PHash,
49 pvd: PersistedValidationData,
50 sender: oneshot::Sender<Option<CollationResult>>,
51 }
52
53 impl CollationRequest {
54 pub fn relay_parent(&self) -> &PHash {
56 &self.relay_parent
57 }
58
59 pub fn persisted_validation_data(&self) -> &PersistedValidationData {
61 &self.pvd
62 }
63
64 pub fn complete(self, collation: Option<CollationResult>) {
66 let _ = self.sender.send(collation);
67 }
68 }
69
70 pub async fn init(
73 key: CollatorPair,
74 para_id: ParaId,
75 overseer_handle: OverseerHandle,
76 ) -> mpsc::Receiver<CollationRequest> {
77 let mut overseer_handle = overseer_handle;
78
79 let (stream_tx, stream_rx) = mpsc::channel(0);
80 let config = CollationGenerationConfig {
81 key,
82 para_id,
83 collator: Some(Box::new(move |relay_parent, validation_data| {
84 let mut stream_tx = stream_tx.clone();
89 let validation_data = validation_data.clone();
90 Box::pin(async move {
91 let (this_tx, this_rx) = oneshot::channel();
92 let request =
93 CollationRequest { relay_parent, pvd: validation_data, sender: this_tx };
94
95 if stream_tx.send(request).await.is_err() {
96 return None
97 }
98
99 this_rx.await.ok().flatten()
100 })
101 })),
102 };
103
104 overseer_handle
105 .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
106 .await;
107
108 overseer_handle
109 .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
110 .await;
111
112 stream_rx
113 }
114}
115
116pub async fn initialize_collator_subsystems(
121 overseer_handle: &mut OverseerHandle,
122 key: CollatorPair,
123 para_id: ParaId,
124 reinitialize: bool,
125) {
126 let config = CollationGenerationConfig { key, para_id, collator: None };
127
128 if reinitialize {
129 overseer_handle
130 .send_msg(CollationGenerationMessage::Reinitialize(config), "StartCollator")
131 .await;
132 } else {
133 overseer_handle
134 .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
135 .await;
136 }
137
138 overseer_handle
139 .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
140 .await;
141}