referrerpolicy=no-referrer-when-downgrade

polkadot_availability_distribution/
responder.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Answer requests for availability chunks.
18
19use std::sync::Arc;
20
21use futures::{channel::oneshot, select, FutureExt};
22
23use codec::{Decode, Encode};
24use fatality::Nested;
25use polkadot_node_network_protocol::{
26	request_response::{v1, v2, IncomingRequest, IncomingRequestReceiver, IsRequest},
27	UnifiedReputationChange as Rep,
28};
29use polkadot_node_primitives::{AvailableData, ErasureChunk};
30use polkadot_node_subsystem::{messages::AvailabilityStoreMessage, SubsystemSender};
31use polkadot_primitives::{CandidateHash, ValidatorIndex};
32
33use crate::{
34	error::{JfyiError, Result},
35	metrics::{Metrics, FAILED, NOT_FOUND, SUCCEEDED},
36	LOG_TARGET,
37};
38
/// Major reputation cost handed to the request receivers in this file (via the closure
/// passed to `recv`) for incoming messages that could not be decoded.
39const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
40
41/// Receiver task to be forked as a separate task to handle PoV requests.
42pub async fn run_pov_receiver<Sender>(
43	mut sender: Sender,
44	mut receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
45	metrics: Metrics,
46) where
47	Sender: SubsystemSender<AvailabilityStoreMessage>,
48{
49	loop {
50		match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() {
51			Ok(Ok(msg)) => {
52				answer_pov_request_log(&mut sender, msg, &metrics).await;
53			},
54			Err(fatal) => {
55				gum::debug!(
56					target: LOG_TARGET,
57					error = ?fatal,
58					"Shutting down POV receiver."
59				);
60				return
61			},
62			Ok(Err(jfyi)) => {
63				gum::debug!(target: LOG_TARGET, error = ?jfyi, "Error decoding incoming PoV request.");
64			},
65		}
66	}
67}
68
69/// Receiver task to be forked as a separate task to handle chunk requests.
70pub async fn run_chunk_receivers<Sender>(
71	mut sender: Sender,
72	mut receiver_v1: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
73	mut receiver_v2: IncomingRequestReceiver<v2::ChunkFetchingRequest>,
74	metrics: Metrics,
75) where
76	Sender: SubsystemSender<AvailabilityStoreMessage>,
77{
78	let make_resp_v1 = |chunk: Option<ErasureChunk>| match chunk {
79		None => v1::ChunkFetchingResponse::NoSuchChunk,
80		Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()),
81	};
82
83	let make_resp_v2 = |chunk: Option<ErasureChunk>| match chunk {
84		None => v2::ChunkFetchingResponse::NoSuchChunk,
85		Some(chunk) => v2::ChunkFetchingResponse::Chunk(chunk.into()),
86	};
87
88	loop {
89		select! {
90			res = receiver_v1.recv(|| vec![COST_INVALID_REQUEST]).fuse() => match res.into_nested() {
91				Ok(Ok(msg)) => {
92					answer_chunk_request_log(&mut sender, msg, make_resp_v1, &metrics).await;
93				},
94				Err(fatal) => {
95					gum::debug!(
96						target: LOG_TARGET,
97						error = ?fatal,
98						"Shutting down chunk receiver."
99					);
100					return
101				},
102				Ok(Err(jfyi)) => {
103					gum::debug!(
104						target: LOG_TARGET,
105						error = ?jfyi,
106						"Error decoding incoming chunk request."
107					);
108				}
109			},
110			res = receiver_v2.recv(|| vec![COST_INVALID_REQUEST]).fuse() => match res.into_nested() {
111				Ok(Ok(msg)) => {
112					answer_chunk_request_log(&mut sender, msg.into(), make_resp_v2, &metrics).await;
113				},
114				Err(fatal) => {
115					gum::debug!(
116						target: LOG_TARGET,
117						error = ?fatal,
118						"Shutting down chunk receiver."
119					);
120					return
121				},
122				Ok(Err(jfyi)) => {
123					gum::debug!(
124						target: LOG_TARGET,
125						error = ?jfyi,
126						"Error decoding incoming chunk request."
127					);
128				}
129			}
130		}
131	}
132}
133
134/// Variant of `answer_pov_request` that does Prometheus metric and logging on errors.
135///
136/// Any errors of `answer_pov_request` will simply be logged.
137pub async fn answer_pov_request_log<Sender>(
138	sender: &mut Sender,
139	req: IncomingRequest<v1::PoVFetchingRequest>,
140	metrics: &Metrics,
141) where
142	Sender: SubsystemSender<AvailabilityStoreMessage>,
143{
144	let res = answer_pov_request(sender, req).await;
145	match res {
146		Ok(result) => metrics.on_served_pov(if result { SUCCEEDED } else { NOT_FOUND }),
147		Err(err) => {
148			gum::warn!(
149				target: LOG_TARGET,
150				err= ?err,
151				"Serving PoV failed with error"
152			);
153			metrics.on_served_pov(FAILED);
154		},
155	}
156}
157
158/// Variant of `answer_chunk_request` that does Prometheus metric and logging on errors.
159///
160/// Any errors of `answer_request` will simply be logged.
161pub async fn answer_chunk_request_log<Sender, Req, MakeResp>(
162	sender: &mut Sender,
163	req: IncomingRequest<Req>,
164	make_response: MakeResp,
165	metrics: &Metrics,
166) where
167	Req: IsRequest + Decode + Encode + Into<v1::ChunkFetchingRequest>,
168	Req::Response: Encode,
169	Sender: SubsystemSender<AvailabilityStoreMessage>,
170	MakeResp: Fn(Option<ErasureChunk>) -> Req::Response,
171{
172	let res = answer_chunk_request(sender, req, make_response).await;
173	match res {
174		Ok(result) => metrics.on_served_chunk(if result { SUCCEEDED } else { NOT_FOUND }),
175		Err(err) => {
176			gum::warn!(
177				target: LOG_TARGET,
178				err= ?err,
179				"Serving chunk failed with error"
180			);
181			metrics.on_served_chunk(FAILED);
182		},
183	}
184}
185
186/// Answer an incoming PoV fetch request by querying the av store.
187///
188/// Returns: `Ok(true)` if chunk was found and served.
189pub async fn answer_pov_request<Sender>(
190	sender: &mut Sender,
191	req: IncomingRequest<v1::PoVFetchingRequest>,
192) -> Result<bool>
193where
194	Sender: SubsystemSender<AvailabilityStoreMessage>,
195{
196	let av_data = query_available_data(sender, req.payload.candidate_hash).await?;
197
198	let result = av_data.is_some();
199
200	let response = match av_data {
201		None => v1::PoVFetchingResponse::NoSuchPoV,
202		Some(av_data) => {
203			let pov = Arc::try_unwrap(av_data.pov).unwrap_or_else(|a| (&*a).clone());
204			v1::PoVFetchingResponse::PoV(pov)
205		},
206	};
207
208	req.send_response(response).map_err(|_| JfyiError::SendResponse)?;
209	Ok(result)
210}
211
212/// Answer an incoming chunk request by querying the av store.
213///
214/// Returns: `Ok(true)` if chunk was found and served.
215pub async fn answer_chunk_request<Sender, Req, MakeResp>(
216	sender: &mut Sender,
217	req: IncomingRequest<Req>,
218	make_response: MakeResp,
219) -> Result<bool>
220where
221	Sender: SubsystemSender<AvailabilityStoreMessage>,
222	Req: IsRequest + Decode + Encode + Into<v1::ChunkFetchingRequest>,
223	Req::Response: Encode,
224	MakeResp: Fn(Option<ErasureChunk>) -> Req::Response,
225{
226	// V1 and V2 requests have the same payload, so decoding into either one will work. It's the
227	// responses that differ, hence the `MakeResp` generic.
228	let payload: v1::ChunkFetchingRequest = req.payload.into();
229
230	let chunk = query_chunk(sender, payload.candidate_hash, payload.index).await?;
231
232	let result = chunk.is_some();
233
234	gum::trace!(
235		target: LOG_TARGET,
236		hash = ?payload.candidate_hash,
237		index = ?payload.index,
238		peer = ?req.peer,
239		has_data = ?chunk.is_some(),
240		"Serving chunk",
241	);
242
243	let response = make_response(chunk);
244
245	req.pending_response
246		.send_response(response)
247		.map_err(|_| JfyiError::SendResponse)?;
248
249	Ok(result)
250}
251
252/// Query chunk from the availability store.
253async fn query_chunk<Sender>(
254	sender: &mut Sender,
255	candidate_hash: CandidateHash,
256	validator_index: ValidatorIndex,
257) -> std::result::Result<Option<ErasureChunk>, JfyiError>
258where
259	Sender: SubsystemSender<AvailabilityStoreMessage>,
260{
261	let (tx, rx) = oneshot::channel();
262	sender
263		.send_message(
264			AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx).into(),
265		)
266		.await;
267
268	let result = rx.await.map_err(|e| {
269		gum::trace!(
270			target: LOG_TARGET,
271			?validator_index,
272			?candidate_hash,
273			error = ?e,
274			"Error retrieving chunk",
275		);
276		JfyiError::QueryChunkResponseChannel(e)
277	})?;
278	Ok(result)
279}
280
281/// Query PoV from the availability store.
282async fn query_available_data<Sender>(
283	sender: &mut Sender,
284	candidate_hash: CandidateHash,
285) -> Result<Option<AvailableData>>
286where
287	Sender: SubsystemSender<AvailabilityStoreMessage>,
288{
289	let (tx, rx) = oneshot::channel();
290	sender
291		.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx).into())
292		.await;
293
294	let result = rx.await.map_err(JfyiError::QueryAvailableDataResponseChannel)?;
295	Ok(result)
296}