referrerpolicy=no-referrer-when-downgrade

sc_network/
request_responses.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Collection of request-response protocols.
20//!
21//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or
22//! more so-called "request-response" protocols.
23//!
24//! A request-response protocol works in the following way:
25//!
26//! - For every emitted request, a new substream is open and the protocol is negotiated. If the
27//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed
28//! with the request itself. The remote then sends the size of the response as a LEB128 number,
29//! followed with the response.
30//!
31//! - Requests have a certain time limit before they time out. This time includes the time it
32//! takes to send/receive the request and response.
33//!
34//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
35//! is used to handle incoming requests.
36
37use crate::{
38	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39	service::traits::RequestResponseConfig as RequestResponseConfigT,
40	types::ProtocolName,
41	ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46	core::{transport::PortUse, Endpoint, Multiaddr},
47	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48	swarm::{
49		behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51	},
52	PeerId,
53};
54
55use std::{
56	collections::{hash_map::Entry, HashMap},
57	io, iter,
58	ops::Deref,
59	pin::Pin,
60	sync::Arc,
61	task::{Context, Poll},
62	time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
/// Logging target used by every `log` invocation in this file.
const LOG_TARGET: &str = "sub-libp2p::request-response";

/// Period of the timer that checks whether pending outbound requests have exceeded their
/// protocol's request timeout.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
/// Possible failures occurring in the context of sending an outbound request and receiving the
/// response.
///
/// Mirrors the variants of `libp2p::request_response::OutboundFailure`; the I/O error is kept
/// behind an [`Arc`], which is what allows this enum to derive `Clone`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum OutboundFailure {
	/// The request could not be sent because a dialing attempt failed.
	#[error("Failed to dial the requested peer")]
	DialFailure,
	/// The request timed out before a response was received.
	#[error("Timeout while waiting for a response")]
	Timeout,
	/// The connection closed before a response was received.
	#[error("Connection was closed before a response was received")]
	ConnectionClosed,
	/// The remote supports none of the requested protocols.
	#[error("The remote supports none of the requested protocols")]
	UnsupportedProtocols,
	/// An IO failure happened on an outbound stream.
	#[error("An IO failure happened on an outbound stream")]
	Io(Arc<io::Error>),
}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95	fn from(out: request_response::OutboundFailure) -> Self {
96		match out {
97			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99			request_response::OutboundFailure::ConnectionClosed => {
100				OutboundFailure::ConnectionClosed
101			},
102			request_response::OutboundFailure::UnsupportedProtocols => {
103				OutboundFailure::UnsupportedProtocols
104			},
105			request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
106		}
107	}
108}
109
/// Possible failures occurring in the context of receiving an inbound request and sending a
/// response.
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
	/// The inbound request timed out, either while reading the incoming request or before a
	/// response is sent.
	#[error("Timeout while receiving request or sending response")]
	Timeout,
	/// The connection closed before a response could be sent.
	#[error("Connection was closed before a response could be sent")]
	ConnectionClosed,
	/// The local peer supports none of the protocols requested by the remote.
	#[error("The local peer supports none of the protocols requested by the remote")]
	UnsupportedProtocols,
	/// The local peer failed to respond to an inbound request: the response channel was dropped
	/// without a response being sent.
	#[error("The response channel was dropped without sending a response to the remote")]
	ResponseOmission,
	/// An IO failure happened on an inbound stream.
	#[error("An IO failure happened on an inbound stream")]
	Io(Arc<io::Error>),
}
131
132impl From<request_response::InboundFailure> for InboundFailure {
133	fn from(out: request_response::InboundFailure) -> Self {
134		match out {
135			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
136			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
137			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
138			request_response::InboundFailure::UnsupportedProtocols => {
139				InboundFailure::UnsupportedProtocols
140			},
141			request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
142		}
143	}
144}
145
/// Error in a request.
///
/// This is the error type delivered to the caller of
/// [`RequestResponsesBehaviour::send_request`] through the response channel.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
	/// There is no connection to the target peer and the caller asked not to dial it.
	#[error("We are not currently connected to the requested peer.")]
	NotConnected,
	/// The protocol name of the request matches no registered protocol.
	#[error("Given protocol hasn't been registered.")]
	UnknownProtocol,
	/// The request itself is not acceptable for the selected protocol.
	#[error("The outbound request payload or parameters are invalid for the selected protocol.")]
	InvalidRequest,
	/// The remote answered with an error instead of a payload.
	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
	Refused,
	/// A response arrived, but the receiving side of the response channel was already dropped.
	#[error("The remote replied, but the local node is no longer interested in the response.")]
	Obsolete,
	/// The request failed at the networking layer; see [`OutboundFailure`] for the cause.
	#[error("Problem on the network: {0}")]
	Network(OutboundFailure),
}
163
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
	pub name: ProtocolName,

	/// Fallback names of the protocol on the wire, supported in addition to `name`.
	pub fallback_names: Vec<ProtocolName>,

	/// Maximum allowed size, in bytes, of a request.
	///
	/// Any request larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_request_size: u64,

	/// Maximum allowed size, in bytes, of a response.
	///
	/// Any response larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_response_size: u64,

	/// Duration after which emitted requests are considered timed out.
	///
	/// If you expect the response to come back quickly, you should set this to a smaller duration.
	pub request_timeout: Duration,

	/// Channel on which the networking service will send incoming requests.
	///
	/// Every time a peer sends a request to the local node using this protocol, the networking
	/// service will push an element on this channel. The receiving side of this channel then has
	/// to pull this element, process the request, and send back the response destined for the
	/// peer.
	///
	/// The size of the channel has to be carefully chosen. If the channel is full, the networking
	/// service will discard the incoming request and send back an error to the peer.
	/// Consequently, the channel being full is an indicator that the node is overloaded.
	///
	/// You can typically set the size of the channel to `T / d`, where `T` is the
	/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
	/// build a response.
	///
	/// Can be `None` if the local node does not support answering incoming requests.
	/// If this is `None`, then the local node will not advertise support for this protocol towards
	/// other peers. If this is `Some` but the channel is closed, then the local node will
	/// advertise support for this protocol, but any incoming request will lead to an error being
	/// sent back.
	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
212
impl RequestResponseConfigT for ProtocolConfig {
	/// Returns the primary on-the-wire name of the protocol (the `name` field).
	fn protocol_name(&self) -> &ProtocolName {
		&self.name
	}
}
218
/// A single request received from a peer on a request-response protocol.
#[derive(Debug)]
pub struct IncomingRequest {
	/// Who sent the request.
	pub peer: sc_network_types::PeerId,

	/// Request sent by the remote. Will always be smaller than
	/// [`ProtocolConfig::max_request_size`].
	pub payload: Vec<u8>,

	/// Channel to send back the response.
	///
	/// There are two ways to indicate that handling the request failed:
	///
	/// 1. Drop `pending_response`, thereby not changing the reputation of the peer.
	///
	/// 2. Send an [`OutgoingResponse`] whose `result` is `Err(())` via `pending_response`,
	/// optionally including reputation changes for the given peer.
	pub pending_response: oneshot::Sender<OutgoingResponse>,
}
239
/// Response for an incoming request, to be sent by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
	/// The payload of the response.
	///
	/// `Err(())` if none is available, e.g. due to an error while handling the request.
	pub result: Result<Vec<u8>, ()>,

	/// Reputation changes accrued while handling the request. To be applied to the reputation of
	/// the peer sending the request.
	pub reputation_changes: Vec<ReputationChange>,

	/// If provided, the `oneshot::Sender` will be notified when the response has been sent to the
	/// peer.
	///
	/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
	/// >			outgoing data for each TCP socket, and it is not possible for a user
	/// >			application to inspect this buffer. This channel here is not actually notified
	/// >			when the response has been fully sent out, but rather when it has fully been
	/// >			written to the buffer managed by the operating system.
	pub sent_feedback: Option<oneshot::Sender<()>>,
}
262
/// Information stored about a pending (outbound) request.
struct PendingRequest {
	/// The time when the request was sent to the libp2p request-response protocol.
	///
	/// Used both to report the request duration and to force-detect timeouts.
	started_at: Instant,
	/// The channel to send the response back to the caller.
	///
	/// This is wrapped in an `Option` to allow for the channel to be taken out
	/// on force-detected timeouts.
	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
	/// Fallback request (payload and protocol name) to send if the primary request fails
	/// because the remote does not support the protocol.
	fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
275
/// Policy applied when a request targets a peer we are not currently connected to.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
	/// Attempt to establish a connection to the peer.
	TryConnect,
	/// Fail right away if the destination is not yet connected.
	ImmediateError,
}

/// Helper methods for `IfDisconnected`.
impl IfDisconnected {
	/// Returns `true` if a disconnected peer should be dialed, i.e. for
	/// [`IfDisconnected::TryConnect`], and `false` for [`IfDisconnected::ImmediateError`].
	pub fn should_connect(self) -> bool {
		matches!(self, Self::TryConnect)
	}
}
295
/// Event generated by the [`RequestResponsesBehaviour`].
#[derive(Debug)]
pub enum Event {
	/// A remote sent a request and either we have successfully answered it or an error happened.
	///
	/// This event is generated for statistics purposes.
	InboundRequest {
		/// Peer which has emitted the request.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Whether handling the request was successful or unsuccessful.
		///
		/// When successful contains the time elapsed between when we received the request and when
		/// we sent back the response. When unsuccessful contains the failure reason.
		result: Result<Duration, ResponseFailure>,
	},

	/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
	/// failed.
	///
	/// This event is generated for statistics purposes.
	RequestFinished {
		/// Peer that we sent the request to.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Duration the request took.
		duration: Duration,
		/// Result of the request.
		result: Result<(), RequestFailure>,
	},

	/// A request protocol handler issued reputation changes for the given peer.
	ReputationChanges {
		/// Peer whose reputation needs to be adjusted.
		peer: PeerId,
		/// Reputation changes.
		changes: Vec<ReputationChange>,
	},
}
337
/// Combination of a protocol name and a request id.
///
/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
/// [`ProtocolRequestId`]s.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId<RequestId> {
	/// Name of the protocol the request belongs to.
	protocol: ProtocolName,
	/// Request id handed out by the request-response behaviour of that protocol.
	request_id: RequestId,
}
349
350impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
351	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
352		Self { protocol, request_id }
353	}
354}
355
/// Details of a request-response protocol.
struct ProtocolDetails {
	/// The underlying libp2p request-response behaviour handling this protocol.
	behaviour: Behaviour<GenericCodec>,
	/// Queue on which incoming requests are pushed; `None` if the protocol is outbound-only.
	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	/// Timeout configured for requests of this protocol.
	request_timeout: Duration,
}
362
/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
pub struct RequestResponsesBehaviour {
	/// The multiple sub-protocols, by name.
	///
	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
	/// "response builder" used to build responses for incoming requests.
	protocols: HashMap<ProtocolName, ProtocolDetails>,

	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
	pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,

	/// Whenever an incoming request arrives, a `Future` is added to this list. It yields the
	/// outcome carrying the response to send back to the remote, or `None` if the response
	/// sender was dropped without producing a response.
	pending_responses: stream::FuturesUnordered<
		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
	>,

	/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
	pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,

	/// Whenever a response is received on `pending_responses`, insert a channel to be notified
	/// when the response has been sent out.
	send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,

	/// Primarily used to get a reputation of a node.
	peer_store: Arc<dyn PeerStoreProvider>,

	/// Interval to check that the requests are not taking too long.
	///
	/// We had issues in the past where libp2p did not produce a timeout event in due time.
	///
	/// For more details, see:
	/// - <https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096>
	periodic_request_check: tokio::time::Interval,
}
398
/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
	/// Peer that sent the request.
	peer: PeerId,
	/// Id of the inbound request this outcome answers.
	request_id: InboundRequestId,
	/// Protocol on which the request was received.
	protocol: ProtocolName,
	/// libp2p channel through which the response payload is sent back to the remote.
	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
	/// Response produced by the request handler.
	response: OutgoingResponse,
}
407
408impl RequestResponsesBehaviour {
409	/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
410	/// the same protocol is passed twice.
411	pub fn new(
412		list: impl Iterator<Item = ProtocolConfig>,
413		peer_store: Arc<dyn PeerStoreProvider>,
414	) -> Result<Self, RegisterError> {
415		let mut protocols = HashMap::new();
416		for protocol in list {
417			let cfg = Config::default().with_request_timeout(protocol.request_timeout);
418
419			let protocol_support = if protocol.inbound_queue.is_some() {
420				ProtocolSupport::Full
421			} else {
422				ProtocolSupport::Outbound
423			};
424
425			let behaviour = Behaviour::with_codec(
426				GenericCodec {
427					max_request_size: protocol.max_request_size,
428					max_response_size: protocol.max_response_size,
429				},
430				iter::once(protocol.name.clone())
431					.chain(protocol.fallback_names)
432					.zip(iter::repeat(protocol_support)),
433				cfg,
434			);
435
436			match protocols.entry(protocol.name) {
437				Entry::Vacant(e) => e.insert(ProtocolDetails {
438					behaviour,
439					inbound_queue: protocol.inbound_queue,
440					request_timeout: protocol.request_timeout,
441				}),
442				Entry::Occupied(e) => {
443					return Err(RegisterError::DuplicateProtocol(e.key().clone()))
444				},
445			};
446		}
447
448		Ok(Self {
449			protocols,
450			pending_requests: Default::default(),
451			pending_responses: Default::default(),
452			pending_responses_arrival_time: Default::default(),
453			send_feedback: Default::default(),
454			peer_store,
455			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
456		})
457	}
458
459	/// Initiates sending a request.
460	///
461	/// If there is no established connection to the target peer, the behavior is determined by the
462	/// choice of `connect`.
463	///
464	/// An error is returned if the protocol doesn't match one that has been registered.
465	pub fn send_request(
466		&mut self,
467		target: &PeerId,
468		protocol_name: ProtocolName,
469		request: Vec<u8>,
470		fallback_request: Option<(Vec<u8>, ProtocolName)>,
471		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
472		connect: IfDisconnected,
473	) {
474		log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
475
476		if let Some(ProtocolDetails { behaviour, .. }) =
477			self.protocols.get_mut(protocol_name.deref())
478		{
479			Self::send_request_inner(
480				behaviour,
481				&mut self.pending_requests,
482				target,
483				protocol_name,
484				request,
485				fallback_request,
486				pending_response,
487				connect,
488			)
489		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
490			log::debug!(
491				target: LOG_TARGET,
492				"Unknown protocol {:?}. At the same time local \
493				 node is no longer interested in the result.",
494				protocol_name,
495			);
496		}
497	}
498
499	fn send_request_inner(
500		behaviour: &mut Behaviour<GenericCodec>,
501		pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
502		target: &PeerId,
503		protocol_name: ProtocolName,
504		request: Vec<u8>,
505		fallback_request: Option<(Vec<u8>, ProtocolName)>,
506		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
507		connect: IfDisconnected,
508	) {
509		if behaviour.is_connected(target) || connect.should_connect() {
510			let request_id = behaviour.send_request(target, request);
511			let prev_req_id = pending_requests.insert(
512				(protocol_name.to_string().into(), request_id).into(),
513				PendingRequest {
514					started_at: Instant::now(),
515					response_tx: Some(pending_response),
516					fallback_request,
517				},
518			);
519			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
520		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
521			log::debug!(
522				target: LOG_TARGET,
523				"Not connected to peer {:?}. At the same time local \
524				 node is no longer interested in the result.",
525				target,
526			);
527		}
528	}
529}
530
531impl NetworkBehaviour for RequestResponsesBehaviour {
532	type ConnectionHandler =
533		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
534	type ToSwarm = Event;
535
	/// Accepts every pending inbound connection unconditionally.
	///
	/// The call is not forwarded to the per-protocol behaviours.
	fn handle_pending_inbound_connection(
		&mut self,
		_connection_id: ConnectionId,
		_local_addr: &Multiaddr,
		_remote_addr: &Multiaddr,
	) -> Result<(), ConnectionDenied> {
		Ok(())
	}
544
	/// Accepts every pending outbound connection and contributes no additional addresses to
	/// dial (the returned list is always empty).
	///
	/// The call is not forwarded to the per-protocol behaviours.
	fn handle_pending_outbound_connection(
		&mut self,
		_connection_id: ConnectionId,
		_maybe_peer: Option<PeerId>,
		_addresses: &[Multiaddr],
		_effective_role: Endpoint,
	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
		Ok(Vec::new())
	}
554
555	fn handle_established_inbound_connection(
556		&mut self,
557		connection_id: ConnectionId,
558		peer: PeerId,
559		local_addr: &Multiaddr,
560		remote_addr: &Multiaddr,
561	) -> Result<THandler<Self>, ConnectionDenied> {
562		let iter =
563			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
564				if let Ok(handler) = behaviour.handle_established_inbound_connection(
565					connection_id,
566					peer,
567					local_addr,
568					remote_addr,
569				) {
570					Some((p.to_string(), handler))
571				} else {
572					None
573				}
574			});
575
576		Ok(MultiHandler::try_from_iter(iter).expect(
577			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
578			 which is the only possible error; qed",
579		))
580	}
581
582	fn handle_established_outbound_connection(
583		&mut self,
584		connection_id: ConnectionId,
585		peer: PeerId,
586		addr: &Multiaddr,
587		role_override: Endpoint,
588		port_use: PortUse,
589	) -> Result<THandler<Self>, ConnectionDenied> {
590		let iter =
591			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
592				if let Ok(handler) = behaviour.handle_established_outbound_connection(
593					connection_id,
594					peer,
595					addr,
596					role_override,
597					port_use,
598				) {
599					Some((p.to_string(), handler))
600				} else {
601					None
602				}
603			});
604
605		Ok(MultiHandler::try_from_iter(iter).expect(
606			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
607			 which is the only possible error; qed",
608		))
609	}
610
611	fn on_swarm_event(&mut self, event: FromSwarm) {
612		for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
613			behaviour.on_swarm_event(event);
614		}
615	}
616
617	fn on_connection_handler_event(
618		&mut self,
619		peer_id: PeerId,
620		connection_id: ConnectionId,
621		event: THandlerOutEvent<Self>,
622	) {
623		let p_name = event.0;
624		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
625			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
626		} else {
627			log::warn!(
628				target: LOG_TARGET,
629				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
630				p_name
631			);
632		}
633	}
634
635	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
636		'poll_all: loop {
637			// Poll the periodic request check.
638			if self.periodic_request_check.poll_tick(cx).is_ready() {
639				self.pending_requests.retain(|id, req| {
640					let Some(ProtocolDetails { request_timeout, .. }) =
641						self.protocols.get(&id.protocol)
642					else {
643						log::warn!(
644							target: LOG_TARGET,
645							"Request {id:?} has no protocol registered.",
646						);
647
648						if let Some(response_tx) = req.response_tx.take() {
649							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
650								log::debug!(
651									target: LOG_TARGET,
652									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
653								);
654							}
655						}
656						return false
657					};
658
659					let elapsed = req.started_at.elapsed();
660					if elapsed > *request_timeout {
661						log::debug!(
662							target: LOG_TARGET,
663							"Request {id:?} force detected as timeout.",
664						);
665
666						if let Some(response_tx) = req.response_tx.take() {
667							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
668								log::debug!(
669									target: LOG_TARGET,
670									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
671								);
672							}
673						}
674
675						false
676					} else {
677						true
678					}
679				});
680			}
681
682			// Poll to see if any response is ready to be sent back.
683			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
684				let RequestProcessingOutcome {
685					peer,
686					request_id,
687					protocol: protocol_name,
688					inner_channel,
689					response: OutgoingResponse { result, reputation_changes, sent_feedback },
690				} = match outcome {
691					Some(outcome) => outcome,
692					// The response builder was too busy or handling the request failed. This is
693					// later on reported as a `InboundFailure::Omission`.
694					None => continue,
695				};
696
697				if let Ok(payload) = result {
698					if let Some(ProtocolDetails { behaviour, .. }) =
699						self.protocols.get_mut(&*protocol_name)
700					{
701						log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
702
703						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
704							// Note: Failure is handled further below when receiving
705							// `InboundFailure` event from request-response [`Behaviour`].
706							log::debug!(
707								target: LOG_TARGET,
708								"Failed to send response for {:?} on protocol {:?} due to a \
709								 timeout or due to the connection to the peer being closed. \
710								 Dropping response",
711								request_id, protocol_name,
712							);
713						} else if let Some(sent_feedback) = sent_feedback {
714							self.send_feedback
715								.insert((protocol_name, request_id).into(), sent_feedback);
716						}
717					}
718				}
719
720				if !reputation_changes.is_empty() {
721					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
722						peer,
723						changes: reputation_changes,
724					}));
725				}
726			}
727
728			let mut fallback_requests = vec![];
729
730			// Poll request-responses protocols.
731			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
732			{
733				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
734					let ev = match ev {
735						// Main events we are interested in.
736						ToSwarm::GenerateEvent(ev) => ev,
737
738						// Other events generated by the underlying behaviour are transparently
739						// passed through.
740						ToSwarm::Dial { opts } => {
741							if opts.get_peer_id().is_none() {
742								log::error!(
743									target: LOG_TARGET,
744									"The request-response isn't supposed to start dialing addresses"
745								);
746							}
747							return Poll::Ready(ToSwarm::Dial { opts });
748						},
749						event => {
750							return Poll::Ready(
751								event.map_in(|event| ((*protocol).to_string(), event)).map_out(
752									|_| {
753										unreachable!(
754											"`GenerateEvent` is handled in a branch above; qed"
755										)
756									},
757								),
758							);
759						},
760					};
761
762					match ev {
763						// Received a request from a remote.
764						request_response::Event::Message {
765							peer,
766							message: Message::Request { request_id, request, channel, .. },
767						} => {
768							self.pending_responses_arrival_time
769								.insert((protocol.clone(), request_id).into(), Instant::now());
770
771							let reputation = self.peer_store.peer_reputation(&peer.into());
772
773							if reputation < BANNED_THRESHOLD {
774								log::debug!(
775									target: LOG_TARGET,
776									"Cannot handle requests from a node with a low reputation {}: {}",
777									peer,
778									reputation,
779								);
780								continue 'poll_protocol;
781							}
782
783							let (tx, rx) = oneshot::channel();
784
785							// Submit the request to the "response builder" passed by the user at
786							// initialization.
787							if let Some(resp_builder) = inbound_queue {
788								// If the response builder is too busy, silently drop `tx`. This
789								// will be reported by the corresponding request-response
790								// [`Behaviour`] through an `InboundFailure::Omission` event.
791								// Note that we use `async_channel::bounded` and not `mpsc::channel`
792								// because the latter allocates an extra slot for every cloned
793								// sender.
794								let _ = resp_builder.try_send(IncomingRequest {
795									peer: peer.into(),
796									payload: request,
797									pending_response: tx,
798								});
799							} else {
800								debug_assert!(false, "Received message on outbound-only protocol.");
801							}
802
803							let protocol = protocol.clone();
804
805							self.pending_responses.push(Box::pin(async move {
806								// The `tx` created above can be dropped if we are not capable of
807								// processing this request, which is reflected as a
808								// `InboundFailure::Omission` event.
809								rx.await.map_or(None, |response| {
810									Some(RequestProcessingOutcome {
811										peer,
812										request_id,
813										protocol,
814										inner_channel: channel,
815										response,
816									})
817								})
818							}));
819
820							// This `continue` makes sure that `pending_responses` gets polled
821							// after we have added the new element.
822							continue 'poll_all;
823						},
824
825						// Received a response from a remote to one of our requests.
826						request_response::Event::Message {
827							peer,
828							message: Message::Response { request_id, response },
829							..
830						} => {
831							let (started, delivered) = match self
832								.pending_requests
833								.remove(&(protocol.clone(), request_id).into())
834							{
835								Some(PendingRequest {
836									started_at,
837									response_tx: Some(response_tx),
838									..
839								}) => {
840									log::trace!(
841										target: LOG_TARGET,
842										"received response from {peer} ({protocol:?}), {} bytes",
843										response.as_ref().map_or(0usize, |response| response.len()),
844									);
845
846									let delivered = response_tx
847										.send(
848											response
849												.map_err(|()| RequestFailure::Refused)
850												.map(|resp| (resp, protocol.clone())),
851										)
852										.map_err(|_| RequestFailure::Obsolete);
853									(started_at, delivered)
854								},
855								_ => {
856									log::debug!(
857										target: LOG_TARGET,
858										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
859										request_id,
860										peer,
861									);
862									continue;
863								},
864							};
865
866							let out = Event::RequestFinished {
867								peer,
868								protocol: protocol.clone(),
869								duration: started.elapsed(),
870								result: delivered,
871							};
872
873							return Poll::Ready(ToSwarm::GenerateEvent(out));
874						},
875
876						// One of our requests has failed.
877						request_response::Event::OutboundFailure {
878							peer,
879							request_id,
880							error,
881							..
882						} => {
883							let error = OutboundFailure::from(error);
884							let started = match self
885								.pending_requests
886								.remove(&(protocol.clone(), request_id).into())
887							{
888								Some(PendingRequest {
889									started_at,
890									response_tx: Some(response_tx),
891									fallback_request,
892								}) => {
893									// Try using the fallback request if the protocol was not
894									// supported.
895									if matches!(error, OutboundFailure::UnsupportedProtocols) {
896										if let Some((fallback_request, fallback_protocol)) =
897											fallback_request
898										{
899											log::trace!(
900												target: LOG_TARGET,
901												"Request with id {:?} failed. Trying the fallback protocol. {}",
902												request_id,
903												fallback_protocol.deref()
904											);
905											fallback_requests.push((
906												peer,
907												fallback_protocol,
908												fallback_request,
909												response_tx,
910											));
911											continue;
912										}
913									}
914
915									if response_tx
916										.send(Err(RequestFailure::Network(error.clone())))
917										.is_err()
918									{
919										log::debug!(
920											target: LOG_TARGET,
921											"Request with id {:?} failed. At the same time local \
922											 node is no longer interested in the result.",
923											request_id,
924										);
925									}
926									started_at
927								},
928								_ => {
929									log::debug!(
930										target: LOG_TARGET,
931										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
932										request_id,
933										error,
934										peer
935									);
936									continue;
937								},
938							};
939
940							let out = Event::RequestFinished {
941								peer,
942								protocol: protocol.clone(),
943								duration: started.elapsed(),
944								result: Err(RequestFailure::Network(error)),
945							};
946
947							return Poll::Ready(ToSwarm::GenerateEvent(out));
948						},
949
950						// An inbound request failed, either while reading the request or due to
951						// failing to send a response.
952						request_response::Event::InboundFailure {
953							request_id, peer, error, ..
954						} => {
955							self.pending_responses_arrival_time
956								.remove(&(protocol.clone(), request_id).into());
957							self.send_feedback.remove(&(protocol.clone(), request_id).into());
958							let out = Event::InboundRequest {
959								peer,
960								protocol: protocol.clone(),
961								result: Err(ResponseFailure::Network(error.into())),
962							};
963							return Poll::Ready(ToSwarm::GenerateEvent(out));
964						},
965
966						// A response to an inbound request has been sent.
967						request_response::Event::ResponseSent { request_id, peer } => {
968							let arrival_time = self
969								.pending_responses_arrival_time
970								.remove(&(protocol.clone(), request_id).into())
971								.map(|t| t.elapsed())
972								.expect(
973									"Time is added for each inbound request on arrival and only \
974									 removed on success (`ResponseSent`) or failure \
975									 (`InboundFailure`). One can not receive a success event for a \
976									 request that either never arrived, or that has previously \
977									 failed; qed.",
978								);
979
980							if let Some(send_feedback) =
981								self.send_feedback.remove(&(protocol.clone(), request_id).into())
982							{
983								let _ = send_feedback.send(());
984							}
985
986							let out = Event::InboundRequest {
987								peer,
988								protocol: protocol.clone(),
989								result: Ok(arrival_time),
990							};
991
992							return Poll::Ready(ToSwarm::GenerateEvent(out));
993						},
994					};
995				}
996			}
997
998			// Send out fallback requests.
999			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
1000				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
1001					Self::send_request_inner(
1002						behaviour,
1003						&mut self.pending_requests,
1004						&peer,
1005						protocol,
1006						request,
1007						None,
1008						pending_response,
1009						// We can error if not connected because the
1010						// previous attempt would have tried to establish a
1011						// connection already or errored and we wouldn't have gotten here.
1012						IfDisconnected::ImmediateError,
1013					);
1014				}
1015			}
1016
1017			break Poll::Pending;
1018		}
1019	}
1020}
1021
1022/// Error when registering a protocol.
1023#[derive(Debug, thiserror::Error)]
1024pub enum RegisterError {
1025	/// A protocol has been specified multiple times.
1026	#[error("{0}")]
1027	DuplicateProtocol(ProtocolName),
1028}
1029
1030/// Error when processing a request sent by a remote.
1031#[derive(Debug, thiserror::Error)]
1032pub enum ResponseFailure {
1033	/// Problem on the network.
1034	#[error("Problem on the network: {0}")]
1035	Network(InboundFailure),
1036}
1037
/// Implementation of the libp2p [`Codec`] trait. Describes how requests and responses are
/// converted to and from streams of bytes.
#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
#[derive(Debug, Clone)]
pub struct GenericCodec {
	/// Upper bound, in bytes, on the announced size of a request accepted when reading.
	max_request_size: u64,
	/// Upper bound, in bytes, on the announced size of a response accepted when reading.
	max_response_size: u64,
}
1046
1047#[async_trait::async_trait]
1048impl Codec for GenericCodec {
1049	type Protocol = ProtocolName;
1050	type Request = Vec<u8>;
1051	type Response = Result<Vec<u8>, ()>;
1052
1053	async fn read_request<T>(
1054		&mut self,
1055		_: &Self::Protocol,
1056		mut io: &mut T,
1057	) -> io::Result<Self::Request>
1058	where
1059		T: AsyncRead + Unpin + Send,
1060	{
1061		// Read the length.
1062		let length = unsigned_varint::aio::read_usize(&mut io)
1063			.await
1064			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1065		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1066			return Err(io::Error::new(
1067				io::ErrorKind::InvalidInput,
1068				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1069			));
1070		}
1071
1072		// Read the payload.
1073		let mut buffer = vec![0; length];
1074		io.read_exact(&mut buffer).await?;
1075		Ok(buffer)
1076	}
1077
1078	async fn read_response<T>(
1079		&mut self,
1080		_: &Self::Protocol,
1081		mut io: &mut T,
1082	) -> io::Result<Self::Response>
1083	where
1084		T: AsyncRead + Unpin + Send,
1085	{
1086		// Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1087		// considered as a protocol error and will result in the entire connection being closed.
1088		// Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1089		// that this response is an error.
1090
1091		// Read the length.
1092		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1093			Ok(l) => l,
1094			Err(unsigned_varint::io::ReadError::Io(err))
1095				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1096			{
1097				return Ok(Err(()))
1098			},
1099			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1100		};
1101
1102		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1103			return Err(io::Error::new(
1104				io::ErrorKind::InvalidInput,
1105				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1106			));
1107		}
1108
1109		// Read the payload.
1110		let mut buffer = vec![0; length];
1111		io.read_exact(&mut buffer).await?;
1112		Ok(Ok(buffer))
1113	}
1114
1115	async fn write_request<T>(
1116		&mut self,
1117		_: &Self::Protocol,
1118		io: &mut T,
1119		req: Self::Request,
1120	) -> io::Result<()>
1121	where
1122		T: AsyncWrite + Unpin + Send,
1123	{
1124		// TODO: check the length?
1125		// Write the length.
1126		{
1127			let mut buffer = unsigned_varint::encode::usize_buffer();
1128			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1129		}
1130
1131		// Write the payload.
1132		io.write_all(&req).await?;
1133
1134		io.close().await?;
1135		Ok(())
1136	}
1137
1138	async fn write_response<T>(
1139		&mut self,
1140		_: &Self::Protocol,
1141		io: &mut T,
1142		res: Self::Response,
1143	) -> io::Result<()>
1144	where
1145		T: AsyncWrite + Unpin + Send,
1146	{
1147		// If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1148		if let Ok(res) = res {
1149			// TODO: check the length?
1150			// Write the length.
1151			{
1152				let mut buffer = unsigned_varint::encode::usize_buffer();
1153				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1154			}
1155
1156			// Write the payload.
1157			io.write_all(&res).await?;
1158		}
1159
1160		io.close().await?;
1161		Ok(())
1162	}
1163}
1164
1165#[cfg(test)]
1166mod tests {
1167	use super::*;
1168
1169	use crate::mock::MockPeerStore;
1170	use assert_matches::assert_matches;
1171	use futures::channel::oneshot;
1172	use libp2p::{
1173		core::{
1174			transport::{MemoryTransport, Transport},
1175			upgrade,
1176		},
1177		identity::Keypair,
1178		noise,
1179		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1180		Multiaddr,
1181	};
1182	use std::{iter, time::Duration};
1183
1184	struct TokioExecutor;
1185	impl Executor for TokioExecutor {
1186		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1187			tokio::spawn(f);
1188		}
1189	}
1190
1191	fn build_swarm(
1192		list: impl Iterator<Item = ProtocolConfig>,
1193	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1194		let keypair = Keypair::generate_ed25519();
1195
1196		let transport = MemoryTransport::new()
1197			.upgrade(upgrade::Version::V1)
1198			.authenticate(noise::Config::new(&keypair).unwrap())
1199			.multiplex(libp2p::yamux::Config::default())
1200			.boxed();
1201
1202		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1203
1204		let mut swarm = Swarm::new(
1205			transport,
1206			behaviour,
1207			keypair.public().to_peer_id(),
1208			SwarmConfig::with_executor(TokioExecutor {})
1209				// This is taken care of by notification protocols in non-test environment
1210				// It is very slow in test environment for some reason, hence larger timeout
1211				.with_idle_connection_timeout(Duration::from_secs(10)),
1212		);
1213
1214		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1215
1216		swarm.listen_on(listen_addr.clone()).unwrap();
1217
1218		(swarm, listen_addr)
1219	}
1220
1221	#[tokio::test]
1222	async fn basic_request_response_works() {
1223		let protocol_name = ProtocolName::from("/test/req-resp/1");
1224
1225		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1226		let mut swarms = (0..2)
1227			.map(|_| {
1228				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1229
1230				tokio::spawn(async move {
1231					while let Some(rq) = rx.next().await {
1232						let (fb_tx, fb_rx) = oneshot::channel();
1233						assert_eq!(rq.payload, b"this is a request");
1234						let _ = rq.pending_response.send(super::OutgoingResponse {
1235							result: Ok(b"this is a response".to_vec()),
1236							reputation_changes: Vec::new(),
1237							sent_feedback: Some(fb_tx),
1238						});
1239						fb_rx.await.unwrap();
1240					}
1241				});
1242
1243				let protocol_config = ProtocolConfig {
1244					name: protocol_name.clone(),
1245					fallback_names: Vec::new(),
1246					max_request_size: 1024,
1247					max_response_size: 1024 * 1024,
1248					request_timeout: Duration::from_secs(30),
1249					inbound_queue: Some(tx),
1250				};
1251
1252				build_swarm(iter::once(protocol_config))
1253			})
1254			.collect::<Vec<_>>();
1255
1256		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1257		// this test, so they wouldn't connect to each other.
1258		{
1259			let dial_addr = swarms[1].1.clone();
1260			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1261		}
1262
1263		let (mut swarm, _) = swarms.remove(0);
1264		// Running `swarm[0]` in the background.
1265		tokio::spawn(async move {
1266			loop {
1267				match swarm.select_next_some().await {
1268					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1269						result.unwrap();
1270					},
1271					_ => {},
1272				}
1273			}
1274		});
1275
1276		// Remove and run the remaining swarm.
1277		let (mut swarm, _) = swarms.remove(0);
1278		let mut response_receiver = None;
1279
1280		loop {
1281			match swarm.select_next_some().await {
1282				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1283					let (sender, receiver) = oneshot::channel();
1284					swarm.behaviour_mut().send_request(
1285						&peer_id,
1286						protocol_name.clone(),
1287						b"this is a request".to_vec(),
1288						None,
1289						sender,
1290						IfDisconnected::ImmediateError,
1291					);
1292					assert!(response_receiver.is_none());
1293					response_receiver = Some(receiver);
1294				},
1295				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1296					result.unwrap();
1297					break;
1298				},
1299				_ => {},
1300			}
1301		}
1302
1303		assert_eq!(
1304			response_receiver.unwrap().await.unwrap().unwrap(),
1305			(b"this is a response".to_vec(), protocol_name)
1306		);
1307	}
1308
1309	#[tokio::test]
1310	async fn max_response_size_exceeded() {
1311		let protocol_name = ProtocolName::from("/test/req-resp/1");
1312
1313		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1314		let mut swarms = (0..2)
1315			.map(|_| {
1316				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1317
1318				tokio::spawn(async move {
1319					while let Some(rq) = rx.next().await {
1320						assert_eq!(rq.payload, b"this is a request");
1321						let _ = rq.pending_response.send(super::OutgoingResponse {
1322							result: Ok(b"this response exceeds the limit".to_vec()),
1323							reputation_changes: Vec::new(),
1324							sent_feedback: None,
1325						});
1326					}
1327				});
1328
1329				let protocol_config = ProtocolConfig {
1330					name: protocol_name.clone(),
1331					fallback_names: Vec::new(),
1332					max_request_size: 1024,
1333					max_response_size: 8, // <-- important for the test
1334					request_timeout: Duration::from_secs(30),
1335					inbound_queue: Some(tx),
1336				};
1337
1338				build_swarm(iter::once(protocol_config))
1339			})
1340			.collect::<Vec<_>>();
1341
1342		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1343		// this test, so they wouldn't connect to each other.
1344		{
1345			let dial_addr = swarms[1].1.clone();
1346			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1347		}
1348
1349		// Running `swarm[0]` in the background until a `InboundRequest` event happens,
1350		// which is a hint about the test having ended.
1351		let (mut swarm, _) = swarms.remove(0);
1352		tokio::spawn(async move {
1353			loop {
1354				match swarm.select_next_some().await {
1355					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1356						assert!(result.is_ok());
1357					},
1358					SwarmEvent::ConnectionClosed { .. } => {
1359						break;
1360					},
1361					_ => {},
1362				}
1363			}
1364		});
1365
1366		// Remove and run the remaining swarm.
1367		let (mut swarm, _) = swarms.remove(0);
1368
1369		let mut response_receiver = None;
1370
1371		loop {
1372			match swarm.select_next_some().await {
1373				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1374					let (sender, receiver) = oneshot::channel();
1375					swarm.behaviour_mut().send_request(
1376						&peer_id,
1377						protocol_name.clone(),
1378						b"this is a request".to_vec(),
1379						None,
1380						sender,
1381						IfDisconnected::ImmediateError,
1382					);
1383					assert!(response_receiver.is_none());
1384					response_receiver = Some(receiver);
1385				},
1386				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1387					assert!(result.is_err());
1388					break;
1389				},
1390				_ => {},
1391			}
1392		}
1393
1394		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1395			RequestFailure::Network(OutboundFailure::Io(_)) => {},
1396			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1397		}
1398	}
1399
1400	/// A `RequestId` is a unique identifier among either all inbound or all outbound requests for
1401	/// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across
1402	/// multiple [`RequestResponsesBehaviour`] behaviours. Thus, when handling `RequestId` in the
1403	/// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the
1404	/// protocol name with the `RequestId` to get a unique request identifier.
1405	///
1406	/// This test ensures that two requests on different protocols can be handled concurrently
1407	/// without a `RequestId` collision.
1408	///
1409	/// See [`ProtocolRequestId`] for additional information.
1410	#[tokio::test]
1411	async fn request_id_collision() {
1412		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1413		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1414
1415		let mut swarm_1 = {
1416			let protocol_configs = vec![
1417				ProtocolConfig {
1418					name: protocol_name_1.clone(),
1419					fallback_names: Vec::new(),
1420					max_request_size: 1024,
1421					max_response_size: 1024 * 1024,
1422					request_timeout: Duration::from_secs(30),
1423					inbound_queue: None,
1424				},
1425				ProtocolConfig {
1426					name: protocol_name_2.clone(),
1427					fallback_names: Vec::new(),
1428					max_request_size: 1024,
1429					max_response_size: 1024 * 1024,
1430					request_timeout: Duration::from_secs(30),
1431					inbound_queue: None,
1432				},
1433			];
1434
1435			build_swarm(protocol_configs.into_iter()).0
1436		};
1437
1438		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1439			let (tx_1, rx_1) = async_channel::bounded(64);
1440			let (tx_2, rx_2) = async_channel::bounded(64);
1441
1442			let protocol_configs = vec![
1443				ProtocolConfig {
1444					name: protocol_name_1.clone(),
1445					fallback_names: Vec::new(),
1446					max_request_size: 1024,
1447					max_response_size: 1024 * 1024,
1448					request_timeout: Duration::from_secs(30),
1449					inbound_queue: Some(tx_1),
1450				},
1451				ProtocolConfig {
1452					name: protocol_name_2.clone(),
1453					fallback_names: Vec::new(),
1454					max_request_size: 1024,
1455					max_response_size: 1024 * 1024,
1456					request_timeout: Duration::from_secs(30),
1457					inbound_queue: Some(tx_2),
1458				},
1459			];
1460
1461			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1462
1463			(swarm, rx_1, rx_2, listen_addr)
1464		};
1465
1466		// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
1467		// so they wouldn't connect to each other.
1468		swarm_1.dial(listen_add_2).unwrap();
1469
1470		// Run swarm 2 in the background, receiving two requests.
1471		tokio::spawn(async move {
1472			loop {
1473				match swarm_2.select_next_some().await {
1474					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1475						result.unwrap();
1476					},
1477					_ => {},
1478				}
1479			}
1480		});
1481
1482		// Handle both requests sent by swarm 1 to swarm 2 in the background.
1483		//
1484		// Make sure both requests overlap, by answering the first only after receiving the
1485		// second.
1486		tokio::spawn(async move {
1487			let protocol_1_request = swarm_2_handler_1.next().await;
1488			let protocol_2_request = swarm_2_handler_2.next().await;
1489
1490			protocol_1_request
1491				.unwrap()
1492				.pending_response
1493				.send(OutgoingResponse {
1494					result: Ok(b"this is a response".to_vec()),
1495					reputation_changes: Vec::new(),
1496					sent_feedback: None,
1497				})
1498				.unwrap();
1499			protocol_2_request
1500				.unwrap()
1501				.pending_response
1502				.send(OutgoingResponse {
1503					result: Ok(b"this is a response".to_vec()),
1504					reputation_changes: Vec::new(),
1505					sent_feedback: None,
1506				})
1507				.unwrap();
1508		});
1509
1510		// Have swarm 1 send two requests to swarm 2 and await responses.
1511
1512		let mut response_receivers = None;
1513		let mut num_responses = 0;
1514
1515		loop {
1516			match swarm_1.select_next_some().await {
1517				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1518					let (sender_1, receiver_1) = oneshot::channel();
1519					let (sender_2, receiver_2) = oneshot::channel();
1520					swarm_1.behaviour_mut().send_request(
1521						&peer_id,
1522						protocol_name_1.clone(),
1523						b"this is a request".to_vec(),
1524						None,
1525						sender_1,
1526						IfDisconnected::ImmediateError,
1527					);
1528					swarm_1.behaviour_mut().send_request(
1529						&peer_id,
1530						protocol_name_2.clone(),
1531						b"this is a request".to_vec(),
1532						None,
1533						sender_2,
1534						IfDisconnected::ImmediateError,
1535					);
1536					assert!(response_receivers.is_none());
1537					response_receivers = Some((receiver_1, receiver_2));
1538				},
1539				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1540					num_responses += 1;
1541					result.unwrap();
1542					if num_responses == 2 {
1543						break;
1544					}
1545				},
1546				_ => {},
1547			}
1548		}
1549		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1550		assert_eq!(
1551			response_receiver_1.await.unwrap().unwrap(),
1552			(b"this is a response".to_vec(), protocol_name_1)
1553		);
1554		assert_eq!(
1555			response_receiver_2.await.unwrap().unwrap(),
1556			(b"this is a response".to_vec(), protocol_name_2)
1557		);
1558	}
1559
1560	#[tokio::test]
1561	async fn request_fallback() {
1562		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1563		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1564		let protocol_name_2 = ProtocolName::from("/test/another");
1565
1566		let protocol_config_1 = ProtocolConfig {
1567			name: protocol_name_1.clone(),
1568			fallback_names: Vec::new(),
1569			max_request_size: 1024,
1570			max_response_size: 1024 * 1024,
1571			request_timeout: Duration::from_secs(30),
1572			inbound_queue: None,
1573		};
1574		let protocol_config_1_fallback = ProtocolConfig {
1575			name: protocol_name_1_fallback.clone(),
1576			fallback_names: Vec::new(),
1577			max_request_size: 1024,
1578			max_response_size: 1024 * 1024,
1579			request_timeout: Duration::from_secs(30),
1580			inbound_queue: None,
1581		};
1582		let protocol_config_2 = ProtocolConfig {
1583			name: protocol_name_2.clone(),
1584			fallback_names: Vec::new(),
1585			max_request_size: 1024,
1586			max_response_size: 1024 * 1024,
1587			request_timeout: Duration::from_secs(30),
1588			inbound_queue: None,
1589		};
1590
1591		// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
1592		// It only responds to requests.
1593		let mut older_swarm = {
1594			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1595			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1596			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1597			protocol_config_1_fallback.inbound_queue = Some(tx_1);
1598
1599			let mut protocol_config_2 = protocol_config_2.clone();
1600			protocol_config_2.inbound_queue = Some(tx_2);
1601
1602			tokio::spawn(async move {
1603				for _ in 0..2 {
1604					if let Some(rq) = rx_1.next().await {
1605						let (fb_tx, fb_rx) = oneshot::channel();
1606						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1607						let _ = rq.pending_response.send(super::OutgoingResponse {
1608							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1609							reputation_changes: Vec::new(),
1610							sent_feedback: Some(fb_tx),
1611						});
1612						fb_rx.await.unwrap();
1613					}
1614				}
1615
1616				if let Some(rq) = rx_2.next().await {
1617					let (fb_tx, fb_rx) = oneshot::channel();
1618					assert_eq!(rq.payload, b"request on protocol /test/other");
1619					let _ = rq.pending_response.send(super::OutgoingResponse {
1620						result: Ok(b"this is a response on protocol /test/other".to_vec()),
1621						reputation_changes: Vec::new(),
1622						sent_feedback: Some(fb_tx),
1623					});
1624					fb_rx.await.unwrap();
1625				}
1626			});
1627
1628			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1629		};
1630
1631		// This swarm speaks all protocols.
1632		let mut new_swarm = build_swarm(
1633			vec![
1634				protocol_config_1.clone(),
1635				protocol_config_1_fallback.clone(),
1636				protocol_config_2.clone(),
1637			]
1638			.into_iter(),
1639		);
1640
1641		{
1642			let dial_addr = older_swarm.1.clone();
1643			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1644		}
1645
1646		// Running `older_swarm`` in the background.
1647		tokio::spawn(async move {
1648			loop {
1649				_ = older_swarm.0.select_next_some().await;
1650			}
1651		});
1652
1653		// Run the newer swarm. Attempt to make requests on all protocols.
1654		let (mut swarm, _) = new_swarm;
1655		let mut older_peer_id = None;
1656
1657		let mut response_receiver = None;
1658		// Try the new protocol with a fallback.
1659		loop {
1660			match swarm.select_next_some().await {
1661				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1662					older_peer_id = Some(peer_id);
1663					let (sender, receiver) = oneshot::channel();
1664					swarm.behaviour_mut().send_request(
1665						&peer_id,
1666						protocol_name_1.clone(),
1667						b"request on protocol /test/req-resp/2".to_vec(),
1668						Some((
1669							b"request on protocol /test/req-resp/1".to_vec(),
1670							protocol_config_1_fallback.name.clone(),
1671						)),
1672						sender,
1673						IfDisconnected::ImmediateError,
1674					);
1675					response_receiver = Some(receiver);
1676				},
1677				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1678					result.unwrap();
1679					break;
1680				},
1681				_ => {},
1682			}
1683		}
1684		assert_eq!(
1685			response_receiver.unwrap().await.unwrap().unwrap(),
1686			(
1687				b"this is a response on protocol /test/req-resp/1".to_vec(),
1688				protocol_name_1_fallback.clone()
1689			)
1690		);
1691		// Try the old protocol with a useless fallback.
1692		let (sender, response_receiver) = oneshot::channel();
1693		swarm.behaviour_mut().send_request(
1694			older_peer_id.as_ref().unwrap(),
1695			protocol_name_1_fallback.clone(),
1696			b"request on protocol /test/req-resp/1".to_vec(),
1697			Some((
1698				b"dummy request, will fail if processed".to_vec(),
1699				protocol_config_1_fallback.name.clone(),
1700			)),
1701			sender,
1702			IfDisconnected::ImmediateError,
1703		);
1704		loop {
1705			match swarm.select_next_some().await {
1706				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1707					result.unwrap();
1708					break;
1709				},
1710				_ => {},
1711			}
1712		}
1713		assert_eq!(
1714			response_receiver.await.unwrap().unwrap(),
1715			(
1716				b"this is a response on protocol /test/req-resp/1".to_vec(),
1717				protocol_name_1_fallback.clone()
1718			)
1719		);
1720		// Try the new protocol with no fallback. Should fail.
1721		let (sender, response_receiver) = oneshot::channel();
1722		swarm.behaviour_mut().send_request(
1723			older_peer_id.as_ref().unwrap(),
1724			protocol_name_1.clone(),
1725			b"request on protocol /test/req-resp-2".to_vec(),
1726			None,
1727			sender,
1728			IfDisconnected::ImmediateError,
1729		);
1730		loop {
1731			match swarm.select_next_some().await {
1732				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1733					assert_matches!(
1734						result.unwrap_err(),
1735						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1736					);
1737					break;
1738				},
1739				_ => {},
1740			}
1741		}
1742		assert!(response_receiver.await.unwrap().is_err());
1743		// Try the other protocol with no fallback.
1744		let (sender, response_receiver) = oneshot::channel();
1745		swarm.behaviour_mut().send_request(
1746			older_peer_id.as_ref().unwrap(),
1747			protocol_name_2.clone(),
1748			b"request on protocol /test/other".to_vec(),
1749			None,
1750			sender,
1751			IfDisconnected::ImmediateError,
1752		);
1753		loop {
1754			match swarm.select_next_some().await {
1755				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1756					result.unwrap();
1757					break;
1758				},
1759				_ => {},
1760			}
1761		}
1762		assert_eq!(
1763			response_receiver.await.unwrap().unwrap(),
1764			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1765		);
1766	}
1767
1768	/// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error
1769	/// even if the libp2p component hangs.
1770	///
1771	/// For testing purposes, the communication happens on the `/test/req-resp/1` protocol.
1772	///
1773	/// This is achieved by:
1774	/// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10
1775	///   seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in
1776	///   substrate this is set to 1 second.
1777	///
1778	/// - The first swarm introduces a delay of 2 seconds before responding to the request.
1779	///
1780	/// - The second swarm must enforce the 1 second timeout.
1781	#[tokio::test]
1782	async fn enforce_outbound_timeouts() {
1783		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1784		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1785
1786		// These swarms only speaks protocol_name.
1787		let protocol_name = ProtocolName::from("/test/req-resp/1");
1788
1789		let protocol_config = ProtocolConfig {
1790			name: protocol_name.clone(),
1791			fallback_names: Vec::new(),
1792			max_request_size: 1024,
1793			max_response_size: 1024 * 1024,
1794			request_timeout: REQUEST_TIMEOUT, // <-- important for the test
1795			inbound_queue: None,
1796		};
1797
1798		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1799		let (mut first_swarm, _) = {
1800			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1801
1802			tokio::spawn(async move {
1803				if let Some(rq) = rx.next().await {
1804					assert_eq!(rq.payload, b"this is a request");
1805
1806					// Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than
1807					// `REQUEST_TIMEOUT`.
1808					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1809
1810					// By the time the response is sent back, the second swarm
1811					// received Timeout.
1812					let _ = rq.pending_response.send(super::OutgoingResponse {
1813						result: Ok(b"Second swarm already timedout".to_vec()),
1814						reputation_changes: Vec::new(),
1815						sent_feedback: None,
1816					});
1817				}
1818			});
1819
1820			let mut protocol_config = protocol_config.clone();
1821			protocol_config.inbound_queue = Some(tx);
1822
1823			build_swarm(iter::once(protocol_config))
1824		};
1825
1826		let (mut second_swarm, second_address) = {
1827			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1828
1829			tokio::spawn(async move {
1830				while let Some(rq) = rx.next().await {
1831					let _ = rq.pending_response.send(super::OutgoingResponse {
1832						result: Ok(b"This is the response".to_vec()),
1833						reputation_changes: Vec::new(),
1834						sent_feedback: None,
1835					});
1836				}
1837			});
1838			let mut protocol_config = protocol_config.clone();
1839			protocol_config.inbound_queue = Some(tx);
1840
1841			build_swarm(iter::once(protocol_config.clone()))
1842		};
1843		// Modify the second swarm to have a shorter timeout.
1844		second_swarm
1845			.behaviour_mut()
1846			.protocols
1847			.get_mut(&protocol_name)
1848			.unwrap()
1849			.request_timeout = REQUEST_TIMEOUT_SHORT;
1850
1851		// Ask first swarm to dial the second swarm.
1852		{
1853			Swarm::dial(&mut first_swarm, second_address).unwrap();
1854		}
1855
1856		// Running the first swarm in the background until a `InboundRequest` event happens,
1857		// which is a hint about the test having ended.
1858		tokio::spawn(async move {
1859			loop {
1860				let event = first_swarm.select_next_some().await;
1861				match event {
1862					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1863						assert!(result.is_ok());
1864						break;
1865					},
1866					SwarmEvent::ConnectionClosed { .. } => {
1867						break;
1868					},
1869					_ => {},
1870				}
1871			}
1872		});
1873
1874		// Run the second swarm.
1875		// - on connection established send the request to the first swarm
1876		// - expect to receive a timeout
1877		let mut response_receiver = None;
1878		loop {
1879			let event = second_swarm.select_next_some().await;
1880
1881			match event {
1882				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1883					let (sender, receiver) = oneshot::channel();
1884					second_swarm.behaviour_mut().send_request(
1885						&peer_id,
1886						protocol_name.clone(),
1887						b"this is a request".to_vec(),
1888						None,
1889						sender,
1890						IfDisconnected::ImmediateError,
1891					);
1892					assert!(response_receiver.is_none());
1893					response_receiver = Some(receiver);
1894				},
1895				SwarmEvent::ConnectionClosed { .. } => {
1896					break;
1897				},
1898				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1899					assert!(result.is_err());
1900					break;
1901				},
1902				_ => {},
1903			}
1904		}
1905
1906		// Expect the timeout.
1907		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1908			RequestFailure::Network(OutboundFailure::Timeout) => {},
1909			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1910		}
1911	}
1912}