referrerpolicy=no-referrer-when-downgrade

polkadot_node_network_protocol/request_response/incoming/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::marker::PhantomData;
18
19use futures::{channel::oneshot, StreamExt};
20
21use codec::{Decode, Encode};
22
23use sc_network::{config as netconfig, NetworkBackend};
24use sc_network_types::PeerId;
25use sp_runtime::traits::Block;
26
27use super::{IsRequest, ReqProtocolNames};
28use crate::UnifiedReputationChange;
29
30mod error;
31pub use error::{Error, FatalError, JfyiError, Result};
32
33/// A request coming in, including a sender for sending responses.
34///
35/// Typed `IncomingRequest`s, see `IncomingRequest::get_config_receiver` and substrate
36/// `NetworkConfiguration` for more information.
37#[derive(Debug)]
38pub struct IncomingRequest<Req> {
39	/// `PeerId` of sending peer.
40	pub peer: PeerId,
41	/// The sent request.
42	pub payload: Req,
43	/// Sender for sending response back.
44	pub pending_response: OutgoingResponseSender<Req>,
45}
46
47impl<Req> IncomingRequest<Req>
48where
49	Req: IsRequest + Decode + Encode,
50	Req::Response: Encode,
51{
52	/// Create configuration for `NetworkConfiguration::request_response_protocols` and a
53	/// corresponding typed receiver.
54	///
55	/// This Register that config with substrate networking and receive incoming requests via the
56	/// returned `IncomingRequestReceiver`.
57	pub fn get_config_receiver<B: Block, N: NetworkBackend<B, <B as Block>::Hash>>(
58		req_protocol_names: &ReqProtocolNames,
59	) -> (IncomingRequestReceiver<Req>, N::RequestResponseProtocolConfig) {
60		let (raw, cfg) = Req::PROTOCOL.get_config::<B, N>(req_protocol_names);
61		(IncomingRequestReceiver { raw, phantom: PhantomData {} }, cfg)
62	}
63
64	/// Create new `IncomingRequest`.
65	pub fn new(
66		peer: PeerId,
67		payload: Req,
68		pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
69	) -> Self {
70		Self {
71			peer,
72			payload,
73			pending_response: OutgoingResponseSender { pending_response, phantom: PhantomData {} },
74		}
75	}
76
77	/// Try building from raw substrate request.
78	///
79	/// This function will fail if the request cannot be decoded and will apply passed in
80	/// reputation changes in that case.
81	///
82	/// Params:
83	/// 		- The raw request to decode
84	/// 		- Reputation changes to apply for the peer in case decoding fails.
85	fn try_from_raw(
86		raw: sc_network::config::IncomingRequest,
87		reputation_changes: Vec<UnifiedReputationChange>,
88	) -> std::result::Result<Self, JfyiError> {
89		let sc_network::config::IncomingRequest { payload, peer, pending_response } = raw;
90		let payload = match Req::decode(&mut payload.as_ref()) {
91			Ok(payload) => payload,
92			Err(err) => {
93				let reputation_changes = reputation_changes.into_iter().map(|r| r.into()).collect();
94				let response = sc_network::config::OutgoingResponse {
95					result: Err(()),
96					reputation_changes,
97					sent_feedback: None,
98				};
99
100				if let Err(_) = pending_response.send(response) {
101					return Err(JfyiError::DecodingErrorNoReputationChange(peer, err))
102				}
103				return Err(JfyiError::DecodingError(peer, err))
104			},
105		};
106		Ok(Self::new(peer, payload, pending_response))
107	}
108
109	/// Convert into raw untyped substrate `IncomingRequest`.
110	///
111	/// This is mostly useful for testing.
112	pub fn into_raw(self) -> sc_network::config::IncomingRequest {
113		sc_network::config::IncomingRequest {
114			peer: self.peer,
115			payload: self.payload.encode(),
116			pending_response: self.pending_response.pending_response,
117		}
118	}
119
120	/// Send the response back.
121	///
122	/// Calls [`OutgoingResponseSender::send_response`].
123	pub fn send_response(self, resp: Req::Response) -> std::result::Result<(), Req::Response> {
124		self.pending_response.send_response(resp)
125	}
126
127	/// Send response with additional options.
128	///
129	/// Calls [`OutgoingResponseSender::send_outgoing_response`].
130	pub fn send_outgoing_response(
131		self,
132		resp: OutgoingResponse<<Req as IsRequest>::Response>,
133	) -> std::result::Result<(), ()> {
134		self.pending_response.send_outgoing_response(resp)
135	}
136}
137
138/// Sender for sending back responses on an `IncomingRequest`.
139#[derive(Debug)]
140pub struct OutgoingResponseSender<Req> {
141	pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
142	phantom: PhantomData<Req>,
143}
144
145impl<Req> OutgoingResponseSender<Req>
146where
147	Req: IsRequest + Decode,
148	Req::Response: Encode,
149{
150	/// Send the response back.
151	///
152	/// On success we return `Ok(())`, on error we return the not sent `Response`.
153	///
154	/// `netconfig::OutgoingResponse` exposes a way of modifying the peer's reputation. If needed we
155	/// can change this function to expose this feature as well.
156	pub fn send_response(self, resp: Req::Response) -> std::result::Result<(), Req::Response> {
157		self.pending_response
158			.send(netconfig::OutgoingResponse {
159				result: Ok(resp.encode()),
160				reputation_changes: Vec::new(),
161				sent_feedback: None,
162			})
163			.map_err(|_| resp)
164	}
165
166	/// Send response with additional options.
167	///
168	/// This variant allows for waiting for the response to be sent out, allows for changing peer's
169	/// reputation and allows for not sending a response at all (for only changing the peer's
170	/// reputation).
171	pub fn send_outgoing_response(
172		self,
173		resp: OutgoingResponse<<Req as IsRequest>::Response>,
174	) -> std::result::Result<(), ()> {
175		let OutgoingResponse { result, reputation_changes, sent_feedback } = resp;
176
177		let response = netconfig::OutgoingResponse {
178			result: result.map(|v| v.encode()),
179			reputation_changes: reputation_changes.into_iter().map(|c| c.into()).collect(),
180			sent_feedback,
181		};
182
183		self.pending_response.send(response).map_err(|_| ())
184	}
185}
186
187/// Typed variant of [`netconfig::OutgoingResponse`].
188///
189/// Responses to `IncomingRequest`s.
190pub struct OutgoingResponse<Response> {
191	/// The payload of the response.
192	///
193	/// `Err(())` if none is available e.g. due to an error while handling the request.
194	pub result: std::result::Result<Response, ()>,
195
196	/// Reputation changes accrued while handling the request. To be applied to the reputation of
197	/// the peer sending the request.
198	pub reputation_changes: Vec<UnifiedReputationChange>,
199
200	/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
201	/// peer.
202	pub sent_feedback: Option<oneshot::Sender<()>>,
203}
204
205/// Receiver for incoming requests.
206///
207/// Takes care of decoding and handling of invalid encoded requests.
208pub struct IncomingRequestReceiver<Req> {
209	raw: async_channel::Receiver<netconfig::IncomingRequest>,
210	phantom: PhantomData<Req>,
211}
212
213impl<Req> IncomingRequestReceiver<Req>
214where
215	Req: IsRequest + Decode + Encode,
216	Req::Response: Encode,
217{
218	/// Try to receive the next incoming request.
219	///
220	/// Any received request will be decoded, on decoding errors the provided reputation changes
221	/// will be applied and an error will be reported.
222	pub async fn recv<F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<Req>>
223	where
224		F: FnOnce() -> Vec<UnifiedReputationChange>,
225	{
226		let req = match self.raw.next().await {
227			None => return Err(FatalError::RequestChannelExhausted.into()),
228			Some(raw) => IncomingRequest::<Req>::try_from_raw(raw, reputation_changes())?,
229		};
230		Ok(req)
231	}
232}