polkadot_node_network_protocol/request_response/
outgoing.rs1use futures::{channel::oneshot, prelude::Future, FutureExt};
18
19use codec::{Decode, Encode, Error as DecodingError};
20use network::ProtocolName;
21
22use sc_network as network;
23use sc_network_types::PeerId;
24
25use polkadot_primitives::AuthorityDiscoveryId;
26
27use super::{v1, v2, IsRequest, Protocol};
28
29#[derive(Debug)]
31pub enum Requests {
32	ChunkFetching(OutgoingRequest<v2::ChunkFetchingRequest, v1::ChunkFetchingRequest>),
34	CollationFetchingV1(OutgoingRequest<v1::CollationFetchingRequest>),
36	PoVFetchingV1(OutgoingRequest<v1::PoVFetchingRequest>),
38	AvailableDataFetchingV1(OutgoingRequest<v1::AvailableDataFetchingRequest>),
40	DisputeSendingV1(OutgoingRequest<v1::DisputeRequest>),
42
43	AttestedCandidateV2(OutgoingRequest<v2::AttestedCandidateRequest>),
45	CollationFetchingV2(OutgoingRequest<v2::CollationFetchingRequest>),
48}
49
50impl Requests {
51	pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
59		match self {
60			Self::ChunkFetching(r) => r.encode_request(),
61			Self::CollationFetchingV1(r) => r.encode_request(),
62			Self::CollationFetchingV2(r) => r.encode_request(),
63			Self::PoVFetchingV1(r) => r.encode_request(),
64			Self::AvailableDataFetchingV1(r) => r.encode_request(),
65			Self::DisputeSendingV1(r) => r.encode_request(),
66			Self::AttestedCandidateV2(r) => r.encode_request(),
67		}
68	}
69}
70
71pub type ResponseSender = oneshot::Sender<Result<(Vec<u8>, ProtocolName), network::RequestFailure>>;
73
74#[derive(Debug, thiserror::Error)]
76pub enum RequestError {
77	#[error("Response could not be decoded: {0}")]
79	InvalidResponse(#[from] DecodingError),
80
81	#[error("{0}")]
83	NetworkError(#[from] network::RequestFailure),
84
85	#[error("Response channel got canceled")]
87	Canceled(#[from] oneshot::Canceled),
88}
89
90impl RequestError {
91	pub fn is_timed_out(&self) -> bool {
93		match self {
94			Self::Canceled(_) |
95			Self::NetworkError(network::RequestFailure::Obsolete) |
96			Self::NetworkError(network::RequestFailure::Network(
97				network::OutboundFailure::Timeout,
98			)) => true,
99			_ => false,
100		}
101	}
102}
103
104#[derive(Debug)]
115pub struct OutgoingRequest<Req, FallbackReq = Req> {
116	pub peer: Recipient,
118	pub payload: Req,
120	pub fallback_request: Option<(FallbackReq, Protocol)>,
122	pub pending_response: ResponseSender,
124}
125
126#[derive(Debug, Eq, Hash, PartialEq, Clone)]
128pub enum Recipient {
129	Peer(PeerId),
131	Authority(AuthorityDiscoveryId),
133}
134
135pub type OutgoingResult<Res> = Result<Res, RequestError>;
137
138impl<Req, FallbackReq> OutgoingRequest<Req, FallbackReq>
139where
140	Req: IsRequest + Encode,
141	Req::Response: Decode,
142	FallbackReq: IsRequest + Encode,
143	FallbackReq::Response: Decode,
144{
145	pub fn new(
150		peer: Recipient,
151		payload: Req,
152	) -> (Self, impl Future<Output = OutgoingResult<Req::Response>>) {
153		let (tx, rx) = oneshot::channel();
154		let r = Self { peer, payload, pending_response: tx, fallback_request: None };
155		(r, receive_response::<Req>(rx.map(|r| r.map(|r| r.map(|(resp, _)| resp)))))
156	}
157
158	pub fn new_with_fallback(
165		peer: Recipient,
166		payload: Req,
167		fallback_request: FallbackReq,
168	) -> (Self, impl Future<Output = OutgoingResult<(Vec<u8>, ProtocolName)>>) {
169		let (tx, rx) = oneshot::channel();
170		let r = Self {
171			peer,
172			payload,
173			pending_response: tx,
174			fallback_request: Some((fallback_request, FallbackReq::PROTOCOL)),
175		};
176		(r, async { Ok(rx.await??) })
177	}
178
179	pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
184		let OutgoingRequest { peer, payload, pending_response, fallback_request } = self;
185		let encoded = OutgoingRequest {
186			peer,
187			payload: payload.encode(),
188			fallback_request: fallback_request.map(|(r, p)| (r.encode(), p)),
189			pending_response,
190		};
191		(Req::PROTOCOL, encoded)
192	}
193}
194
195async fn receive_response<Req>(
197	rec: impl Future<Output = Result<Result<Vec<u8>, network::RequestFailure>, oneshot::Canceled>>,
198) -> OutgoingResult<Req::Response>
199where
200	Req: IsRequest,
201	Req::Response: Decode,
202{
203	let raw = rec.await??;
204	Ok(Decode::decode(&mut raw.as_ref())?)
205}