sc_network/litep2p/shim/request_response/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Shim for litep2p's request-response implementation to make it work with `sc_network`'s
20//! request-response API.
21
22use crate::{
23	litep2p::shim::request_response::metrics::RequestResponseMetrics,
24	peer_store::PeerStoreProvider,
25	request_responses::{IncomingRequest, OutgoingResponse},
26	service::{metrics::Metrics, traits::RequestResponseConfig as RequestResponseConfigT},
27	IfDisconnected, OutboundFailure, ProtocolName, RequestFailure,
28};
29
30use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
31use litep2p::{
32	error::{ImmediateDialError, NegotiationError, SubstreamError},
33	protocol::request_response::{
34		DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
35		RequestResponseHandle,
36	},
37	types::RequestId,
38};
39
40use sc_network_types::PeerId;
41use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
42
43use std::{
44	collections::HashMap,
45	sync::Arc,
46	time::{Duration, Instant},
47};
48
49mod metrics;
50
51#[cfg(test)]
52mod tests;
53
54/// Logging target for the file.
55const LOG_TARGET: &str = "sub-libp2p::request-response";
56
57/// Type containing information related to an outbound request.
58#[derive(Debug)]
59pub struct OutboundRequest {
60	/// Peer ID.
61	peer: PeerId,
62
63	/// Request.
64	request: Vec<u8>,
65
66	/// Fallback request, if provided.
67	fallback_request: Option<(Vec<u8>, ProtocolName)>,
68
69	/// `oneshot::Sender` for sending the received response, or failure.
70	sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
71
72	/// What should the node do if `peer` is disconnected.
73	dial_behavior: IfDisconnected,
74}
75
76impl OutboundRequest {
77	/// Create new [`OutboundRequest`].
78	pub fn new(
79		peer: PeerId,
80		request: Vec<u8>,
81		sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
82		fallback_request: Option<(Vec<u8>, ProtocolName)>,
83		dial_behavior: IfDisconnected,
84	) -> Self {
85		OutboundRequest { peer, request, sender, fallback_request, dial_behavior }
86	}
87}
88
89/// Pending request.
90struct PendingRequest {
91	tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
92	started: Instant,
93	fallback_request: Option<(Vec<u8>, ProtocolName)>,
94}
95
96impl PendingRequest {
97	/// Create new [`PendingRequest`].
98	fn new(
99		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
100		started: Instant,
101		fallback_request: Option<(Vec<u8>, ProtocolName)>,
102	) -> Self {
103		Self { tx, started, fallback_request }
104	}
105}
106
107/// Request-response protocol configuration.
108///
109/// See [`RequestResponseConfiguration`](crate::request_response::ProtocolConfig) for more details.
110#[derive(Debug)]
111pub struct RequestResponseConfig {
112	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
113	pub protocol_name: ProtocolName,
114
115	/// Fallback on the wire protocol names to support.
116	pub fallback_names: Vec<ProtocolName>,
117
118	/// Maximum allowed size, in bytes, of a request.
119	pub max_request_size: u64,
120
121	/// Maximum allowed size, in bytes, of a response.
122	pub max_response_size: u64,
123
124	/// Duration after which emitted requests are considered timed out.
125	pub request_timeout: Duration,
126
127	/// Channel on which the networking service will send incoming requests.
128	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
129}
130
131impl RequestResponseConfig {
132	/// Create new [`RequestResponseConfig`].
133	pub(crate) fn new(
134		protocol_name: ProtocolName,
135		fallback_names: Vec<ProtocolName>,
136		max_request_size: u64,
137		max_response_size: u64,
138		request_timeout: Duration,
139		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
140	) -> Self {
141		Self {
142			protocol_name,
143			fallback_names,
144			max_request_size,
145			max_response_size,
146			request_timeout,
147			inbound_queue,
148		}
149	}
150}
151
152impl RequestResponseConfigT for RequestResponseConfig {
153	fn protocol_name(&self) -> &ProtocolName {
154		&self.protocol_name
155	}
156}
157
158/// Request-response protocol.
159///
160/// This is slightly different from the `RequestResponsesBehaviour` in that it is protocol-specific,
161/// meaning there is an instance of `RequestResponseProtocol` for each installed request-response
162/// protocol and that instance deals only with the requests and responses of that protocol, nothing
163/// else. It also differs from the other implementation by combining both inbound and outbound
164/// requests under one instance so all request-response-related behavior of any given protocol is
165/// handled through one instance of `RequestResponseProtocol`.
166pub struct RequestResponseProtocol {
167	/// Protocol name.
168	protocol: ProtocolName,
169
170	/// Handle to request-response protocol.
171	handle: RequestResponseHandle,
172
173	/// Inbound queue for sending received requests to protocol implementation in Polkadot SDK.
174	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
175
176	/// Handle to `Peerstore`.
177	peerstore_handle: Arc<dyn PeerStoreProvider>,
178
179	/// Pending responses.
180	pending_inbound_responses: HashMap<RequestId, PendingRequest>,
181
182	/// Pending outbound responses.
183	pending_outbound_responses: FuturesUnordered<
184		BoxFuture<'static, (litep2p::PeerId, RequestId, Result<OutgoingResponse, ()>, Instant)>,
185	>,
186
187	/// RX channel for receiving info for outbound requests.
188	request_rx: TracingUnboundedReceiver<OutboundRequest>,
189
190	/// Map of supported request-response protocols which are used to support fallback requests.
191	///
192	/// If negotiation for the main protocol fails and the request was sent with a fallback,
193	/// [`RequestResponseProtocol`] queries this map and sends the request that protocol for
194	/// processing.
195	request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
196
197	/// Metrics, if enabled.
198	metrics: RequestResponseMetrics,
199}
200
201impl RequestResponseProtocol {
202	/// Create new [`RequestResponseProtocol`].
203	pub fn new(
204		protocol: ProtocolName,
205		handle: RequestResponseHandle,
206		peerstore_handle: Arc<dyn PeerStoreProvider>,
207		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
208		request_rx: TracingUnboundedReceiver<OutboundRequest>,
209		request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
210		metrics: Option<Metrics>,
211	) -> Self {
212		Self {
213			handle,
214			request_rx,
215			request_tx,
216			inbound_queue,
217			peerstore_handle,
218			protocol: protocol.clone(),
219			pending_inbound_responses: HashMap::new(),
220			pending_outbound_responses: FuturesUnordered::new(),
221			metrics: RequestResponseMetrics::new(metrics, protocol),
222		}
223	}
224
225	/// Send `request` to `peer`.
226	async fn on_send_request(
227		&mut self,
228		peer: PeerId,
229		request: Vec<u8>,
230		fallback_request: Option<(Vec<u8>, ProtocolName)>,
231		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
232		connect: IfDisconnected,
233	) {
234		let dial_options = match connect {
235			IfDisconnected::TryConnect => DialOptions::Dial,
236			IfDisconnected::ImmediateError => DialOptions::Reject,
237		};
238
239		log::trace!(
240			target: LOG_TARGET,
241			"{}: send request to {:?} (fallback {:?}) (dial options: {:?})",
242			self.protocol,
243			peer,
244			fallback_request,
245			dial_options,
246		);
247
248		match self.handle.try_send_request(peer.into(), request, dial_options) {
249			Ok(request_id) => {
250				self.pending_inbound_responses
251					.insert(request_id, PendingRequest::new(tx, Instant::now(), fallback_request));
252			},
253			Err(error) => {
254				log::warn!(
255					target: LOG_TARGET,
256					"{}: failed to send request to {peer:?}: {error:?}",
257					self.protocol,
258				);
259
260				let _ = tx.send(Err(RequestFailure::Refused));
261				self.metrics.register_inbound_request_failure(error.to_string().as_ref());
262			},
263		}
264	}
265
266	/// Handle inbound request from `peer`
267	///
268	/// If the protocol is configured outbound only, reject the request immediately.
269	fn on_inbound_request(
270		&mut self,
271		peer: litep2p::PeerId,
272		fallback: Option<litep2p::ProtocolName>,
273		request_id: RequestId,
274		request: Vec<u8>,
275	) {
276		let Some(inbound_queue) = &self.inbound_queue else {
277			log::trace!(
278				target: LOG_TARGET,
279				"{}: rejecting inbound request from {peer:?}, protocol configured as outbound only",
280				self.protocol,
281			);
282
283			self.handle.reject_request(request_id);
284			return;
285		};
286
287		log::trace!(
288			target: LOG_TARGET,
289			"{}: request received from {peer:?} ({fallback:?} {request_id:?}), request size {:?}",
290			self.protocol,
291			request.len(),
292		);
293		let (tx, rx) = oneshot::channel();
294
295		match inbound_queue.try_send(IncomingRequest {
296			peer: peer.into(),
297			payload: request,
298			pending_response: tx,
299		}) {
300			Ok(_) => {
301				self.pending_outbound_responses.push(Box::pin(async move {
302					(peer, request_id, rx.await.map_err(|_| ()), Instant::now())
303				}));
304			},
305			Err(error) => {
306				log::trace!(
307					target: LOG_TARGET,
308					"{:?}: dropping request from {peer:?} ({request_id:?}), inbound queue full",
309					self.protocol,
310				);
311
312				self.handle.reject_request(request_id);
313				self.metrics.register_inbound_request_failure(error.to_string().as_ref());
314			},
315		}
316	}
317
318	/// Handle received inbound response.
319	fn on_inbound_response(
320		&mut self,
321		peer: litep2p::PeerId,
322		request_id: RequestId,
323		fallback: Option<litep2p::ProtocolName>,
324		response: Vec<u8>,
325	) {
326		match self.pending_inbound_responses.remove(&request_id) {
327			None => log::warn!(
328				target: LOG_TARGET,
329				"{:?}: response received for {peer:?} but {request_id:?} doesn't exist",
330				self.protocol,
331			),
332			Some(PendingRequest { tx, started, .. }) => {
333				log::trace!(
334					target: LOG_TARGET,
335					"{:?}: response received for {peer:?} ({request_id:?}), response size {:?}",
336					self.protocol,
337					response.len(),
338				);
339
340				let _ = tx.send(Ok((
341					response,
342					fallback.map_or_else(|| self.protocol.clone(), Into::into),
343				)));
344				self.metrics.register_outbound_request_success(started.elapsed());
345			},
346		}
347	}
348
349	/// Handle failed outbound request.
350	fn on_request_failed(
351		&mut self,
352		peer: litep2p::PeerId,
353		request_id: RequestId,
354		error: RequestResponseError,
355	) {
356		log::debug!(
357			target: LOG_TARGET,
358			"{:?}: request failed for {peer:?} ({request_id:?}): {error:?}",
359			self.protocol
360		);
361
362		let Some(PendingRequest { tx, fallback_request, .. }) =
363			self.pending_inbound_responses.remove(&request_id)
364		else {
365			log::warn!(
366				target: LOG_TARGET,
367				"{:?}: request failed for peer {peer:?} but {request_id:?} doesn't exist",
368				self.protocol,
369			);
370
371			return
372		};
373
374		let status = match error {
375			RequestResponseError::NotConnected =>
376				Some((RequestFailure::NotConnected, "not-connected")),
377			RequestResponseError::Rejected(reason) => {
378				let reason = match reason {
379					RejectReason::ConnectionClosed => "connection-closed",
380					RejectReason::SubstreamClosed => "substream-closed",
381					RejectReason::SubstreamOpenError(substream_error) => match substream_error {
382						SubstreamError::NegotiationError(NegotiationError::Timeout) =>
383							"substream-timeout",
384						_ => "substream-open-error",
385					},
386					RejectReason::DialFailed(None) => "dial-failed",
387					RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
388						"dial-already-connected",
389					RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
390						"dial-peerid-missing",
391					RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
392						"dial-tried-to-dial-self",
393					RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
394						"dial-no-address-available",
395					RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
396						"dial-task-closed",
397					RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
398						"dial-channel-clogged",
399				};
400
401				Some((RequestFailure::Refused, reason))
402			},
403			RequestResponseError::Timeout =>
404				Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
405			RequestResponseError::Canceled => {
406				log::debug!(
407					target: LOG_TARGET,
408					"{}: request canceled by local node to {peer:?} ({request_id:?})",
409					self.protocol,
410				);
411				None
412			},
413			RequestResponseError::TooLargePayload => {
414				log::warn!(
415					target: LOG_TARGET,
416					"{}: tried to send too large request to {peer:?} ({request_id:?})",
417					self.protocol,
418				);
419				Some((RequestFailure::Refused, "payload-too-large"))
420			},
421			RequestResponseError::UnsupportedProtocol => match fallback_request {
422				Some((request, protocol)) => match self.request_tx.get(&protocol) {
423					Some(sender) => {
424						log::debug!(
425							target: LOG_TARGET,
426							"{}: failed to negotiate protocol with {:?}, try fallback request: ({})",
427							self.protocol,
428							peer,
429							protocol,
430						);
431
432						let outbound_request = OutboundRequest::new(
433							peer.into(),
434							request,
435							tx,
436							None,
437							IfDisconnected::ImmediateError,
438						);
439
440						// since remote peer doesn't support the main protocol (`self.protocol`),
441						// try to send the request over a fallback protocol by creating a new
442						// `OutboundRequest` from the original data, now with the fallback request
443						// payload, and send it over to the (fallback) request handler like it was
444						// a normal request.
445						let _ = sender.unbounded_send(outbound_request);
446
447						return;
448					},
449					None => {
450						log::warn!(
451							target: LOG_TARGET,
452							"{}: fallback request provided but protocol ({}) doesn't exist (peer {:?})",
453							self.protocol,
454							protocol,
455							peer,
456						);
457
458						Some((RequestFailure::Refused, "invalid-fallback-protocol"))
459					},
460				},
461				None => Some((RequestFailure::Refused, "unsupported-protocol")),
462			},
463		};
464
465		if let Some((error, reason)) = status {
466			self.metrics.register_outbound_request_failure(reason);
467			let _ = tx.send(Err(error));
468		}
469	}
470
471	/// Handle outbound response.
472	fn on_outbound_response(
473		&mut self,
474		peer: litep2p::PeerId,
475		request_id: RequestId,
476		response: OutgoingResponse,
477		started: Instant,
478	) {
479		let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
480
481		for change in reputation_changes {
482			log::trace!(target: LOG_TARGET, "{}: report {peer:?}: {change:?}", self.protocol);
483			self.peerstore_handle.report_peer(peer.into(), change);
484		}
485
486		match result {
487			Err(()) => {
488				log::debug!(
489					target: LOG_TARGET,
490					"{}: response rejected ({request_id:?}) for {peer:?}",
491					self.protocol,
492				);
493
494				self.handle.reject_request(request_id);
495				self.metrics.register_inbound_request_failure("rejected");
496			},
497			Ok(response) => {
498				log::trace!(
499					target: LOG_TARGET,
500					"{}: send response ({request_id:?}) to {peer:?}, response size {}",
501					self.protocol,
502					response.len(),
503				);
504
505				match sent_feedback {
506					None => self.handle.send_response(request_id, response),
507					Some(feedback) =>
508						self.handle.send_response_with_feedback(request_id, response, feedback),
509				}
510
511				self.metrics.register_inbound_request_success(started.elapsed());
512			},
513		}
514	}
515
516	/// Start running event loop of the request-response protocol.
517	pub async fn run(mut self) {
518		loop {
519			tokio::select! {
520				event = self.handle.next() => match event {
521					None => return,
522					Some(RequestResponseEvent::RequestReceived {
523						peer,
524						fallback,
525						request_id,
526						request,
527					}) => self.on_inbound_request(peer, fallback, request_id, request),
528					Some(RequestResponseEvent::ResponseReceived { peer, request_id, fallback, response }) => {
529						self.on_inbound_response(peer, request_id, fallback, response);
530					},
531					Some(RequestResponseEvent::RequestFailed { peer, request_id, error }) => {
532						self.on_request_failed(peer, request_id, error);
533					},
534				},
535				event = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => match event {
536					None => return,
537					Some((peer, request_id, Err(()), _)) => {
538						log::debug!(target: LOG_TARGET, "{}: reject request ({request_id:?}) from {peer:?}", self.protocol);
539
540						self.handle.reject_request(request_id);
541						self.metrics.register_inbound_request_failure("rejected");
542					}
543					Some((peer, request_id, Ok(response), started)) => {
544						self.on_outbound_response(peer, request_id, response, started);
545					}
546				},
547				event = self.request_rx.next() => match event {
548					None => return,
549					Some(outbound_request) => {
550						let OutboundRequest { peer, request, sender, dial_behavior, fallback_request } = outbound_request;
551
552						self.on_send_request(peer, request, fallback_request, sender, dial_behavior).await;
553					}
554				}
555			}
556		}
557	}
558}