1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
34// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
89// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
1314// You should have received a copy of the GNU General Public License
15// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
1617use futures::{channel::oneshot, prelude::Future, FutureExt};
1819use codec::{Decode, Encode, Error as DecodingError};
20use network::ProtocolName;
2122use sc_network as network;
23use sc_network_types::PeerId;
2425use polkadot_primitives::AuthorityDiscoveryId;
2627use super::{v1, v2, IsRequest, Protocol};
2829/// All requests that can be sent to the network bridge via `NetworkBridgeTxMessage::SendRequest`.
30#[derive(Debug)]
31pub enum Requests {
32/// Request an availability chunk from a node.
33ChunkFetching(OutgoingRequest<v2::ChunkFetchingRequest, v1::ChunkFetchingRequest>),
34/// Fetch a collation from a collator which previously announced it.
35CollationFetchingV1(OutgoingRequest<v1::CollationFetchingRequest>),
36/// Fetch a PoV from a validator which previously sent out a seconded statement.
37PoVFetchingV1(OutgoingRequest<v1::PoVFetchingRequest>),
38/// Request full available data from a node.
39AvailableDataFetchingV1(OutgoingRequest<v1::AvailableDataFetchingRequest>),
40/// Requests for notifying about an ongoing dispute.
41DisputeSendingV1(OutgoingRequest<v1::DisputeRequest>),
4243/// Request a candidate and attestations.
44AttestedCandidateV2(OutgoingRequest<v2::AttestedCandidateRequest>),
45/// Fetch a collation from a collator which previously announced it.
46 /// Compared to V1 it requires specifying which candidate is requested by its hash.
47CollationFetchingV2(OutgoingRequest<v2::CollationFetchingRequest>),
48}
4950impl Requests {
51/// Encode the request.
52 ///
53 /// The corresponding protocol is returned as well, as we are now leaving typed territory.
54 ///
55 /// Note: `Requests` is just an enum collecting all supported requests supported by network
56 /// bridge, it is never sent over the wire. This function just encodes the individual requests
57 /// contained in the `enum`.
58pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
59match self {
60Self::ChunkFetching(r) => r.encode_request(),
61Self::CollationFetchingV1(r) => r.encode_request(),
62Self::CollationFetchingV2(r) => r.encode_request(),
63Self::PoVFetchingV1(r) => r.encode_request(),
64Self::AvailableDataFetchingV1(r) => r.encode_request(),
65Self::DisputeSendingV1(r) => r.encode_request(),
66Self::AttestedCandidateV2(r) => r.encode_request(),
67 }
68 }
69}
7071/// Used by the network to send us a response to a request.
72pub type ResponseSender = oneshot::Sender<Result<(Vec<u8>, ProtocolName), network::RequestFailure>>;
7374/// Any error that can occur when sending a request.
75#[derive(Debug, thiserror::Error)]
76pub enum RequestError {
77/// Response could not be decoded.
78#[error("Response could not be decoded: {0}")]
79InvalidResponse(#[from] DecodingError),
8081/// Some error in substrate/libp2p happened.
82#[error("{0}")]
83NetworkError(#[from] network::RequestFailure),
8485/// Response got canceled by networking.
86#[error("Response channel got canceled")]
87Canceled(#[from] oneshot::Canceled),
88}
8990impl RequestError {
91/// Whether the error represents some kind of timeout condition.
92pub fn is_timed_out(&self) -> bool {
93match self {
94Self::Canceled(_) |
95Self::NetworkError(network::RequestFailure::Obsolete) |
96Self::NetworkError(network::RequestFailure::Network(
97 network::OutboundFailure::Timeout,
98 )) => true,
99_ => false,
100 }
101 }
102}
103104/// A request to be sent to the network bridge, including a sender for sending responses/failures.
105///
106/// The network implementation will make use of that sender for informing the requesting subsystem
107/// about responses/errors.
108///
109/// When using `Recipient::Peer`, keep in mind that no address (as in IP address and port) might
110/// be known for that specific peer. You are encouraged to use `Peer` for peers that you are
111/// expected to be already connected to.
112/// When using `Recipient::Authority`, the addresses can be found thanks to the authority
113/// discovery system.
114#[derive(Debug)]
115pub struct OutgoingRequest<Req, FallbackReq = Req> {
116/// Intended recipient of this request.
117pub peer: Recipient,
118/// The actual request to send over the wire.
119pub payload: Req,
120/// Optional fallback request and protocol.
121pub fallback_request: Option<(FallbackReq, Protocol)>,
122/// Sender which is used by networking to get us back a response.
123pub pending_response: ResponseSender,
124}
125126/// Potential recipients of an outgoing request.
127#[derive(Debug, Eq, Hash, PartialEq, Clone)]
128pub enum Recipient {
129/// Recipient is a regular peer and we know its peer id.
130Peer(PeerId),
131/// Recipient is a validator, we address it via this `AuthorityDiscoveryId`.
132Authority(AuthorityDiscoveryId),
133}
134135/// Responses received for an `OutgoingRequest`.
136pub type OutgoingResult<Res> = Result<Res, RequestError>;
137138impl<Req, FallbackReq> OutgoingRequest<Req, FallbackReq>
139where
140Req: IsRequest + Encode,
141 Req::Response: Decode,
142 FallbackReq: IsRequest + Encode,
143 FallbackReq::Response: Decode,
144{
145/// Create a new `OutgoingRequest`.
146 ///
147 /// It will contain a sender that is used by the networking for sending back responses. The
148 /// connected receiver is returned as the second element in the returned tuple.
149pub fn new(
150 peer: Recipient,
151 payload: Req,
152 ) -> (Self, impl Future<Output = OutgoingResult<Req::Response>>) {
153let (tx, rx) = oneshot::channel();
154let r = Self { peer, payload, pending_response: tx, fallback_request: None };
155 (r, receive_response::<Req>(rx.map(|r| r.map(|r| r.map(|(resp, _)| resp)))))
156 }
157158/// Create a new `OutgoingRequest` with a fallback in case the remote does not support this
159 /// protocol. Useful when adding a new version of a req-response protocol, to achieve
160 /// compatibility with the older version.
161 ///
162 /// Returns a raw `Vec<u8>` response over the channel. Use the associated `ProtocolName` to know
163 /// which request was the successful one and appropriately decode the response.
164pub fn new_with_fallback(
165 peer: Recipient,
166 payload: Req,
167 fallback_request: FallbackReq,
168 ) -> (Self, impl Future<Output = OutgoingResult<(Vec<u8>, ProtocolName)>>) {
169let (tx, rx) = oneshot::channel();
170let r = Self {
171 peer,
172 payload,
173 pending_response: tx,
174 fallback_request: Some((fallback_request, FallbackReq::PROTOCOL)),
175 };
176 (r, async { Ok(rx.await??) })
177 }
178179/// Encode a request into a `Vec<u8>`.
180 ///
181 /// As this throws away type information, we also return the `Protocol` this encoded request
182 /// adheres to.
183pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
184let OutgoingRequest { peer, payload, pending_response, fallback_request } = self;
185let encoded = OutgoingRequest {
186 peer,
187 payload: payload.encode(),
188 fallback_request: fallback_request.map(|(r, p)| (r.encode(), p)),
189 pending_response,
190 };
191 (Req::PROTOCOL, encoded)
192 }
193}
194195/// Future for actually receiving a typed response for an `OutgoingRequest`.
196async fn receive_response<Req>(
197 rec: impl Future<Output = Result<Result<Vec<u8>, network::RequestFailure>, oneshot::Canceled>>,
198) -> OutgoingResult<Req::Response>
199where
200Req: IsRequest,
201 Req::Response: Decode,
202{
203let raw = rec.await??;
204Ok(Decode::decode(&mut raw.as_ref())?)
205}