polkadot_availability_distribution/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use futures::{future::Either, FutureExt, StreamExt, TryFutureExt};

use sp_keystore::KeystorePtr;

use polkadot_node_network_protocol::request_response::{
	v1, v2, IncomingRequestReceiver, ReqProtocolNames,
};
use polkadot_node_subsystem::{
	messages::AvailabilityDistributionMessage, overseer, FromOrchestra, OverseerSignal,
	SpawnedSubsystem, SubsystemError,
};

/// Error and [`Result`] type for this subsystem.
mod error;
use error::{log_error, FatalError, Result};

use polkadot_node_subsystem_util::runtime::RuntimeInfo;

/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
use requester::Requester;
/// Handling requests for PoVs during backing.
mod pov_requester;

/// Responding to erasure chunk requests:
mod responder;
use responder::{run_chunk_receivers, run_pov_receiver};

mod metrics;
/// Prometheus `Metrics` for availability distribution.
pub use metrics::Metrics;

#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "parachain::availability-distribution";

/// The availability distribution subsystem.
pub struct AvailabilityDistributionSubsystem {
	/// Easy and efficient runtime access for this subsystem.
	runtime: RuntimeInfo,
	/// Receivers to receive messages from.
	recvs: IncomingRequestReceivers,
	/// Mapping of the req-response protocols to the full protocol names.
	req_protocol_names: ReqProtocolNames,
	/// Prometheus metrics.
	metrics: Metrics,
}

/// Receivers to be passed into availability distribution.
pub struct IncomingRequestReceivers {
	/// Receiver for incoming PoV requests.
	pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
	/// Receiver for incoming v1 availability chunk requests.
	pub chunk_req_v1_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
	/// Receiver for incoming v2 availability chunk requests.
	pub chunk_req_v2_receiver: IncomingRequestReceiver<v2::ChunkFetchingRequest>,
}
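
// How these receivers are obtained is up to the embedder. A hedged sketch, assuming the
// node service wires them up with `IncomingRequest::get_config_receiver` from
// `polkadot_node_network_protocol::request_response` (the exact generic parameters of that
// helper vary between releases):
//
//     let (pov_req_receiver, pov_req_cfg) =
//         IncomingRequest::get_config_receiver(&req_protocol_names);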

#[overseer::subsystem(AvailabilityDistribution, error=SubsystemError, prefix=self::overseer)]
impl<Context> AvailabilityDistributionSubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = self
			.run(ctx)
			.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
			.boxed();

		SpawnedSubsystem { name: "availability-distribution-subsystem", future }
	}
}

#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
impl AvailabilityDistributionSubsystem {
	/// Create a new instance of the availability distribution subsystem.
	pub fn new(
		keystore: KeystorePtr,
		recvs: IncomingRequestReceivers,
		req_protocol_names: ReqProtocolNames,
		metrics: Metrics,
	) -> Self {
		let runtime = RuntimeInfo::new(Some(keystore));
		Self { runtime, recvs, req_protocol_names, metrics }
	}
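
	// A hedged construction sketch; identifiers such as `registry` are illustrative and the
	// actual wiring lives in the node service (`Metrics::register` comes from the
	// subsystem-util metrics trait):
	//
	//     let subsystem = AvailabilityDistributionSubsystem::new(
	//         keystore.clone(),
	//         IncomingRequestReceivers {
	//             pov_req_receiver,
	//             chunk_req_v1_receiver,
	//             chunk_req_v2_receiver,
	//         },
	//         req_protocol_names.clone(),
	//         Metrics::register(registry)?,
	//     );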

	/// Start processing work as passed on from the Overseer.
	async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
		let Self { mut runtime, recvs, metrics, req_protocol_names } = self;

		let IncomingRequestReceivers {
			pov_req_receiver,
			chunk_req_v1_receiver,
			chunk_req_v2_receiver,
		} = recvs;
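		// The requester drives chunk fetching for candidates pending availability; it is
		// polled as a stream in the main loop below so the messages it emits can be
		// forwarded to the overseer.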
		let mut requester = Requester::new(req_protocol_names, metrics.clone()).fuse();
		let mut warn_freq = gum::Freq::new();

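		// Spawn the long-running tasks that answer incoming PoV and chunk requests.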
		{
			let sender = ctx.sender().clone();
			ctx.spawn(
				"pov-receiver",
				run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(),
			)
			.map_err(FatalError::SpawnTask)?;

			ctx.spawn(
				"chunk-receiver",
				run_chunk_receivers(
					sender,
					chunk_req_v1_receiver,
					chunk_req_v2_receiver,
					metrics.clone(),
				)
				.boxed(),
			)
			.map_err(FatalError::SpawnTask)?;
		}

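		// Main loop: multiplex overseer signals and messages with the requester's output
		// until `Conclude` is received.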
		loop {
			let action = {
				let mut subsystem_next = ctx.recv().fuse();
				futures::select! {
					subsystem_msg = subsystem_next => Either::Left(subsystem_msg),
					from_task = requester.next() => Either::Right(from_task),
				}
			};

			// Messages from the requester are simply forwarded to the overseer; incoming
			// subsystem messages are processed further below:
			let message = match action {
				Either::Left(subsystem_msg) =>
					subsystem_msg.map_err(FatalError::IncomingMessageChannel)?,
				Either::Right(from_task) => {
					let from_task = from_task.ok_or(FatalError::RequesterExhausted)?;
					ctx.send_message(from_task).await;
					continue
				},
			};
			match message {
				FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
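					// A new set of active leaves may contain candidates pending availability;
					// let the requester update its chunk fetching tasks accordingly.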
					log_error(
						requester
							.get_mut()
							.update_fetching_heads(&mut ctx, &mut runtime, update)
							.await,
						"Error in Requester::update_fetching_heads",
						&mut warn_freq,
					)?;
				},
				FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _finalized_number)) => {
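					// Nothing to do on block finalization.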
				},
				FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
				FromOrchestra::Communication {
					msg:
						AvailabilityDistributionMessage::FetchPoV {
							relay_parent,
							from_validator,
							para_id,
							candidate_hash,
							pov_hash,
							tx,
						},
				} => {
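					// Backing asked for a PoV; fetch it directly from the specified validator.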
					log_error(
						pov_requester::fetch_pov(
							&mut ctx,
							&mut runtime,
							relay_parent,
							from_validator,
							para_id,
							candidate_hash,
							pov_hash,
							tx,
							metrics.clone(),
						)
						.await,
						"pov_requester::fetch_pov",
						&mut warn_freq,
					)?;
				},
			}
		}
	}
}