polkadot_node_network_protocol/request_response/
outgoing.rs1use futures::{channel::oneshot, prelude::Future, FutureExt};
18
19use codec::{Decode, Encode, Error as DecodingError};
20use network::ProtocolName;
21
22use sc_network as network;
23use sc_network_types::PeerId;
24
25use polkadot_primitives::AuthorityDiscoveryId;
26
27use super::{v1, v2, IsRequest, Protocol};
28
/// Every kind of outgoing request this module can express.
///
/// Each variant wraps an [`OutgoingRequest`] parameterised with the payload type of the
/// corresponding request, so a single value of this enum can carry any of them.
#[derive(Debug)]
pub enum Requests {
    /// Chunk fetching request: carries a `v2` payload and uses the `v1` payload type as its
    /// fallback request type.
    ChunkFetching(OutgoingRequest<v2::ChunkFetchingRequest, v1::ChunkFetchingRequest>),
    /// Collation fetching request with the `v1` payload.
    CollationFetchingV1(OutgoingRequest<v1::CollationFetchingRequest>),
    /// PoV fetching request with the `v1` payload.
    PoVFetchingV1(OutgoingRequest<v1::PoVFetchingRequest>),
    /// Available data fetching request with the `v1` payload.
    AvailableDataFetchingV1(OutgoingRequest<v1::AvailableDataFetchingRequest>),
    /// Statement fetching request with the `v1` payload.
    StatementFetchingV1(OutgoingRequest<v1::StatementFetchingRequest>),
    /// Dispute sending request with the `v1` payload.
    DisputeSendingV1(OutgoingRequest<v1::DisputeRequest>),

    /// Attested candidate request with the `v2` payload.
    AttestedCandidateV2(OutgoingRequest<v2::AttestedCandidateRequest>),
    /// Collation fetching request with the `v2` payload.
    CollationFetchingV2(OutgoingRequest<v2::CollationFetchingRequest>),
}
51
52impl Requests {
53 pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
61 match self {
62 Self::ChunkFetching(r) => r.encode_request(),
63 Self::CollationFetchingV1(r) => r.encode_request(),
64 Self::CollationFetchingV2(r) => r.encode_request(),
65 Self::PoVFetchingV1(r) => r.encode_request(),
66 Self::AvailableDataFetchingV1(r) => r.encode_request(),
67 Self::StatementFetchingV1(r) => r.encode_request(),
68 Self::DisputeSendingV1(r) => r.encode_request(),
69 Self::AttestedCandidateV2(r) => r.encode_request(),
70 }
71 }
72}
73
/// Sending half of the oneshot channel over which the outcome of a request is delivered.
///
/// On success the channel carries the raw response bytes paired with a [`ProtocolName`]
/// (presumably the protocol the response was received on — confirm against the sender side);
/// on failure it carries a `network::RequestFailure`.
pub type ResponseSender = oneshot::Sender<Result<(Vec<u8>, ProtocolName), network::RequestFailure>>;
76
/// Errors that can surface while waiting for and decoding the response to a request.
///
/// Each variant is convertible from its wrapped error via `From`, so `?` can be used on
/// the underlying error types directly.
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
    /// The response bytes could not be decoded into the expected response type.
    #[error("Response could not be decoded: {0}")]
    InvalidResponse(#[from] DecodingError),

    /// The network layer reported a failure for the request.
    #[error("{0}")]
    NetworkError(#[from] network::RequestFailure),

    /// The sending half of the response channel was dropped before a response was sent.
    #[error("Response channel got canceled")]
    Canceled(#[from] oneshot::Canceled),
}
92
93impl RequestError {
94 pub fn is_timed_out(&self) -> bool {
96 match self {
97 Self::Canceled(_) |
98 Self::NetworkError(network::RequestFailure::Obsolete) |
99 Self::NetworkError(network::RequestFailure::Network(
100 network::OutboundFailure::Timeout,
101 )) => true,
102 _ => false,
103 }
104 }
105}
106
/// A request to be sent to a recipient, together with the channel for its response.
///
/// `Req` is the type of the primary payload. `FallbackReq` is the type of the optional
/// fallback payload and defaults to `Req`.
#[derive(Debug)]
pub struct OutgoingRequest<Req, FallbackReq = Req> {
    /// Whom the request is addressed to.
    pub peer: Recipient,
    /// The payload of the request.
    pub payload: Req,
    /// Optional fallback payload together with the protocol it belongs to.
    ///
    /// NOTE(review): presumably used when the recipient does not support the primary
    /// protocol — confirm against the code that consumes this field.
    pub fallback_request: Option<(FallbackReq, Protocol)>,
    /// Sender through which the outcome of the request is reported back.
    pub pending_response: ResponseSender,
}
128
/// Identifies the recipient of an outgoing request.
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
pub enum Recipient {
    /// Recipient addressed by its peer id.
    Peer(PeerId),
    /// Recipient addressed by its authority discovery id.
    Authority(AuthorityDiscoveryId),
}
137
/// Outcome of an outgoing request: the response value `Res` on success, a [`RequestError`]
/// otherwise.
pub type OutgoingResult<Res> = Result<Res, RequestError>;
140
141impl<Req, FallbackReq> OutgoingRequest<Req, FallbackReq>
142where
143 Req: IsRequest + Encode,
144 Req::Response: Decode,
145 FallbackReq: IsRequest + Encode,
146 FallbackReq::Response: Decode,
147{
148 pub fn new(
153 peer: Recipient,
154 payload: Req,
155 ) -> (Self, impl Future<Output = OutgoingResult<Req::Response>>) {
156 let (tx, rx) = oneshot::channel();
157 let r = Self { peer, payload, pending_response: tx, fallback_request: None };
158 (r, receive_response::<Req>(rx.map(|r| r.map(|r| r.map(|(resp, _)| resp)))))
159 }
160
161 pub fn new_with_fallback(
168 peer: Recipient,
169 payload: Req,
170 fallback_request: FallbackReq,
171 ) -> (Self, impl Future<Output = OutgoingResult<(Vec<u8>, ProtocolName)>>) {
172 let (tx, rx) = oneshot::channel();
173 let r = Self {
174 peer,
175 payload,
176 pending_response: tx,
177 fallback_request: Some((fallback_request, FallbackReq::PROTOCOL)),
178 };
179 (r, async { Ok(rx.await??) })
180 }
181
182 pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
187 let OutgoingRequest { peer, payload, pending_response, fallback_request } = self;
188 let encoded = OutgoingRequest {
189 peer,
190 payload: payload.encode(),
191 fallback_request: fallback_request.map(|(r, p)| (r.encode(), p)),
192 pending_response,
193 };
194 (Req::PROTOCOL, encoded)
195 }
196}
197
198async fn receive_response<Req>(
200 rec: impl Future<Output = Result<Result<Vec<u8>, network::RequestFailure>, oneshot::Canceled>>,
201) -> OutgoingResult<Req::Response>
202where
203 Req: IsRequest,
204 Req::Response: Decode,
205{
206 let raw = rec.await??;
207 Ok(Decode::decode(&mut raw.as_ref())?)
208}