referrerpolicy=no-referrer-when-downgrade

polkadot_availability_distribution/pov_requester/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! PoV requester takes care of requesting PoVs from validators of a backing group.
18
19use futures::{channel::oneshot, future::BoxFuture, FutureExt};
20
21use polkadot_node_network_protocol::request_response::{
22	outgoing::{RequestError, Requests},
23	v1::{PoVFetchingRequest, PoVFetchingResponse},
24	OutgoingRequest, Recipient,
25};
26use polkadot_node_primitives::PoV;
27use polkadot_node_subsystem::{
28	messages::{IfDisconnected, NetworkBridgeTxMessage},
29	overseer,
30};
31use polkadot_node_subsystem_util::runtime::RuntimeInfo;
32use polkadot_primitives::{
33	AuthorityDiscoveryId, CandidateHash, Hash, Id as ParaId, ValidatorIndex,
34};
35
36use crate::{
37	error::{Error, FatalError, JfyiError, Result},
38	metrics::{FAILED, NOT_FOUND, SUCCEEDED},
39	Metrics, LOG_TARGET,
40};
41
42/// Start background worker for taking care of fetching the requested `PoV` from the network.
43#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
44pub async fn fetch_pov<Context>(
45	ctx: &mut Context,
46	runtime: &mut RuntimeInfo,
47	parent: Hash,
48	from_validator: ValidatorIndex,
49	para_id: ParaId,
50	candidate_hash: CandidateHash,
51	pov_hash: Hash,
52	tx: oneshot::Sender<PoV>,
53	metrics: Metrics,
54) -> Result<()> {
55	let info = &runtime.get_session_info(ctx.sender(), parent).await?.session_info;
56	let authority_id = info
57		.discovery_keys
58		.get(from_validator.0 as usize)
59		.ok_or(JfyiError::InvalidValidatorIndex)?
60		.clone();
61	let (req, pending_response) = OutgoingRequest::new(
62		Recipient::Authority(authority_id.clone()),
63		PoVFetchingRequest { candidate_hash },
64	);
65	let full_req = Requests::PoVFetchingV1(req);
66
67	ctx.send_message(NetworkBridgeTxMessage::SendRequests(
68		vec![full_req],
69		IfDisconnected::ImmediateError,
70	))
71	.await;
72
73	ctx.spawn(
74		"pov-fetcher",
75		fetch_pov_job(para_id, pov_hash, authority_id, pending_response.boxed(), tx, metrics)
76			.boxed(),
77	)
78	.map_err(|e| FatalError::SpawnTask(e))?;
79	Ok(())
80}
81
82/// Future to be spawned for taking care of handling reception and sending of PoV.
83async fn fetch_pov_job(
84	para_id: ParaId,
85	pov_hash: Hash,
86	authority_id: AuthorityDiscoveryId,
87	pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
88	tx: oneshot::Sender<PoV>,
89	metrics: Metrics,
90) {
91	if let Err(err) = do_fetch_pov(pov_hash, pending_response, tx, metrics).await {
92		gum::warn!(target: LOG_TARGET, ?err, ?para_id, ?pov_hash, ?authority_id, "fetch_pov_job");
93	}
94}
95
96/// Do the actual work of waiting for the response.
97async fn do_fetch_pov(
98	pov_hash: Hash,
99	pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
100	tx: oneshot::Sender<PoV>,
101	metrics: Metrics,
102) -> Result<()> {
103	let response = pending_response.await.map_err(Error::FetchPoV);
104	let pov = match response {
105		Ok(PoVFetchingResponse::PoV(pov)) => pov,
106		Ok(PoVFetchingResponse::NoSuchPoV) => {
107			metrics.on_fetched_pov(NOT_FOUND);
108			return Err(Error::NoSuchPoV)
109		},
110		Err(err) => {
111			metrics.on_fetched_pov(FAILED);
112			return Err(err)
113		},
114	};
115	if pov.hash() == pov_hash {
116		metrics.on_fetched_pov(SUCCEEDED);
117		tx.send(pov).map_err(|_| Error::SendResponse)
118	} else {
119		metrics.on_fetched_pov(FAILED);
120		Err(Error::UnexpectedPoV)
121	}
122}
123
124#[cfg(test)]
125mod tests {
126	use assert_matches::assert_matches;
127	use futures::{executor, future};
128
129	use codec::Encode;
130	use sc_network::ProtocolName;
131	use sp_core::testing::TaskExecutor;
132
133	use polkadot_node_primitives::BlockData;
134	use polkadot_node_subsystem::messages::{
135		AllMessages, AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest,
136	};
137	use polkadot_node_subsystem_test_helpers as test_helpers;
138	use polkadot_primitives::{CandidateHash, ExecutorParams, Hash, NodeFeatures, ValidatorIndex};
139	use test_helpers::mock::make_ferdie_keystore;
140
141	use super::*;
142	use crate::{tests::mock::make_session_info, LOG_TARGET};
143
144	#[test]
145	fn rejects_invalid_pov() {
146		sp_tracing::try_init_simple();
147		let pov = PoV { block_data: BlockData(vec![1, 2, 3, 4, 5, 6]) };
148		test_run(Hash::default(), pov);
149	}
150
151	#[test]
152	fn accepts_valid_pov() {
153		sp_tracing::try_init_simple();
154		let pov = PoV { block_data: BlockData(vec![1, 2, 3, 4, 5, 6]) };
155		test_run(pov.hash(), pov);
156	}
157
158	fn test_run(pov_hash: Hash, pov: PoV) {
159		let pool = TaskExecutor::new();
160		let (mut context, mut virtual_overseer) =
161			polkadot_node_subsystem_test_helpers::make_subsystem_context::<
162				AvailabilityDistributionMessage,
163				TaskExecutor,
164			>(pool.clone());
165		let keystore = make_ferdie_keystore();
166		let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(Some(keystore));
167
168		let (tx, rx) = oneshot::channel();
169		let testee = async {
170			fetch_pov(
171				&mut context,
172				&mut runtime,
173				Hash::default(),
174				ValidatorIndex(0),
175				ParaId::default(),
176				CandidateHash::default(),
177				pov_hash,
178				tx,
179				Metrics::new_dummy(),
180			)
181			.await
182			.expect("Should succeed");
183		};
184
185		let tester = async move {
186			loop {
187				match virtual_overseer.recv().await {
188					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
189						_,
190						RuntimeApiRequest::SessionIndexForChild(tx),
191					)) => {
192						tx.send(Ok(0)).unwrap();
193					},
194					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
195						_,
196						RuntimeApiRequest::SessionInfo(_, tx),
197					)) => {
198						tx.send(Ok(Some(make_session_info()))).unwrap();
199					},
200					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
201						_,
202						RuntimeApiRequest::SessionExecutorParams(_, tx),
203					)) => {
204						tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
205					},
206					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
207						_,
208						RuntimeApiRequest::NodeFeatures(_, si_tx),
209					)) => {
210						si_tx.send(Ok(NodeFeatures::EMPTY)).unwrap();
211					},
212					AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(
213						mut reqs,
214						_,
215					)) => {
216						let req = assert_matches!(
217							reqs.pop(),
218							Some(Requests::PoVFetchingV1(outgoing)) => {outgoing}
219						);
220						req.pending_response
221							.send(Ok((
222								PoVFetchingResponse::PoV(pov.clone()).encode(),
223								ProtocolName::from(""),
224							)))
225							.unwrap();
226						break
227					},
228					msg => gum::debug!(target: LOG_TARGET, msg = ?msg, "Received msg"),
229				}
230			}
231			if pov.hash() == pov_hash {
232				assert_eq!(rx.await, Ok(pov));
233			} else {
234				assert_eq!(rx.await, Err(oneshot::Canceled));
235			}
236		};
237		futures::pin_mut!(testee);
238		futures::pin_mut!(tester);
239		executor::block_on(future::join(testee, tester));
240	}
241}