polkadot_statement_distribution/
lib.rs1#![warn(missing_docs)]
23
24use error::FatalResult;
25use std::time::Duration;
26
27use polkadot_node_network_protocol::request_response::{
28 v2::AttestedCandidateRequest, IncomingRequestReceiver,
29};
30use polkadot_node_subsystem::{
31 messages::StatementDistributionMessage, overseer, ActiveLeavesUpdate, FromOrchestra,
32 OverseerSignal, SpawnedSubsystem, SubsystemError,
33};
34use polkadot_node_subsystem_util::reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL};
35
36use futures::{channel::mpsc, prelude::*};
37use sp_keystore::KeystorePtr;
38
39use fatality::Nested;
40
41mod error;
42pub use error::{Error, FatalError, JfyiError, Result};
43
44pub(crate) mod metrics;
46use metrics::Metrics;
47
48mod v2;
49
/// Log target passed to the `gum` logging macros in this file.
50const LOG_TARGET: &str = "parachain::statement-distribution";
51
52pub struct StatementDistributionSubsystem {
54 keystore: KeystorePtr,
56 req_receiver: Option<IncomingRequestReceiver<AttestedCandidateRequest>>,
58 metrics: Metrics,
60 reputation: ReputationAggregator,
62}
63
64#[overseer::subsystem(StatementDistribution, error=SubsystemError, prefix=self::overseer)]
65impl<Context> StatementDistributionSubsystem {
66 fn start(self, ctx: Context) -> SpawnedSubsystem {
67 SpawnedSubsystem {
70 name: "statement-distribution-subsystem",
71 future: self
72 .run(ctx)
73 .map_err(|e| SubsystemError::with_origin("statement-distribution", e))
74 .boxed(),
75 }
76 }
77}
78
79enum MuxedMessage {
81 Subsystem(FatalResult<FromOrchestra<StatementDistributionMessage>>),
83 Responder(Option<v2::ResponderMessage>),
85 Response(v2::UnhandledResponse),
87 RetryRequest(()),
90}
91
92#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
93impl MuxedMessage {
94 async fn receive<Context>(
95 ctx: &mut Context,
96 state: &mut v2::State,
97 from_responder: &mut mpsc::Receiver<v2::ResponderMessage>,
98 ) -> MuxedMessage {
99 let (request_manager, response_manager) = state.request_and_response_managers();
100 let from_orchestra = ctx.recv().fuse();
103 let from_responder = from_responder.next();
104 let receive_response = v2::receive_response(response_manager).fuse();
105 let retry_request = v2::next_retry(request_manager).fuse();
106 futures::pin_mut!(from_orchestra, from_responder, receive_response, retry_request,);
107 futures::select! {
108 msg = from_orchestra => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
109 msg = from_responder => MuxedMessage::Responder(msg),
110 msg = receive_response => MuxedMessage::Response(msg),
111 msg = retry_request => MuxedMessage::RetryRequest(msg),
112 }
113 }
114}
115
116#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
117impl StatementDistributionSubsystem {
118 pub fn new(
120 keystore: KeystorePtr,
121 req_receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
122 metrics: Metrics,
123 ) -> Self {
124 Self { keystore, req_receiver: Some(req_receiver), metrics, reputation: Default::default() }
125 }
126
127 async fn run<Context>(self, ctx: Context) -> std::result::Result<(), FatalError> {
128 self.run_inner(ctx, REPUTATION_CHANGE_INTERVAL).await
129 }
130
131 async fn run_inner<Context>(
132 mut self,
133 mut ctx: Context,
134 reputation_interval: Duration,
135 ) -> std::result::Result<(), FatalError> {
136 let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
137 let mut reputation_delay = new_reputation_delay();
138
139 let mut state = crate::v2::State::new(self.keystore.clone());
140
141 let (res_sender, mut res_receiver) = mpsc::channel(1);
143
144 ctx.spawn(
145 "candidate-responder",
146 v2::respond_task(
147 self.req_receiver.take().expect("Mandatory argument to new. qed"),
148 res_sender.clone(),
149 self.metrics.clone(),
150 )
151 .boxed(),
152 )
153 .map_err(FatalError::SpawnTask)?;
154
155 loop {
156 let message = futures::select! {
158 _ = reputation_delay => {
159 self.reputation.send(ctx.sender()).await;
160 reputation_delay = new_reputation_delay();
161 continue
162 },
163 message = MuxedMessage::receive(
164 &mut ctx,
165 &mut state,
166 &mut res_receiver,
167 ).fuse() => {
168 message
169 }
170 };
171
172 match message {
173 MuxedMessage::Subsystem(result) => {
174 let result = self.handle_subsystem_message(&mut ctx, &mut state, result?).await;
175 match result.into_nested()? {
176 Ok(true) => break,
177 Ok(false) => {},
178 Err(jfyi) => gum::debug!(target: LOG_TARGET, error = ?jfyi),
179 }
180 },
181 MuxedMessage::Responder(result) => {
182 v2::answer_request(
183 &mut state,
184 result.ok_or(FatalError::RequesterReceiverFinished)?,
185 );
186 },
187 MuxedMessage::Response(result) => {
188 v2::handle_response(
189 &mut ctx,
190 &mut state,
191 result,
192 &mut self.reputation,
193 &self.metrics,
194 )
195 .await;
196 },
197 MuxedMessage::RetryRequest(()) => {
198 ()
201 },
202 };
203
204 v2::dispatch_requests(&mut ctx, &mut state).await;
205 }
206 Ok(())
207 }
208
209 async fn handle_subsystem_message<Context>(
210 &mut self,
211 ctx: &mut Context,
212 state: &mut v2::State,
213 message: FromOrchestra<StatementDistributionMessage>,
214 ) -> Result<bool> {
215 let metrics = &self.metrics;
216
217 match message {
218 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
219 activated,
220 deactivated,
221 })) => {
222 let _timer = metrics.time_active_leaves_update();
223
224 if let Some(ref activated) = activated {
225 let res =
226 v2::handle_active_leaves_update(ctx, state, activated, &metrics).await;
227 v2::handle_deactivate_leaves(state, &deactivated);
230 res?;
231 } else {
232 v2::handle_deactivate_leaves(state, &deactivated);
233 }
234 },
235 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {
236 },
238 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(true),
239 FromOrchestra::Communication { msg } => match msg {
240 StatementDistributionMessage::Share(relay_parent, statement) => {
241 let _timer = metrics.time_share();
242
243 v2::share_local_statement(
244 ctx,
245 state,
246 relay_parent,
247 statement,
248 &mut self.reputation,
249 &self.metrics,
250 )
251 .await?;
252 },
253 StatementDistributionMessage::NetworkBridgeUpdate(event) => {
254 v2::handle_network_update(
255 ctx,
256 state,
257 event,
258 &mut self.reputation,
259 &self.metrics,
260 )
261 .await;
262 },
263 StatementDistributionMessage::Backed(candidate_hash) => {
264 crate::v2::handle_backed_candidate_message(
265 ctx,
266 state,
267 candidate_hash,
268 &self.metrics,
269 )
270 .await;
271 },
272 },
273 }
274 Ok(false)
275 }
276}