polkadot_node_network_protocol/request_response/
outgoing.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use futures::{channel::oneshot, prelude::Future, FutureExt};
18
19use codec::{Decode, Encode, Error as DecodingError};
20use network::ProtocolName;
21
22use sc_network as network;
23use sc_network_types::PeerId;
24
25use polkadot_primitives::AuthorityDiscoveryId;
26
27use super::{v1, v2, IsRequest, Protocol};
28
29/// All requests that can be sent to the network bridge via `NetworkBridgeTxMessage::SendRequest`.
30#[derive(Debug)]
31pub enum Requests {
32	/// Request an availability chunk from a node.
33	ChunkFetching(OutgoingRequest<v2::ChunkFetchingRequest, v1::ChunkFetchingRequest>),
34	/// Fetch a collation from a collator which previously announced it.
35	CollationFetchingV1(OutgoingRequest<v1::CollationFetchingRequest>),
36	/// Fetch a PoV from a validator which previously sent out a seconded statement.
37	PoVFetchingV1(OutgoingRequest<v1::PoVFetchingRequest>),
38	/// Request full available data from a node.
39	AvailableDataFetchingV1(OutgoingRequest<v1::AvailableDataFetchingRequest>),
40	/// Requests for fetching large statements as part of statement distribution.
41	StatementFetchingV1(OutgoingRequest<v1::StatementFetchingRequest>),
42	/// Requests for notifying about an ongoing dispute.
43	DisputeSendingV1(OutgoingRequest<v1::DisputeRequest>),
44
45	/// Request a candidate and attestations.
46	AttestedCandidateV2(OutgoingRequest<v2::AttestedCandidateRequest>),
47	/// Fetch a collation from a collator which previously announced it.
48	/// Compared to V1 it requires specifying which candidate is requested by its hash.
49	CollationFetchingV2(OutgoingRequest<v2::CollationFetchingRequest>),
50}
51
52impl Requests {
53	/// Encode the request.
54	///
55	/// The corresponding protocol is returned as well, as we are now leaving typed territory.
56	///
57	/// Note: `Requests` is just an enum collecting all supported requests supported by network
58	/// bridge, it is never sent over the wire. This function just encodes the individual requests
59	/// contained in the `enum`.
60	pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
61		match self {
62			Self::ChunkFetching(r) => r.encode_request(),
63			Self::CollationFetchingV1(r) => r.encode_request(),
64			Self::CollationFetchingV2(r) => r.encode_request(),
65			Self::PoVFetchingV1(r) => r.encode_request(),
66			Self::AvailableDataFetchingV1(r) => r.encode_request(),
67			Self::StatementFetchingV1(r) => r.encode_request(),
68			Self::DisputeSendingV1(r) => r.encode_request(),
69			Self::AttestedCandidateV2(r) => r.encode_request(),
70		}
71	}
72}
73
74/// Used by the network to send us a response to a request.
75pub type ResponseSender = oneshot::Sender<Result<(Vec<u8>, ProtocolName), network::RequestFailure>>;
76
77/// Any error that can occur when sending a request.
78#[derive(Debug, thiserror::Error)]
79pub enum RequestError {
80	/// Response could not be decoded.
81	#[error("Response could not be decoded: {0}")]
82	InvalidResponse(#[from] DecodingError),
83
84	/// Some error in substrate/libp2p happened.
85	#[error("{0}")]
86	NetworkError(#[from] network::RequestFailure),
87
88	/// Response got canceled by networking.
89	#[error("Response channel got canceled")]
90	Canceled(#[from] oneshot::Canceled),
91}
92
93impl RequestError {
94	/// Whether the error represents some kind of timeout condition.
95	pub fn is_timed_out(&self) -> bool {
96		match self {
97			Self::Canceled(_) |
98			Self::NetworkError(network::RequestFailure::Obsolete) |
99			Self::NetworkError(network::RequestFailure::Network(
100				network::OutboundFailure::Timeout,
101			)) => true,
102			_ => false,
103		}
104	}
105}
106
107/// A request to be sent to the network bridge, including a sender for sending responses/failures.
108///
109/// The network implementation will make use of that sender for informing the requesting subsystem
110/// about responses/errors.
111///
112/// When using `Recipient::Peer`, keep in mind that no address (as in IP address and port) might
113/// be known for that specific peer. You are encouraged to use `Peer` for peers that you are
114/// expected to be already connected to.
115/// When using `Recipient::Authority`, the addresses can be found thanks to the authority
116/// discovery system.
117#[derive(Debug)]
118pub struct OutgoingRequest<Req, FallbackReq = Req> {
119	/// Intended recipient of this request.
120	pub peer: Recipient,
121	/// The actual request to send over the wire.
122	pub payload: Req,
123	/// Optional fallback request and protocol.
124	pub fallback_request: Option<(FallbackReq, Protocol)>,
125	/// Sender which is used by networking to get us back a response.
126	pub pending_response: ResponseSender,
127}
128
129/// Potential recipients of an outgoing request.
130#[derive(Debug, Eq, Hash, PartialEq, Clone)]
131pub enum Recipient {
132	/// Recipient is a regular peer and we know its peer id.
133	Peer(PeerId),
134	/// Recipient is a validator, we address it via this `AuthorityDiscoveryId`.
135	Authority(AuthorityDiscoveryId),
136}
137
138/// Responses received for an `OutgoingRequest`.
139pub type OutgoingResult<Res> = Result<Res, RequestError>;
140
141impl<Req, FallbackReq> OutgoingRequest<Req, FallbackReq>
142where
143	Req: IsRequest + Encode,
144	Req::Response: Decode,
145	FallbackReq: IsRequest + Encode,
146	FallbackReq::Response: Decode,
147{
148	/// Create a new `OutgoingRequest`.
149	///
150	/// It will contain a sender that is used by the networking for sending back responses. The
151	/// connected receiver is returned as the second element in the returned tuple.
152	pub fn new(
153		peer: Recipient,
154		payload: Req,
155	) -> (Self, impl Future<Output = OutgoingResult<Req::Response>>) {
156		let (tx, rx) = oneshot::channel();
157		let r = Self { peer, payload, pending_response: tx, fallback_request: None };
158		(r, receive_response::<Req>(rx.map(|r| r.map(|r| r.map(|(resp, _)| resp)))))
159	}
160
161	/// Create a new `OutgoingRequest` with a fallback in case the remote does not support this
162	/// protocol. Useful when adding a new version of a req-response protocol, to achieve
163	/// compatibility with the older version.
164	///
165	/// Returns a raw `Vec<u8>` response over the channel. Use the associated `ProtocolName` to know
166	/// which request was the successful one and appropriately decode the response.
167	pub fn new_with_fallback(
168		peer: Recipient,
169		payload: Req,
170		fallback_request: FallbackReq,
171	) -> (Self, impl Future<Output = OutgoingResult<(Vec<u8>, ProtocolName)>>) {
172		let (tx, rx) = oneshot::channel();
173		let r = Self {
174			peer,
175			payload,
176			pending_response: tx,
177			fallback_request: Some((fallback_request, FallbackReq::PROTOCOL)),
178		};
179		(r, async { Ok(rx.await??) })
180	}
181
182	/// Encode a request into a `Vec<u8>`.
183	///
184	/// As this throws away type information, we also return the `Protocol` this encoded request
185	/// adheres to.
186	pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
187		let OutgoingRequest { peer, payload, pending_response, fallback_request } = self;
188		let encoded = OutgoingRequest {
189			peer,
190			payload: payload.encode(),
191			fallback_request: fallback_request.map(|(r, p)| (r.encode(), p)),
192			pending_response,
193		};
194		(Req::PROTOCOL, encoded)
195	}
196}
197
198/// Future for actually receiving a typed response for an `OutgoingRequest`.
199async fn receive_response<Req>(
200	rec: impl Future<Output = Result<Result<Vec<u8>, network::RequestFailure>, oneshot::Canceled>>,
201) -> OutgoingResult<Req::Response>
202where
203	Req: IsRequest,
204	Req::Response: Decode,
205{
206	let raw = rec.await??;
207	Ok(Decode::decode(&mut raw.as_ref())?)
208}