1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use futures::{channel::oneshot, prelude::Future, FutureExt};

use codec::{Decode, Encode, Error as DecodingError};
use network::ProtocolName;

use sc_network as network;
use sc_network_types::PeerId;

use polkadot_primitives::AuthorityDiscoveryId;

use super::{v1, v2, IsRequest, Protocol};

/// All requests that can be sent to the network bridge via `NetworkBridgeTxMessage::SendRequest`.
#[derive(Debug)]
pub enum Requests {
	/// Request an availability chunk from a node.
	ChunkFetching(OutgoingRequest<v2::ChunkFetchingRequest, v1::ChunkFetchingRequest>),
	/// Fetch a collation from a collator which previously announced it.
	CollationFetchingV1(OutgoingRequest<v1::CollationFetchingRequest>),
	/// Fetch a PoV from a validator which previously sent out a seconded statement.
	PoVFetchingV1(OutgoingRequest<v1::PoVFetchingRequest>),
	/// Request full available data from a node.
	AvailableDataFetchingV1(OutgoingRequest<v1::AvailableDataFetchingRequest>),
	/// Requests for fetching large statements as part of statement distribution.
	StatementFetchingV1(OutgoingRequest<v1::StatementFetchingRequest>),
	/// Requests for notifying about an ongoing dispute.
	DisputeSendingV1(OutgoingRequest<v1::DisputeRequest>),

	/// Request a candidate and attestations.
	AttestedCandidateV2(OutgoingRequest<v2::AttestedCandidateRequest>),
	/// Fetch a collation from a collator which previously announced it.
	/// Compared to V1 it requires specifying which candidate is requested by its hash.
	CollationFetchingV2(OutgoingRequest<v2::CollationFetchingRequest>),
}

impl Requests {
	/// Encode the request.
	///
	/// The corresponding protocol is returned as well, as we are now leaving typed territory.
	///
	/// Note: `Requests` is just an enum collecting all supported requests supported by network
	/// bridge, it is never sent over the wire. This function just encodes the individual requests
	/// contained in the `enum`.
	pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
		match self {
			Self::ChunkFetching(r) => r.encode_request(),
			Self::CollationFetchingV1(r) => r.encode_request(),
			Self::CollationFetchingV2(r) => r.encode_request(),
			Self::PoVFetchingV1(r) => r.encode_request(),
			Self::AvailableDataFetchingV1(r) => r.encode_request(),
			Self::StatementFetchingV1(r) => r.encode_request(),
			Self::DisputeSendingV1(r) => r.encode_request(),
			Self::AttestedCandidateV2(r) => r.encode_request(),
		}
	}
}

/// Used by the network to send us a response to a request.
pub type ResponseSender = oneshot::Sender<Result<(Vec<u8>, ProtocolName), network::RequestFailure>>;

/// Any error that can occur when sending a request.
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
	/// Response could not be decoded.
	#[error("Response could not be decoded: {0}")]
	InvalidResponse(#[from] DecodingError),

	/// Some error in substrate/libp2p happened.
	#[error("{0}")]
	NetworkError(#[from] network::RequestFailure),

	/// Response got canceled by networking.
	#[error("Response channel got canceled")]
	Canceled(#[from] oneshot::Canceled),
}

impl RequestError {
	/// Whether the error represents some kind of timeout condition.
	pub fn is_timed_out(&self) -> bool {
		match self {
			Self::Canceled(_) |
			Self::NetworkError(network::RequestFailure::Obsolete) |
			Self::NetworkError(network::RequestFailure::Network(
				network::OutboundFailure::Timeout,
			)) => true,
			_ => false,
		}
	}
}

/// A request to be sent to the network bridge, including a sender for sending responses/failures.
///
/// The network implementation will make use of that sender for informing the requesting subsystem
/// about responses/errors.
///
/// When using `Recipient::Peer`, keep in mind that no address (as in IP address and port) might
/// be known for that specific peer. You are encouraged to use `Peer` for peers that you are
/// expected to be already connected to.
/// When using `Recipient::Authority`, the addresses can be found thanks to the authority
/// discovery system.
#[derive(Debug)]
pub struct OutgoingRequest<Req, FallbackReq = Req> {
	/// Intended recipient of this request.
	pub peer: Recipient,
	/// The actual request to send over the wire.
	pub payload: Req,
	/// Optional fallback request and protocol.
	pub fallback_request: Option<(FallbackReq, Protocol)>,
	/// Sender which is used by networking to get us back a response.
	pub pending_response: ResponseSender,
}

/// Potential recipients of an outgoing request.
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
pub enum Recipient {
	/// Recipient is a regular peer and we know its peer id.
	Peer(PeerId),
	/// Recipient is a validator, we address it via this `AuthorityDiscoveryId`.
	Authority(AuthorityDiscoveryId),
}

/// Responses received for an `OutgoingRequest`.
pub type OutgoingResult<Res> = Result<Res, RequestError>;

impl<Req, FallbackReq> OutgoingRequest<Req, FallbackReq>
where
	Req: IsRequest + Encode,
	Req::Response: Decode,
	FallbackReq: IsRequest + Encode,
	FallbackReq::Response: Decode,
{
	/// Create a new `OutgoingRequest`.
	///
	/// It will contain a sender that is used by the networking for sending back responses. The
	/// connected receiver is returned as the second element in the returned tuple.
	pub fn new(
		peer: Recipient,
		payload: Req,
	) -> (Self, impl Future<Output = OutgoingResult<Req::Response>>) {
		let (tx, rx) = oneshot::channel();
		let r = Self { peer, payload, pending_response: tx, fallback_request: None };
		(r, receive_response::<Req>(rx.map(|r| r.map(|r| r.map(|(resp, _)| resp)))))
	}

	/// Create a new `OutgoingRequest` with a fallback in case the remote does not support this
	/// protocol. Useful when adding a new version of a req-response protocol, to achieve
	/// compatibility with the older version.
	///
	/// Returns a raw `Vec<u8>` response over the channel. Use the associated `ProtocolName` to know
	/// which request was the successful one and appropriately decode the response.
	pub fn new_with_fallback(
		peer: Recipient,
		payload: Req,
		fallback_request: FallbackReq,
	) -> (Self, impl Future<Output = OutgoingResult<(Vec<u8>, ProtocolName)>>) {
		let (tx, rx) = oneshot::channel();
		let r = Self {
			peer,
			payload,
			pending_response: tx,
			fallback_request: Some((fallback_request, FallbackReq::PROTOCOL)),
		};
		(r, async { Ok(rx.await??) })
	}

	/// Encode a request into a `Vec<u8>`.
	///
	/// As this throws away type information, we also return the `Protocol` this encoded request
	/// adheres to.
	pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
		let OutgoingRequest { peer, payload, pending_response, fallback_request } = self;
		let encoded = OutgoingRequest {
			peer,
			payload: payload.encode(),
			fallback_request: fallback_request.map(|(r, p)| (r.encode(), p)),
			pending_response,
		};
		(Req::PROTOCOL, encoded)
	}
}

/// Future for actually receiving a typed response for an `OutgoingRequest`.
async fn receive_response<Req>(
	rec: impl Future<Output = Result<Result<Vec<u8>, network::RequestFailure>, oneshot::Canceled>>,
) -> OutgoingResult<Req::Response>
where
	Req: IsRequest,
	Req::Response: Decode,
{
	let raw = rec.await??;
	Ok(Decode::decode(&mut raw.as_ref())?)
}