
polkadot_statement_distribution/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Statement Distribution Subsystem.
//!
//! This is responsible for distributing signed statements about candidate
//! validity among validators.
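//!
//! The protocol logic itself lives in the `v2` module; this file wires the subsystem
//! into the overseer and multiplexes its event sources.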

#![warn(missing_docs)]

use error::FatalResult;
use std::time::Duration;

use polkadot_node_network_protocol::request_response::{
	v2::AttestedCandidateRequest, IncomingRequestReceiver,
};
use polkadot_node_subsystem::{
	messages::StatementDistributionMessage, overseer, ActiveLeavesUpdate, FromOrchestra,
	OverseerSignal, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL};

use futures::{channel::mpsc, prelude::*};
use sp_keystore::KeystorePtr;

use fatality::Nested;

mod error;
pub use error::{Error, FatalError, JfyiError, Result};

/// Metrics for the statement distribution
pub(crate) mod metrics;
use metrics::Metrics;

mod v2;

const LOG_TARGET: &str = "parachain::statement-distribution";

/// The statement distribution subsystem.
pub struct StatementDistributionSubsystem {
	/// Pointer to a keystore, which is required for determining this node's validator index.
	keystore: KeystorePtr,
	/// Receiver for incoming candidate requests.
	req_receiver: Option<IncomingRequestReceiver<AttestedCandidateRequest>>,
	/// Prometheus metrics
	metrics: Metrics,
	/// Aggregated reputation change
	reputation: ReputationAggregator,
}

#[overseer::subsystem(StatementDistribution, error=SubsystemError, prefix=self::overseer)]
impl<Context> StatementDistributionSubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		// Non-fatal errors are logged with more precision within `run`; any error reaching
		// this point is fatal to the node and is only wrapped with this subsystem's origin
		// for the overseer.
		SpawnedSubsystem {
			name: "statement-distribution-subsystem",
			future: self
				.run(ctx)
				.map_err(|e| SubsystemError::with_origin("statement-distribution", e))
				.boxed(),
		}
	}
}

/// Messages to be handled in this subsystem.
enum MuxedMessage {
	/// Messages from other subsystems.
	Subsystem(FatalResult<FromOrchestra<StatementDistributionMessage>>),
	/// Messages from the candidate responder background task.
	Responder(Option<v2::ResponderMessage>),
	/// Messages from answered requests.
	Response(v2::UnhandledResponse),
	/// Message that a request is ready to be retried. This just acts as a signal that we should
	/// dispatch all pending requests again.
	RetryRequest(()),
}

#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
impl MuxedMessage {
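	/// Wait for the next message from any of the multiplexed sources: the overseer,
	/// the candidate responder task, responses to in-flight requests, or the request
	/// retry timer.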
	async fn receive<Context>(
		ctx: &mut Context,
		state: &mut v2::State,
		from_responder: &mut mpsc::Receiver<v2::ResponderMessage>,
	) -> MuxedMessage {
		let (request_manager, response_manager) = state.request_and_response_managers();
		// We are only fusing here to make `select` happy; in reality we will quit if one of
		// these streams ends:
		let from_orchestra = ctx.recv().fuse();
		let from_responder = from_responder.next();
		let receive_response = v2::receive_response(response_manager).fuse();
		let retry_request = v2::next_retry(request_manager).fuse();
		futures::pin_mut!(from_orchestra, from_responder, receive_response, retry_request,);
		futures::select! {
			msg = from_orchestra => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
			msg = from_responder => MuxedMessage::Responder(msg),
			msg = receive_response => MuxedMessage::Response(msg),
			msg = retry_request => MuxedMessage::RetryRequest(msg),
		}
	}
}

#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
impl StatementDistributionSubsystem {
	/// Create a new Statement Distribution Subsystem
	pub fn new(
		keystore: KeystorePtr,
		req_receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
		metrics: Metrics,
	) -> Self {
		Self { keystore, req_receiver: Some(req_receiver), metrics, reputation: Default::default() }
	}
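
	// A minimal construction sketch (illustrative only; `keystore`, `req_receiver` and
	// `metrics` are assumed to be provided by the node's startup code):
	//
	// let subsystem = StatementDistributionSubsystem::new(keystore, req_receiver, metrics);
	// // The overseer then drives the subsystem by calling `start`.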

	async fn run<Context>(self, ctx: Context) -> std::result::Result<(), FatalError> {
		self.run_inner(ctx, REPUTATION_CHANGE_INTERVAL).await
	}

	async fn run_inner<Context>(
		mut self,
		mut ctx: Context,
		reputation_interval: Duration,
	) -> std::result::Result<(), FatalError> {
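		// Reputation changes are accumulated in `self.reputation` and flushed out in one
		// batch every `reputation_interval`.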
		let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
		let mut reputation_delay = new_reputation_delay();

		let mut state = crate::v2::State::new(self.keystore.clone());

		// Sender/receiver for getting news from our candidate responder task.
		let (res_sender, mut res_receiver) = mpsc::channel(1);

		ctx.spawn(
			"candidate-responder",
			v2::respond_task(
				self.req_receiver.take().expect("Mandatory argument to new. qed"),
				res_sender.clone(),
				self.metrics.clone(),
			)
			.boxed(),
		)
		.map_err(FatalError::SpawnTask)?;

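		// Main loop: multiplex messages from the overseer, the responder task, request
		// responses and retry timers, then dispatch any pending candidate requests.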
		loop {
			// Wait for the next message.
			let message = futures::select! {
				_ = reputation_delay => {
					self.reputation.send(ctx.sender()).await;
					reputation_delay = new_reputation_delay();
					continue
				},
				message = MuxedMessage::receive(
					&mut ctx,
					&mut state,
					&mut res_receiver,
				).fuse() => {
					message
				}
			};

			match message {
				MuxedMessage::Subsystem(result) => {
					let result = self.handle_subsystem_message(&mut ctx, &mut state, result?).await;
					match result.into_nested()? {
						Ok(true) => break,
						Ok(false) => {},
						Err(jfyi) => gum::debug!(target: LOG_TARGET, error = ?jfyi),
					}
				},
				MuxedMessage::Responder(result) => {
					v2::answer_request(
						&mut state,
						result.ok_or(FatalError::RequesterReceiverFinished)?,
					);
				},
				MuxedMessage::Response(result) => {
					v2::handle_response(
						&mut ctx,
						&mut state,
						result,
						&mut self.reputation,
						&self.metrics,
					)
					.await;
				},
				MuxedMessage::RetryRequest(()) => {
					// A pending request is ready to retry. This is only a signal to call
					// `dispatch_requests` again.
					()
				},
			};

			v2::dispatch_requests(&mut ctx, &mut state).await;
		}
		Ok(())
	}

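	/// Handle a single message coming from the overseer. Returns `Ok(true)` if the
	/// subsystem should conclude, `Ok(false)` otherwise.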
	async fn handle_subsystem_message<Context>(
		&mut self,
		ctx: &mut Context,
		state: &mut v2::State,
		message: FromOrchestra<StatementDistributionMessage>,
	) -> Result<bool> {
		let metrics = &self.metrics;

		match message {
			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				activated,
				deactivated,
			})) => {
				let _timer = metrics.time_active_leaves_update();

				if let Some(ref activated) = activated {
					let res =
						v2::handle_active_leaves_update(ctx, state, activated, &metrics).await;
					// Regardless of the result of leaf activation, we always prune
					// deactivated leaves before propagating any error, to avoid leaks.
					v2::handle_deactivate_leaves(state, &deactivated);
					res?;
				} else {
					v2::handle_deactivate_leaves(state, &deactivated);
				}
			},
			FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {
				// do nothing
			},
			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(true),
			FromOrchestra::Communication { msg } => match msg {
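				// A statement originated by this node that should be distributed to other
				// validators.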
				StatementDistributionMessage::Share(relay_parent, statement) => {
					let _timer = metrics.time_share();

					v2::share_local_statement(
						ctx,
						state,
						relay_parent,
						statement,
						&mut self.reputation,
						&self.metrics,
					)
					.await?;
				},
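				// Events from the network bridge: peer connections and disconnections, view
				// changes and incoming statement-distribution protocol messages.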
				StatementDistributionMessage::NetworkBridgeUpdate(event) => {
					v2::handle_network_update(
						ctx,
						state,
						event,
						&mut self.reputation,
						&self.metrics,
					)
					.await;
				},
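				// The candidate gathered enough backing votes; the `v2` logic can now
				// advertise it more widely (e.g. along the grid topology).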
				StatementDistributionMessage::Backed(candidate_hash) => {
					crate::v2::handle_backed_candidate_message(
						ctx,
						state,
						candidate_hash,
						&self.metrics,
					)
					.await;
				},
			},
		}
		Ok(false)
	}
}