polkadot_availability_distribution/pov_requester/
mod.rs1use futures::{channel::oneshot, future::BoxFuture, FutureExt};
20
21use polkadot_node_network_protocol::request_response::{
22 outgoing::{RequestError, Requests},
23 v1::{PoVFetchingRequest, PoVFetchingResponse},
24 OutgoingRequest, Recipient,
25};
26use polkadot_node_primitives::PoV;
27use polkadot_node_subsystem::{
28 messages::{IfDisconnected, NetworkBridgeTxMessage},
29 overseer,
30};
31use polkadot_node_subsystem_util::runtime::RuntimeInfo;
32use polkadot_primitives::{
33 AuthorityDiscoveryId, CandidateHash, Hash, Id as ParaId, ValidatorIndex,
34};
35
36use crate::{
37 error::{Error, FatalError, JfyiError, Result},
38 metrics::{FAILED, NOT_FOUND, SUCCEEDED},
39 Metrics, LOG_TARGET,
40};
41
42#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
44pub async fn fetch_pov<Context>(
45 ctx: &mut Context,
46 runtime: &mut RuntimeInfo,
47 parent: Hash,
48 from_validator: ValidatorIndex,
49 para_id: ParaId,
50 candidate_hash: CandidateHash,
51 pov_hash: Hash,
52 tx: oneshot::Sender<PoV>,
53 metrics: Metrics,
54) -> Result<()> {
55 let info = &runtime.get_session_info(ctx.sender(), parent).await?.session_info;
56 let authority_id = info
57 .discovery_keys
58 .get(from_validator.0 as usize)
59 .ok_or(JfyiError::InvalidValidatorIndex)?
60 .clone();
61 let (req, pending_response) = OutgoingRequest::new(
62 Recipient::Authority(authority_id.clone()),
63 PoVFetchingRequest { candidate_hash },
64 );
65 let full_req = Requests::PoVFetchingV1(req);
66
67 ctx.send_message(NetworkBridgeTxMessage::SendRequests(
68 vec![full_req],
69 IfDisconnected::ImmediateError,
70 ))
71 .await;
72
73 ctx.spawn(
74 "pov-fetcher",
75 fetch_pov_job(para_id, pov_hash, authority_id, pending_response.boxed(), tx, metrics)
76 .boxed(),
77 )
78 .map_err(|e| FatalError::SpawnTask(e))?;
79 Ok(())
80}
81
82async fn fetch_pov_job(
84 para_id: ParaId,
85 pov_hash: Hash,
86 authority_id: AuthorityDiscoveryId,
87 pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
88 tx: oneshot::Sender<PoV>,
89 metrics: Metrics,
90) {
91 if let Err(err) = do_fetch_pov(pov_hash, pending_response, tx, metrics).await {
92 gum::warn!(target: LOG_TARGET, ?err, ?para_id, ?pov_hash, ?authority_id, "fetch_pov_job");
93 }
94}
95
96async fn do_fetch_pov(
98 pov_hash: Hash,
99 pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
100 tx: oneshot::Sender<PoV>,
101 metrics: Metrics,
102) -> Result<()> {
103 let response = pending_response.await.map_err(Error::FetchPoV);
104 let pov = match response {
105 Ok(PoVFetchingResponse::PoV(pov)) => pov,
106 Ok(PoVFetchingResponse::NoSuchPoV) => {
107 metrics.on_fetched_pov(NOT_FOUND);
108 return Err(Error::NoSuchPoV)
109 },
110 Err(err) => {
111 metrics.on_fetched_pov(FAILED);
112 return Err(err)
113 },
114 };
115 if pov.hash() == pov_hash {
116 metrics.on_fetched_pov(SUCCEEDED);
117 tx.send(pov).map_err(|_| Error::SendResponse)
118 } else {
119 metrics.on_fetched_pov(FAILED);
120 Err(Error::UnexpectedPoV)
121 }
122}
123
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures::{executor, future};

    use codec::Encode;
    use sc_network::ProtocolName;
    use sp_core::testing::TaskExecutor;

    use polkadot_node_primitives::BlockData;
    use polkadot_node_subsystem::messages::{
        AllMessages, AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest,
    };
    use polkadot_node_subsystem_test_helpers as test_helpers;
    use polkadot_primitives::{CandidateHash, ExecutorParams, Hash, NodeFeatures, ValidatorIndex};
    use test_helpers::mock::make_ferdie_keystore;

    use super::*;
    use crate::{tests::mock::make_session_info, LOG_TARGET};

    /// The fixed `PoV` payload used by both test cases.
    fn sample_pov() -> PoV {
        PoV { block_data: BlockData(vec![1, 2, 3, 4, 5, 6]) }
    }

    /// A `PoV` whose hash differs from the requested one must not reach the receiver.
    #[test]
    fn rejects_invalid_pov() {
        sp_tracing::try_init_simple();
        test_run(Hash::default(), sample_pov());
    }

    /// A `PoV` whose hash equals the requested one must be delivered to the receiver.
    #[test]
    fn accepts_valid_pov() {
        sp_tracing::try_init_simple();
        let pov = sample_pov();
        test_run(pov.hash(), pov);
    }

    /// Run `fetch_pov` against a mocked overseer that answers the network request with `pov`.
    ///
    /// If `pov.hash()` equals `pov_hash` the receiver is expected to yield `pov`; otherwise it
    /// is expected to observe `oneshot::Canceled`.
    fn test_run(pov_hash: Hash, pov: PoV) {
        let pool = TaskExecutor::new();
        let (mut context, mut handle) = test_helpers::make_subsystem_context::<
            AvailabilityDistributionMessage,
            TaskExecutor,
        >(pool.clone());
        let mut runtime = RuntimeInfo::new(Some(make_ferdie_keystore()));

        let (tx, rx) = oneshot::channel();

        // Side under test: issue the fetch.
        let testee = async {
            let started = fetch_pov(
                &mut context,
                &mut runtime,
                Hash::default(),
                ValidatorIndex(0),
                ParaId::default(),
                CandidateHash::default(),
                pov_hash,
                tx,
                Metrics::new_dummy(),
            )
            .await;
            started.expect("Should succeed");
        };

        // Mock side: serve runtime API queries until the network request shows up.
        let tester = async move {
            let outgoing = loop {
                match handle.recv().await {
                    AllMessages::RuntimeApi(RuntimeApiMessage::Request(
                        _,
                        RuntimeApiRequest::SessionIndexForChild(reply),
                    )) => {
                        reply.send(Ok(0)).unwrap();
                    },
                    AllMessages::RuntimeApi(RuntimeApiMessage::Request(
                        _,
                        RuntimeApiRequest::SessionInfo(_, reply),
                    )) => {
                        reply.send(Ok(Some(make_session_info()))).unwrap();
                    },
                    AllMessages::RuntimeApi(RuntimeApiMessage::Request(
                        _,
                        RuntimeApiRequest::SessionExecutorParams(_, reply),
                    )) => {
                        reply.send(Ok(Some(ExecutorParams::default()))).unwrap();
                    },
                    AllMessages::RuntimeApi(RuntimeApiMessage::Request(
                        _,
                        RuntimeApiRequest::NodeFeatures(_, reply),
                    )) => {
                        reply.send(Ok(NodeFeatures::EMPTY)).unwrap();
                    },
                    AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(
                        mut reqs,
                        _,
                    )) => {
                        break assert_matches!(
                            reqs.pop(),
                            Some(Requests::PoVFetchingV1(outgoing)) => outgoing
                        )
                    },
                    other => gum::debug!(target: LOG_TARGET, msg = ?other, "Received msg"),
                }
            };

            // Answer the captured request with the encoded PoV.
            let payload = PoVFetchingResponse::PoV(pov.clone()).encode();
            outgoing.pending_response.send(Ok((payload, ProtocolName::from("")))).unwrap();

            let expected = if pov.hash() == pov_hash { Ok(pov) } else { Err(oneshot::Canceled) };
            assert_eq!(rx.await, expected);
        };

        futures::pin_mut!(testee);
        futures::pin_mut!(tester);
        executor::block_on(future::join(testee, tester));
    }
}