referrerpolicy=no-referrer-when-downgrade

sc_network/
request_responses.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Collection of request-response protocols.
20//!
21//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or
22//! more so-called "request-response" protocols.
23//!
24//! A request-response protocol works in the following way:
25//!
//! - For every emitted request, a new substream is opened and the protocol is negotiated. If the
//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed
//! by the request itself. The remote then sends the size of the response as a LEB128 number,
//! followed by the response.
30//!
31//! - Requests have a certain time limit before they time out. This time includes the time it
32//! takes to send/receive the request and response.
33//!
34//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
35//! is used to handle incoming requests.
36
37use crate::{
38	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39	service::traits::RequestResponseConfig as RequestResponseConfigT,
40	types::ProtocolName,
41	ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46	core::{transport::PortUse, Endpoint, Multiaddr},
47	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48	swarm::{
49		behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51	},
52	PeerId,
53};
54
55use std::{
56	collections::{hash_map::Entry, HashMap},
57	io, iter,
58	ops::Deref,
59	pin::Pin,
60	sync::Arc,
61	task::{Context, Poll},
62	time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
/// Logging target used by the log statements of this file.
const LOG_TARGET: &str = "sub-libp2p::request-response";

/// Interval between two sweeps over the pending outbound requests, during which requests that
/// have been running for longer than their protocol's `request_timeout` are failed with a timeout.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
/// Possible failures occurring in the context of sending an outbound request and receiving the
/// response.
///
/// Local counterpart of libp2p's `request_response::OutboundFailure`, from which it can be
/// built through `From`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum OutboundFailure {
	/// The request could not be sent because a dialing attempt failed.
	#[error("Failed to dial the requested peer")]
	DialFailure,
	/// The request timed out before a response was received.
	#[error("Timeout while waiting for a response")]
	Timeout,
	/// The connection closed before a response was received.
	#[error("Connection was closed before a response was received")]
	ConnectionClosed,
	/// The remote supports none of the requested protocols.
	#[error("The remote supports none of the requested protocols")]
	UnsupportedProtocols,
	/// An IO failure happened on an outbound stream.
	///
	/// The error is kept behind an [`Arc`] so that this enum can derive `Clone`
	/// (`io::Error` itself is not `Clone`).
	#[error("An IO failure happened on an outbound stream")]
	Io(Arc<io::Error>),
}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95	fn from(out: request_response::OutboundFailure) -> Self {
96		match out {
97			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99			request_response::OutboundFailure::ConnectionClosed =>
100				OutboundFailure::ConnectionClosed,
101			request_response::OutboundFailure::UnsupportedProtocols =>
102				OutboundFailure::UnsupportedProtocols,
103			request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
104		}
105	}
106}
107
/// Possible failures occurring in the context of receiving an inbound request and sending a
/// response.
///
/// Local counterpart of libp2p's `request_response::InboundFailure`, from which it can be
/// built through `From`.
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
	/// The inbound request timed out, either while reading the incoming request or before a
	/// response is sent.
	#[error("Timeout while receiving request or sending response")]
	Timeout,
	/// The connection closed before a response could be sent.
	#[error("Connection was closed before a response could be sent")]
	ConnectionClosed,
	/// The local peer supports none of the protocols requested by the remote.
	#[error("The local peer supports none of the protocols requested by the remote")]
	UnsupportedProtocols,
	/// The local peer failed to respond to an inbound request: the response channel was dropped
	/// without a response being sent.
	#[error("The response channel was dropped without sending a response to the remote")]
	ResponseOmission,
	/// An IO failure happened on an inbound stream.
	#[error("An IO failure happened on an inbound stream")]
	Io(Arc<io::Error>),
}
129
130impl From<request_response::InboundFailure> for InboundFailure {
131	fn from(out: request_response::InboundFailure) -> Self {
132		match out {
133			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
134			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
135			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
136			request_response::InboundFailure::UnsupportedProtocols =>
137				InboundFailure::UnsupportedProtocols,
138			request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
139		}
140	}
141}
142
/// Error in a request.
///
/// This is the error type delivered on the response channel passed to
/// [`RequestResponsesBehaviour::send_request`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
	/// There is no connection to the target peer and the caller asked not to dial it
	/// (see [`IfDisconnected::ImmediateError`]).
	#[error("We are not currently connected to the requested peer.")]
	NotConnected,
	/// The protocol name of the request does not match any registered protocol.
	#[error("Given protocol hasn't been registered.")]
	UnknownProtocol,
	/// The remote answered without a payload, i.e. it refused to answer the request.
	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
	Refused,
	/// A response arrived but the receiving side of the response channel was already dropped.
	#[error("The remote replied, but the local node is no longer interested in the response.")]
	Obsolete,
	/// The request failed at the networking level; see [`OutboundFailure`] for the reason.
	#[error("Problem on the network: {0}")]
	Network(OutboundFailure),
}
158
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
	pub name: ProtocolName,

	/// Fallback on the wire protocol names to support.
	pub fallback_names: Vec<ProtocolName>,

	/// Maximum allowed size, in bytes, of a request.
	///
	/// Any request larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_request_size: u64,

	/// Maximum allowed size, in bytes, of a response.
	///
	/// Any response larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_response_size: u64,

	/// Duration after which emitted requests are considered timed out.
	///
	/// If you expect the response to come back quickly, you should set this to a smaller duration.
	pub request_timeout: Duration,

	/// Channel on which the networking service will send incoming requests.
	///
	/// Every time a peer sends a request to the local node using this protocol, the networking
	/// service will push an element on this channel. The receiving side of this channel then has
	/// to pull this element, process the request, and send back the response destined to the
	/// peer.
	///
	/// The size of the channel has to be carefully chosen. If the channel is full, the networking
	/// service will discard the incoming request and send back an error to the peer.
	/// Consequently, the channel being full is an indicator that the node is overloaded.
	///
	/// You can typically set the size of the channel to `T / d`, where `T` is the
	/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
	/// build a response.
	///
	/// Can be `None` if the local node does not support answering incoming requests.
	/// If this is `None`, then the local node will not advertise support for this protocol towards
	/// other peers. If this is `Some` but the channel is closed, then the local node will
	/// advertise support for this protocol, but any incoming request will lead to an error being
	/// sent back.
	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
207
/// Exposes the protocol name of a [`ProtocolConfig`] through the generic configuration trait.
impl RequestResponseConfigT for ProtocolConfig {
	/// Returns the main on-the-wire name ([`ProtocolConfig::name`]); fallback names are not
	/// reported here.
	fn protocol_name(&self) -> &ProtocolName {
		&self.name
	}
}
213
/// A single request received from a peer on a request-response protocol.
///
/// Values of this type are pushed on [`ProtocolConfig::inbound_queue`] for the protocol handler
/// to process.
#[derive(Debug)]
pub struct IncomingRequest {
	/// Who sent the request.
	pub peer: sc_network_types::PeerId,

	/// Request sent by the remote. Will always be smaller than
	/// [`ProtocolConfig::max_request_size`].
	pub payload: Vec<u8>,

	/// Channel to send back the response.
	///
	/// There are two ways to indicate that handling the request failed:
	///
	/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
	///
	/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
	/// the given peer.
	pub pending_response: oneshot::Sender<OutgoingResponse>,
}
234
/// Response for an incoming request to be sent by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
	/// The payload of the response.
	///
	/// `Err(())` if none is available e.g. due to an error while handling the request.
	pub result: Result<Vec<u8>, ()>,

	/// Reputation changes accrued while handling the request. To be applied to the reputation of
	/// the peer sending the request.
	pub reputation_changes: Vec<ReputationChange>,

	/// If provided, the `oneshot::Sender` will be notified when the response has been sent to the
	/// peer.
	///
	/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
	/// >			outgoing data for each TCP socket, and it is not possible for a user
	/// >			application to inspect this buffer. This channel here is not actually notified
	/// >			when the response has been fully sent out, but rather when it has fully been
	/// >			written to the buffer managed by the operating system.
	pub sent_feedback: Option<oneshot::Sender<()>>,
}
257
/// Information stored about a pending (outbound) request, from the moment it is handed to
/// libp2p until a response, a failure or a forced timeout is observed.
struct PendingRequest {
	/// The time when the request was sent to the libp2p request-response protocol.
	///
	/// Used both to compute the request duration and to force-detect timeouts.
	started_at: Instant,
	/// The channel to send the response back to the caller.
	///
	/// This is wrapped in an `Option` to allow for the channel to be taken out
	/// on force-detected timeouts.
	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
	/// Fallback request (payload and protocol name) to send if the primary request fails
	/// because the remote does not support the protocol.
	fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
270
/// Policy applied when a request is sent to a recipient that is currently disconnected.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
	/// Try to connect to the peer.
	TryConnect,
	/// Just fail if the destination is not yet connected.
	ImmediateError,
}

/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
	/// Returns `true` if a connection attempt should be made towards a disconnected peer.
	pub fn should_connect(self) -> bool {
		matches!(self, Self::TryConnect)
	}
}
290
/// Event generated by the [`RequestResponsesBehaviour`].
#[derive(Debug)]
pub enum Event {
	/// A remote sent a request and either we have successfully answered it or an error happened.
	///
	/// This event is generated for statistics purposes.
	InboundRequest {
		/// Peer which has emitted the request.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Whether handling the request was successful or unsuccessful.
		///
		/// When successful contains the time elapsed between when we received the request and when
		/// we sent back the response. When unsuccessful contains the failure reason.
		result: Result<Duration, ResponseFailure>,
	},

	/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
	/// failed.
	///
	/// This event is generated for statistics purposes.
	RequestFinished {
		/// Peer that we sent a request to.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Duration the request took.
		duration: Duration,
		/// Result of the request.
		result: Result<(), RequestFailure>,
	},

	/// A request protocol handler issued reputation changes for the given peer.
	ReputationChanges {
		/// Peer whose reputation needs to be adjusted.
		peer: PeerId,
		/// Reputation changes.
		changes: Vec<ReputationChange>,
	},
}
332
/// Combination of a protocol name and a request id.
///
/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
/// [`ProtocolRequestId`]s.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId<RequestId> {
	/// Name of the protocol the request belongs to.
	protocol: ProtocolName,
	/// Identifier of the request (inbound or outbound, depending on `RequestId`).
	request_id: RequestId,
}
344
345impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
346	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
347		Self { protocol, request_id }
348	}
349}
350
/// Details of a request-response protocol.
struct ProtocolDetails {
	/// The libp2p request-response behaviour instantiated for this protocol.
	behaviour: Behaviour<GenericCodec>,
	/// Channel on which incoming requests are forwarded to the protocol handler, if the local
	/// node answers requests for this protocol.
	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	/// Timeout configured for this protocol; also used by the periodic check that force-detects
	/// timed-out outbound requests.
	request_timeout: Duration,
}
357
/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
pub struct RequestResponsesBehaviour {
	/// The multiple sub-protocols, by name.
	///
	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
	/// "response builder" used to build responses for incoming requests.
	protocols: HashMap<ProtocolName, ProtocolDetails>,

	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
	pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,

	/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
	/// response to send back to the remote, or `None` if no response was produced.
	pending_responses: stream::FuturesUnordered<
		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
	>,

	/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
	pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,

	/// Whenever a response is received on `pending_responses`, insert a channel to be notified
	/// when the response has been sent out.
	send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,

	/// Primarily used to get a reputation of a node.
	peer_store: Arc<dyn PeerStoreProvider>,

	/// Interval to check that the requests are not taking too long.
	///
	/// We had issues in the past where libp2p did not produce a timeout event in due time.
	///
	/// For more details, see:
	/// - <https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096>
	periodic_request_check: tokio::time::Interval,
}
393
/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
	/// Peer that sent the request.
	peer: PeerId,
	/// Identifier of the inbound request being answered.
	request_id: InboundRequestId,
	/// Name of the protocol the request was received on.
	protocol: ProtocolName,
	/// libp2p channel through which the response is handed back to the request-response
	/// [`Behaviour`].
	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
	/// Response produced by the protocol handler.
	response: OutgoingResponse,
}
402
403impl RequestResponsesBehaviour {
404	/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
405	/// the same protocol is passed twice.
406	pub fn new(
407		list: impl Iterator<Item = ProtocolConfig>,
408		peer_store: Arc<dyn PeerStoreProvider>,
409	) -> Result<Self, RegisterError> {
410		let mut protocols = HashMap::new();
411		for protocol in list {
412			let cfg = Config::default().with_request_timeout(protocol.request_timeout);
413
414			let protocol_support = if protocol.inbound_queue.is_some() {
415				ProtocolSupport::Full
416			} else {
417				ProtocolSupport::Outbound
418			};
419
420			let behaviour = Behaviour::with_codec(
421				GenericCodec {
422					max_request_size: protocol.max_request_size,
423					max_response_size: protocol.max_response_size,
424				},
425				iter::once(protocol.name.clone())
426					.chain(protocol.fallback_names)
427					.zip(iter::repeat(protocol_support)),
428				cfg,
429			);
430
431			match protocols.entry(protocol.name) {
432				Entry::Vacant(e) => e.insert(ProtocolDetails {
433					behaviour,
434					inbound_queue: protocol.inbound_queue,
435					request_timeout: protocol.request_timeout,
436				}),
437				Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
438			};
439		}
440
441		Ok(Self {
442			protocols,
443			pending_requests: Default::default(),
444			pending_responses: Default::default(),
445			pending_responses_arrival_time: Default::default(),
446			send_feedback: Default::default(),
447			peer_store,
448			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
449		})
450	}
451
452	/// Initiates sending a request.
453	///
454	/// If there is no established connection to the target peer, the behavior is determined by the
455	/// choice of `connect`.
456	///
457	/// An error is returned if the protocol doesn't match one that has been registered.
458	pub fn send_request(
459		&mut self,
460		target: &PeerId,
461		protocol_name: ProtocolName,
462		request: Vec<u8>,
463		fallback_request: Option<(Vec<u8>, ProtocolName)>,
464		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
465		connect: IfDisconnected,
466	) {
467		log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
468
469		if let Some(ProtocolDetails { behaviour, .. }) =
470			self.protocols.get_mut(protocol_name.deref())
471		{
472			Self::send_request_inner(
473				behaviour,
474				&mut self.pending_requests,
475				target,
476				protocol_name,
477				request,
478				fallback_request,
479				pending_response,
480				connect,
481			)
482		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
483			log::debug!(
484				target: LOG_TARGET,
485				"Unknown protocol {:?}. At the same time local \
486				 node is no longer interested in the result.",
487				protocol_name,
488			);
489		}
490	}
491
492	fn send_request_inner(
493		behaviour: &mut Behaviour<GenericCodec>,
494		pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
495		target: &PeerId,
496		protocol_name: ProtocolName,
497		request: Vec<u8>,
498		fallback_request: Option<(Vec<u8>, ProtocolName)>,
499		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
500		connect: IfDisconnected,
501	) {
502		if behaviour.is_connected(target) || connect.should_connect() {
503			let request_id = behaviour.send_request(target, request);
504			let prev_req_id = pending_requests.insert(
505				(protocol_name.to_string().into(), request_id).into(),
506				PendingRequest {
507					started_at: Instant::now(),
508					response_tx: Some(pending_response),
509					fallback_request,
510				},
511			);
512			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
513		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
514			log::debug!(
515				target: LOG_TARGET,
516				"Not connected to peer {:?}. At the same time local \
517				 node is no longer interested in the result.",
518				target,
519			);
520		}
521	}
522}
523
524impl NetworkBehaviour for RequestResponsesBehaviour {
525	type ConnectionHandler =
526		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
527	type ToSwarm = Event;
528
	/// Accepts every pending inbound connection: this implementation performs no check and
	/// never returns a [`ConnectionDenied`].
	fn handle_pending_inbound_connection(
		&mut self,
		_connection_id: ConnectionId,
		_local_addr: &Multiaddr,
		_remote_addr: &Multiaddr,
	) -> Result<(), ConnectionDenied> {
		Ok(())
	}
537
	/// Accepts every pending outbound connection and contributes no additional addresses to
	/// dial (an empty list is always returned).
	fn handle_pending_outbound_connection(
		&mut self,
		_connection_id: ConnectionId,
		_maybe_peer: Option<PeerId>,
		_addresses: &[Multiaddr],
		_effective_role: Endpoint,
	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
		Ok(Vec::new())
	}
547
548	fn handle_established_inbound_connection(
549		&mut self,
550		connection_id: ConnectionId,
551		peer: PeerId,
552		local_addr: &Multiaddr,
553		remote_addr: &Multiaddr,
554	) -> Result<THandler<Self>, ConnectionDenied> {
555		let iter =
556			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
557				if let Ok(handler) = behaviour.handle_established_inbound_connection(
558					connection_id,
559					peer,
560					local_addr,
561					remote_addr,
562				) {
563					Some((p.to_string(), handler))
564				} else {
565					None
566				}
567			});
568
569		Ok(MultiHandler::try_from_iter(iter).expect(
570			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
571			 which is the only possible error; qed",
572		))
573	}
574
575	fn handle_established_outbound_connection(
576		&mut self,
577		connection_id: ConnectionId,
578		peer: PeerId,
579		addr: &Multiaddr,
580		role_override: Endpoint,
581		port_use: PortUse,
582	) -> Result<THandler<Self>, ConnectionDenied> {
583		let iter =
584			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
585				if let Ok(handler) = behaviour.handle_established_outbound_connection(
586					connection_id,
587					peer,
588					addr,
589					role_override,
590					port_use,
591				) {
592					Some((p.to_string(), handler))
593				} else {
594					None
595				}
596			});
597
598		Ok(MultiHandler::try_from_iter(iter).expect(
599			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
600			 which is the only possible error; qed",
601		))
602	}
603
604	fn on_swarm_event(&mut self, event: FromSwarm) {
605		for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
606			behaviour.on_swarm_event(event);
607		}
608	}
609
610	fn on_connection_handler_event(
611		&mut self,
612		peer_id: PeerId,
613		connection_id: ConnectionId,
614		event: THandlerOutEvent<Self>,
615	) {
616		let p_name = event.0;
617		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
618			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1)
619		} else {
620			log::warn!(
621				target: LOG_TARGET,
622				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
623				p_name
624			);
625		}
626	}
627
628	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
629		'poll_all: loop {
630			// Poll the periodic request check.
631			if self.periodic_request_check.poll_tick(cx).is_ready() {
632				self.pending_requests.retain(|id, req| {
633					let Some(ProtocolDetails { request_timeout, .. }) =
634						self.protocols.get(&id.protocol)
635					else {
636						log::warn!(
637							target: LOG_TARGET,
638							"Request {id:?} has no protocol registered.",
639						);
640
641						if let Some(response_tx) = req.response_tx.take() {
642							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
643								log::debug!(
644									target: LOG_TARGET,
645									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
646								);
647							}
648						}
649						return false
650					};
651
652					let elapsed = req.started_at.elapsed();
653					if elapsed > *request_timeout {
654						log::debug!(
655							target: LOG_TARGET,
656							"Request {id:?} force detected as timeout.",
657						);
658
659						if let Some(response_tx) = req.response_tx.take() {
660							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
661								log::debug!(
662									target: LOG_TARGET,
663									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
664								);
665							}
666						}
667
668						false
669					} else {
670						true
671					}
672				});
673			}
674
675			// Poll to see if any response is ready to be sent back.
676			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
677				let RequestProcessingOutcome {
678					peer,
679					request_id,
680					protocol: protocol_name,
681					inner_channel,
682					response: OutgoingResponse { result, reputation_changes, sent_feedback },
683				} = match outcome {
684					Some(outcome) => outcome,
685					// The response builder was too busy or handling the request failed. This is
686					// later on reported as a `InboundFailure::Omission`.
687					None => continue,
688				};
689
690				if let Ok(payload) = result {
691					if let Some(ProtocolDetails { behaviour, .. }) =
692						self.protocols.get_mut(&*protocol_name)
693					{
694						log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
695
696						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
697							// Note: Failure is handled further below when receiving
698							// `InboundFailure` event from request-response [`Behaviour`].
699							log::debug!(
700								target: LOG_TARGET,
701								"Failed to send response for {:?} on protocol {:?} due to a \
702								 timeout or due to the connection to the peer being closed. \
703								 Dropping response",
704								request_id, protocol_name,
705							);
706						} else if let Some(sent_feedback) = sent_feedback {
707							self.send_feedback
708								.insert((protocol_name, request_id).into(), sent_feedback);
709						}
710					}
711				}
712
713				if !reputation_changes.is_empty() {
714					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
715						peer,
716						changes: reputation_changes,
717					}))
718				}
719			}
720
721			let mut fallback_requests = vec![];
722
723			// Poll request-responses protocols.
724			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
725			{
726				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
727					let ev = match ev {
728						// Main events we are interested in.
729						ToSwarm::GenerateEvent(ev) => ev,
730
731						// Other events generated by the underlying behaviour are transparently
732						// passed through.
733						ToSwarm::Dial { opts } => {
734							if opts.get_peer_id().is_none() {
735								log::error!(
736									target: LOG_TARGET,
737									"The request-response isn't supposed to start dialing addresses"
738								);
739							}
740							return Poll::Ready(ToSwarm::Dial { opts })
741						},
742						event => {
743							return Poll::Ready(
744								event.map_in(|event| ((*protocol).to_string(), event)).map_out(
745									|_| {
746										unreachable!(
747											"`GenerateEvent` is handled in a branch above; qed"
748										)
749									},
750								),
751							);
752						},
753					};
754
755					match ev {
756						// Received a request from a remote.
757						request_response::Event::Message {
758							peer,
759							message: Message::Request { request_id, request, channel, .. },
760						} => {
761							self.pending_responses_arrival_time
762								.insert((protocol.clone(), request_id).into(), Instant::now());
763
764							let reputation = self.peer_store.peer_reputation(&peer.into());
765
766							if reputation < BANNED_THRESHOLD {
767								log::debug!(
768									target: LOG_TARGET,
769									"Cannot handle requests from a node with a low reputation {}: {}",
770									peer,
771									reputation,
772								);
773								continue 'poll_protocol
774							}
775
776							let (tx, rx) = oneshot::channel();
777
778							// Submit the request to the "response builder" passed by the user at
779							// initialization.
780							if let Some(resp_builder) = inbound_queue {
781								// If the response builder is too busy, silently drop `tx`. This
782								// will be reported by the corresponding request-response
783								// [`Behaviour`] through an `InboundFailure::Omission` event.
784								// Note that we use `async_channel::bounded` and not `mpsc::channel`
785								// because the latter allocates an extra slot for every cloned
786								// sender.
787								let _ = resp_builder.try_send(IncomingRequest {
788									peer: peer.into(),
789									payload: request,
790									pending_response: tx,
791								});
792							} else {
793								debug_assert!(false, "Received message on outbound-only protocol.");
794							}
795
796							let protocol = protocol.clone();
797
798							self.pending_responses.push(Box::pin(async move {
799								// The `tx` created above can be dropped if we are not capable of
800								// processing this request, which is reflected as a
801								// `InboundFailure::Omission` event.
802								rx.await.map_or(None, |response| {
803									Some(RequestProcessingOutcome {
804										peer,
805										request_id,
806										protocol,
807										inner_channel: channel,
808										response,
809									})
810								})
811							}));
812
813							// This `continue` makes sure that `pending_responses` gets polled
814							// after we have added the new element.
815							continue 'poll_all
816						},
817
818						// Received a response from a remote to one of our requests.
819						request_response::Event::Message {
820							peer,
821							message: Message::Response { request_id, response },
822							..
823						} => {
824							let (started, delivered) = match self
825								.pending_requests
826								.remove(&(protocol.clone(), request_id).into())
827							{
828								Some(PendingRequest {
829									started_at,
830									response_tx: Some(response_tx),
831									..
832								}) => {
833									log::trace!(
834										target: LOG_TARGET,
835										"received response from {peer} ({protocol:?}), {} bytes",
836										response.as_ref().map_or(0usize, |response| response.len()),
837									);
838
839									let delivered = response_tx
840										.send(
841											response
842												.map_err(|()| RequestFailure::Refused)
843												.map(|resp| (resp, protocol.clone())),
844										)
845										.map_err(|_| RequestFailure::Obsolete);
846									(started_at, delivered)
847								},
848								_ => {
849									log::debug!(
850										target: LOG_TARGET,
851										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
852										request_id,
853										peer,
854									);
855									continue
856								},
857							};
858
859							let out = Event::RequestFinished {
860								peer,
861								protocol: protocol.clone(),
862								duration: started.elapsed(),
863								result: delivered,
864							};
865
866							return Poll::Ready(ToSwarm::GenerateEvent(out))
867						},
868
869						// One of our requests has failed.
870						request_response::Event::OutboundFailure {
871							peer,
872							request_id,
873							error,
874							..
875						} => {
876							let error = OutboundFailure::from(error);
877							let started = match self
878								.pending_requests
879								.remove(&(protocol.clone(), request_id).into())
880							{
881								Some(PendingRequest {
882									started_at,
883									response_tx: Some(response_tx),
884									fallback_request,
885								}) => {
886									// Try using the fallback request if the protocol was not
887									// supported.
888									if matches!(error, OutboundFailure::UnsupportedProtocols) {
889										if let Some((fallback_request, fallback_protocol)) =
890											fallback_request
891										{
892											log::trace!(
893												target: LOG_TARGET,
894												"Request with id {:?} failed. Trying the fallback protocol. {}",
895												request_id,
896												fallback_protocol.deref()
897											);
898											fallback_requests.push((
899												peer,
900												fallback_protocol,
901												fallback_request,
902												response_tx,
903											));
904											continue
905										}
906									}
907
908									if response_tx
909										.send(Err(RequestFailure::Network(error.clone())))
910										.is_err()
911									{
912										log::debug!(
913											target: LOG_TARGET,
914											"Request with id {:?} failed. At the same time local \
915											 node is no longer interested in the result.",
916											request_id,
917										);
918									}
919									started_at
920								},
921								_ => {
922									log::debug!(
923										target: LOG_TARGET,
924										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
925										request_id,
926										error,
927										peer
928									);
929									continue
930								},
931							};
932
933							let out = Event::RequestFinished {
934								peer,
935								protocol: protocol.clone(),
936								duration: started.elapsed(),
937								result: Err(RequestFailure::Network(error)),
938							};
939
940							return Poll::Ready(ToSwarm::GenerateEvent(out))
941						},
942
943						// An inbound request failed, either while reading the request or due to
944						// failing to send a response.
945						request_response::Event::InboundFailure {
946							request_id, peer, error, ..
947						} => {
948							self.pending_responses_arrival_time
949								.remove(&(protocol.clone(), request_id).into());
950							self.send_feedback.remove(&(protocol.clone(), request_id).into());
951							let out = Event::InboundRequest {
952								peer,
953								protocol: protocol.clone(),
954								result: Err(ResponseFailure::Network(error.into())),
955							};
956							return Poll::Ready(ToSwarm::GenerateEvent(out))
957						},
958
959						// A response to an inbound request has been sent.
960						request_response::Event::ResponseSent { request_id, peer } => {
961							let arrival_time = self
962								.pending_responses_arrival_time
963								.remove(&(protocol.clone(), request_id).into())
964								.map(|t| t.elapsed())
965								.expect(
966									"Time is added for each inbound request on arrival and only \
967									 removed on success (`ResponseSent`) or failure \
968									 (`InboundFailure`). One can not receive a success event for a \
969									 request that either never arrived, or that has previously \
970									 failed; qed.",
971								);
972
973							if let Some(send_feedback) =
974								self.send_feedback.remove(&(protocol.clone(), request_id).into())
975							{
976								let _ = send_feedback.send(());
977							}
978
979							let out = Event::InboundRequest {
980								peer,
981								protocol: protocol.clone(),
982								result: Ok(arrival_time),
983							};
984
985							return Poll::Ready(ToSwarm::GenerateEvent(out))
986						},
987					};
988				}
989			}
990
991			// Send out fallback requests.
992			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
993				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
994					Self::send_request_inner(
995						behaviour,
996						&mut self.pending_requests,
997						&peer,
998						protocol,
999						request,
1000						None,
1001						pending_response,
1002						// We can error if not connected because the
1003						// previous attempt would have tried to establish a
1004						// connection already or errored and we wouldn't have gotten here.
1005						IfDisconnected::ImmediateError,
1006					);
1007				}
1008			}
1009
1010			break Poll::Pending
1011		}
1012	}
1013}
1014
/// Error when registering a protocol.
///
/// The `Display` text of each variant is the format string of its `#[error(...)]`
/// attribute.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
	/// A protocol has been specified multiple times.
	///
	/// Holds a [`ProtocolName`], presumably that of the duplicated protocol. Note that the
	/// `Display` output consists of that name alone, without any explanatory text.
	#[error("{0}")]
	DuplicateProtocol(ProtocolName),
}
1022
/// Error when processing a request sent by a remote.
///
/// Reported through the `result` field of `Event::InboundRequest`.
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
	/// Problem on the network.
	///
	/// Wraps the [`InboundFailure`] describing what went wrong; its `Display` output is
	/// appended to the message of this variant.
	#[error("Problem on the network: {0}")]
	Network(InboundFailure),
}
1030
/// Implements the libp2p [`Codec`] trait. Defines how streams of bytes are turned
/// into requests and responses and vice-versa.
///
/// Requests and responses are opaque byte vectors. On the wire each of them is a varint
/// length prefix followed by the payload; the limits below are only enforced when reading.
#[derive(Debug, Clone)]
#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
pub struct GenericCodec {
	/// Largest request payload, in bytes, that `read_request` accepts.
	max_request_size: u64,
	/// Largest response payload, in bytes, that `read_response` accepts.
	max_response_size: u64,
}
1039
1040#[async_trait::async_trait]
1041impl Codec for GenericCodec {
1042	type Protocol = ProtocolName;
1043	type Request = Vec<u8>;
1044	type Response = Result<Vec<u8>, ()>;
1045
1046	async fn read_request<T>(
1047		&mut self,
1048		_: &Self::Protocol,
1049		mut io: &mut T,
1050	) -> io::Result<Self::Request>
1051	where
1052		T: AsyncRead + Unpin + Send,
1053	{
1054		// Read the length.
1055		let length = unsigned_varint::aio::read_usize(&mut io)
1056			.await
1057			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1058		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1059			return Err(io::Error::new(
1060				io::ErrorKind::InvalidInput,
1061				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1062			))
1063		}
1064
1065		// Read the payload.
1066		let mut buffer = vec![0; length];
1067		io.read_exact(&mut buffer).await?;
1068		Ok(buffer)
1069	}
1070
1071	async fn read_response<T>(
1072		&mut self,
1073		_: &Self::Protocol,
1074		mut io: &mut T,
1075	) -> io::Result<Self::Response>
1076	where
1077		T: AsyncRead + Unpin + Send,
1078	{
1079		// Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1080		// considered as a protocol error and will result in the entire connection being closed.
1081		// Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1082		// that this response is an error.
1083
1084		// Read the length.
1085		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1086			Ok(l) => l,
1087			Err(unsigned_varint::io::ReadError::Io(err))
1088				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1089				return Ok(Err(())),
1090			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1091		};
1092
1093		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1094			return Err(io::Error::new(
1095				io::ErrorKind::InvalidInput,
1096				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1097			))
1098		}
1099
1100		// Read the payload.
1101		let mut buffer = vec![0; length];
1102		io.read_exact(&mut buffer).await?;
1103		Ok(Ok(buffer))
1104	}
1105
1106	async fn write_request<T>(
1107		&mut self,
1108		_: &Self::Protocol,
1109		io: &mut T,
1110		req: Self::Request,
1111	) -> io::Result<()>
1112	where
1113		T: AsyncWrite + Unpin + Send,
1114	{
1115		// TODO: check the length?
1116		// Write the length.
1117		{
1118			let mut buffer = unsigned_varint::encode::usize_buffer();
1119			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1120		}
1121
1122		// Write the payload.
1123		io.write_all(&req).await?;
1124
1125		io.close().await?;
1126		Ok(())
1127	}
1128
1129	async fn write_response<T>(
1130		&mut self,
1131		_: &Self::Protocol,
1132		io: &mut T,
1133		res: Self::Response,
1134	) -> io::Result<()>
1135	where
1136		T: AsyncWrite + Unpin + Send,
1137	{
1138		// If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1139		if let Ok(res) = res {
1140			// TODO: check the length?
1141			// Write the length.
1142			{
1143				let mut buffer = unsigned_varint::encode::usize_buffer();
1144				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1145			}
1146
1147			// Write the payload.
1148			io.write_all(&res).await?;
1149		}
1150
1151		io.close().await?;
1152		Ok(())
1153	}
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158	use super::*;
1159
1160	use crate::mock::MockPeerStore;
1161	use assert_matches::assert_matches;
1162	use futures::channel::oneshot;
1163	use libp2p::{
1164		core::{
1165			transport::{MemoryTransport, Transport},
1166			upgrade,
1167		},
1168		identity::Keypair,
1169		noise,
1170		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1171		Multiaddr,
1172	};
1173	use std::{iter, time::Duration};
1174
	/// [`Executor`] handed to the test swarms; it forwards every future it is given to
	/// `tokio::spawn`.
	struct TokioExecutor;
	impl Executor for TokioExecutor {
		// Spawns `f` as a tokio task. The join handle returned by `tokio::spawn` is
		// discarded, so the task is never awaited from here.
		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
			tokio::spawn(f);
		}
	}
1181
1182	fn build_swarm(
1183		list: impl Iterator<Item = ProtocolConfig>,
1184	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1185		let keypair = Keypair::generate_ed25519();
1186
1187		let transport = MemoryTransport::new()
1188			.upgrade(upgrade::Version::V1)
1189			.authenticate(noise::Config::new(&keypair).unwrap())
1190			.multiplex(libp2p::yamux::Config::default())
1191			.boxed();
1192
1193		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1194
1195		let mut swarm = Swarm::new(
1196			transport,
1197			behaviour,
1198			keypair.public().to_peer_id(),
1199			SwarmConfig::with_executor(TokioExecutor {})
1200				// This is taken care of by notification protocols in non-test environment
1201				// It is very slow in test environment for some reason, hence larger timeout
1202				.with_idle_connection_timeout(Duration::from_secs(10)),
1203		);
1204
1205		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1206
1207		swarm.listen_on(listen_addr.clone()).unwrap();
1208
1209		(swarm, listen_addr)
1210	}
1211
1212	#[tokio::test]
1213	async fn basic_request_response_works() {
1214		let protocol_name = ProtocolName::from("/test/req-resp/1");
1215
1216		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1217		let mut swarms = (0..2)
1218			.map(|_| {
1219				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1220
1221				tokio::spawn(async move {
1222					while let Some(rq) = rx.next().await {
1223						let (fb_tx, fb_rx) = oneshot::channel();
1224						assert_eq!(rq.payload, b"this is a request");
1225						let _ = rq.pending_response.send(super::OutgoingResponse {
1226							result: Ok(b"this is a response".to_vec()),
1227							reputation_changes: Vec::new(),
1228							sent_feedback: Some(fb_tx),
1229						});
1230						fb_rx.await.unwrap();
1231					}
1232				});
1233
1234				let protocol_config = ProtocolConfig {
1235					name: protocol_name.clone(),
1236					fallback_names: Vec::new(),
1237					max_request_size: 1024,
1238					max_response_size: 1024 * 1024,
1239					request_timeout: Duration::from_secs(30),
1240					inbound_queue: Some(tx),
1241				};
1242
1243				build_swarm(iter::once(protocol_config))
1244			})
1245			.collect::<Vec<_>>();
1246
1247		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1248		// this test, so they wouldn't connect to each other.
1249		{
1250			let dial_addr = swarms[1].1.clone();
1251			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1252		}
1253
1254		let (mut swarm, _) = swarms.remove(0);
1255		// Running `swarm[0]` in the background.
1256		tokio::spawn(async move {
1257			loop {
1258				match swarm.select_next_some().await {
1259					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1260						result.unwrap();
1261					},
1262					_ => {},
1263				}
1264			}
1265		});
1266
1267		// Remove and run the remaining swarm.
1268		let (mut swarm, _) = swarms.remove(0);
1269		let mut response_receiver = None;
1270
1271		loop {
1272			match swarm.select_next_some().await {
1273				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1274					let (sender, receiver) = oneshot::channel();
1275					swarm.behaviour_mut().send_request(
1276						&peer_id,
1277						protocol_name.clone(),
1278						b"this is a request".to_vec(),
1279						None,
1280						sender,
1281						IfDisconnected::ImmediateError,
1282					);
1283					assert!(response_receiver.is_none());
1284					response_receiver = Some(receiver);
1285				},
1286				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1287					result.unwrap();
1288					break
1289				},
1290				_ => {},
1291			}
1292		}
1293
1294		assert_eq!(
1295			response_receiver.unwrap().await.unwrap().unwrap(),
1296			(b"this is a response".to_vec(), protocol_name)
1297		);
1298	}
1299
1300	#[tokio::test]
1301	async fn max_response_size_exceeded() {
1302		let protocol_name = ProtocolName::from("/test/req-resp/1");
1303
1304		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1305		let mut swarms = (0..2)
1306			.map(|_| {
1307				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1308
1309				tokio::spawn(async move {
1310					while let Some(rq) = rx.next().await {
1311						assert_eq!(rq.payload, b"this is a request");
1312						let _ = rq.pending_response.send(super::OutgoingResponse {
1313							result: Ok(b"this response exceeds the limit".to_vec()),
1314							reputation_changes: Vec::new(),
1315							sent_feedback: None,
1316						});
1317					}
1318				});
1319
1320				let protocol_config = ProtocolConfig {
1321					name: protocol_name.clone(),
1322					fallback_names: Vec::new(),
1323					max_request_size: 1024,
1324					max_response_size: 8, // <-- important for the test
1325					request_timeout: Duration::from_secs(30),
1326					inbound_queue: Some(tx),
1327				};
1328
1329				build_swarm(iter::once(protocol_config))
1330			})
1331			.collect::<Vec<_>>();
1332
1333		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1334		// this test, so they wouldn't connect to each other.
1335		{
1336			let dial_addr = swarms[1].1.clone();
1337			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1338		}
1339
1340		// Running `swarm[0]` in the background until a `InboundRequest` event happens,
1341		// which is a hint about the test having ended.
1342		let (mut swarm, _) = swarms.remove(0);
1343		tokio::spawn(async move {
1344			loop {
1345				match swarm.select_next_some().await {
1346					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1347						assert!(result.is_ok());
1348					},
1349					SwarmEvent::ConnectionClosed { .. } => {
1350						break;
1351					},
1352					_ => {},
1353				}
1354			}
1355		});
1356
1357		// Remove and run the remaining swarm.
1358		let (mut swarm, _) = swarms.remove(0);
1359
1360		let mut response_receiver = None;
1361
1362		loop {
1363			match swarm.select_next_some().await {
1364				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1365					let (sender, receiver) = oneshot::channel();
1366					swarm.behaviour_mut().send_request(
1367						&peer_id,
1368						protocol_name.clone(),
1369						b"this is a request".to_vec(),
1370						None,
1371						sender,
1372						IfDisconnected::ImmediateError,
1373					);
1374					assert!(response_receiver.is_none());
1375					response_receiver = Some(receiver);
1376				},
1377				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1378					assert!(result.is_err());
1379					break
1380				},
1381				_ => {},
1382			}
1383		}
1384
1385		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1386			RequestFailure::Network(OutboundFailure::Io(_)) => {},
1387			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1388		}
1389	}
1390
1391	/// A `RequestId` is a unique identifier among either all inbound or all outbound requests for
1392	/// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across
1393	/// multiple [`RequestResponsesBehaviour`] behaviours. Thus, when handling `RequestId` in the
1394	/// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the
1395	/// protocol name with the `RequestId` to get a unique request identifier.
1396	///
1397	/// This test ensures that two requests on different protocols can be handled concurrently
1398	/// without a `RequestId` collision.
1399	///
1400	/// See [`ProtocolRequestId`] for additional information.
1401	#[tokio::test]
1402	async fn request_id_collision() {
1403		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1404		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1405
1406		let mut swarm_1 = {
1407			let protocol_configs = vec![
1408				ProtocolConfig {
1409					name: protocol_name_1.clone(),
1410					fallback_names: Vec::new(),
1411					max_request_size: 1024,
1412					max_response_size: 1024 * 1024,
1413					request_timeout: Duration::from_secs(30),
1414					inbound_queue: None,
1415				},
1416				ProtocolConfig {
1417					name: protocol_name_2.clone(),
1418					fallback_names: Vec::new(),
1419					max_request_size: 1024,
1420					max_response_size: 1024 * 1024,
1421					request_timeout: Duration::from_secs(30),
1422					inbound_queue: None,
1423				},
1424			];
1425
1426			build_swarm(protocol_configs.into_iter()).0
1427		};
1428
1429		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1430			let (tx_1, rx_1) = async_channel::bounded(64);
1431			let (tx_2, rx_2) = async_channel::bounded(64);
1432
1433			let protocol_configs = vec![
1434				ProtocolConfig {
1435					name: protocol_name_1.clone(),
1436					fallback_names: Vec::new(),
1437					max_request_size: 1024,
1438					max_response_size: 1024 * 1024,
1439					request_timeout: Duration::from_secs(30),
1440					inbound_queue: Some(tx_1),
1441				},
1442				ProtocolConfig {
1443					name: protocol_name_2.clone(),
1444					fallback_names: Vec::new(),
1445					max_request_size: 1024,
1446					max_response_size: 1024 * 1024,
1447					request_timeout: Duration::from_secs(30),
1448					inbound_queue: Some(tx_2),
1449				},
1450			];
1451
1452			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1453
1454			(swarm, rx_1, rx_2, listen_addr)
1455		};
1456
1457		// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
1458		// so they wouldn't connect to each other.
1459		swarm_1.dial(listen_add_2).unwrap();
1460
1461		// Run swarm 2 in the background, receiving two requests.
1462		tokio::spawn(async move {
1463			loop {
1464				match swarm_2.select_next_some().await {
1465					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1466						result.unwrap();
1467					},
1468					_ => {},
1469				}
1470			}
1471		});
1472
1473		// Handle both requests sent by swarm 1 to swarm 2 in the background.
1474		//
1475		// Make sure both requests overlap, by answering the first only after receiving the
1476		// second.
1477		tokio::spawn(async move {
1478			let protocol_1_request = swarm_2_handler_1.next().await;
1479			let protocol_2_request = swarm_2_handler_2.next().await;
1480
1481			protocol_1_request
1482				.unwrap()
1483				.pending_response
1484				.send(OutgoingResponse {
1485					result: Ok(b"this is a response".to_vec()),
1486					reputation_changes: Vec::new(),
1487					sent_feedback: None,
1488				})
1489				.unwrap();
1490			protocol_2_request
1491				.unwrap()
1492				.pending_response
1493				.send(OutgoingResponse {
1494					result: Ok(b"this is a response".to_vec()),
1495					reputation_changes: Vec::new(),
1496					sent_feedback: None,
1497				})
1498				.unwrap();
1499		});
1500
1501		// Have swarm 1 send two requests to swarm 2 and await responses.
1502
1503		let mut response_receivers = None;
1504		let mut num_responses = 0;
1505
1506		loop {
1507			match swarm_1.select_next_some().await {
1508				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1509					let (sender_1, receiver_1) = oneshot::channel();
1510					let (sender_2, receiver_2) = oneshot::channel();
1511					swarm_1.behaviour_mut().send_request(
1512						&peer_id,
1513						protocol_name_1.clone(),
1514						b"this is a request".to_vec(),
1515						None,
1516						sender_1,
1517						IfDisconnected::ImmediateError,
1518					);
1519					swarm_1.behaviour_mut().send_request(
1520						&peer_id,
1521						protocol_name_2.clone(),
1522						b"this is a request".to_vec(),
1523						None,
1524						sender_2,
1525						IfDisconnected::ImmediateError,
1526					);
1527					assert!(response_receivers.is_none());
1528					response_receivers = Some((receiver_1, receiver_2));
1529				},
1530				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1531					num_responses += 1;
1532					result.unwrap();
1533					if num_responses == 2 {
1534						break
1535					}
1536				},
1537				_ => {},
1538			}
1539		}
1540		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1541		assert_eq!(
1542			response_receiver_1.await.unwrap().unwrap(),
1543			(b"this is a response".to_vec(), protocol_name_1)
1544		);
1545		assert_eq!(
1546			response_receiver_2.await.unwrap().unwrap(),
1547			(b"this is a response".to_vec(), protocol_name_2)
1548		);
1549	}
1550
	/// Exercises the fallback request mechanism of `send_request`.
	///
	/// An "older" swarm speaks `/test/req-resp/1` and `/test/another` only, while a "newer"
	/// swarm additionally speaks `/test/req-resp/2`. The newer swarm then sends, one after
	/// the other:
	///
	/// 1. a request on the new protocol with a fallback on the old one: the fallback must be
	///    used and answered;
	/// 2. a request on the old protocol with a fallback that must never be sent;
	/// 3. a request on the new protocol without fallback: it must fail with
	///    `UnsupportedProtocols`;
	/// 4. a request on the unrelated protocol without fallback: it must succeed.
	#[tokio::test]
	async fn request_fallback() {
		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
		let protocol_name_2 = ProtocolName::from("/test/another");

		// Base configurations without inbound queue; the older swarm attaches its own queues
		// to clones of them below.
		let protocol_config_1 = ProtocolConfig {
			name: protocol_name_1.clone(),
			fallback_names: Vec::new(),
			max_request_size: 1024,
			max_response_size: 1024 * 1024,
			request_timeout: Duration::from_secs(30),
			inbound_queue: None,
		};
		let protocol_config_1_fallback = ProtocolConfig {
			name: protocol_name_1_fallback.clone(),
			fallback_names: Vec::new(),
			max_request_size: 1024,
			max_response_size: 1024 * 1024,
			request_timeout: Duration::from_secs(30),
			inbound_queue: None,
		};
		let protocol_config_2 = ProtocolConfig {
			name: protocol_name_2.clone(),
			fallback_names: Vec::new(),
			max_request_size: 1024,
			max_response_size: 1024 * 1024,
			request_timeout: Duration::from_secs(30),
			inbound_queue: None,
		};

		// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
		// It only responds to requests.
		let mut older_swarm = {
			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
			protocol_config_1_fallback.inbound_queue = Some(tx_1);

			let mut protocol_config_2 = protocol_config_2.clone();
			protocol_config_2.inbound_queue = Some(tx_2);

			// The handler mirrors the scenarios of the test: two requests on the fallback
			// protocol (scenarios 1 and 2), then one on the second protocol (scenario 4).
			// Each response is confirmed as sent before the next request is awaited.
			tokio::spawn(async move {
				for _ in 0..2 {
					if let Some(rq) = rx_1.next().await {
						let (fb_tx, fb_rx) = oneshot::channel();
						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
						let _ = rq.pending_response.send(super::OutgoingResponse {
							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
							reputation_changes: Vec::new(),
							sent_feedback: Some(fb_tx),
						});
						fb_rx.await.unwrap();
					}
				}

				// The payload text mentions `/test/other` although the protocol is named
				// `/test/another`; it only has to match what the requester sends below.
				if let Some(rq) = rx_2.next().await {
					let (fb_tx, fb_rx) = oneshot::channel();
					assert_eq!(rq.payload, b"request on protocol /test/other");
					let _ = rq.pending_response.send(super::OutgoingResponse {
						result: Ok(b"this is a response on protocol /test/other".to_vec()),
						reputation_changes: Vec::new(),
						sent_feedback: Some(fb_tx),
					});
					fb_rx.await.unwrap();
				}
			});

			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
		};

		// This swarm speaks all protocols.
		let mut new_swarm = build_swarm(
			vec![
				protocol_config_1.clone(),
				protocol_config_1_fallback.clone(),
				protocol_config_2.clone(),
			]
			.into_iter(),
		);

		// Ask the newer swarm to dial the older one; there is no discovery mechanism in this
		// test.
		{
			let dial_addr = older_swarm.1.clone();
			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
		}

		// Running `older_swarm` in the background.
		tokio::spawn(async move {
			loop {
				_ = older_swarm.0.select_next_some().await;
			}
		});

		// Run the newer swarm. Attempt to make requests on all protocols.
		let (mut swarm, _) = new_swarm;
		// Peer id of the older swarm, recorded on connection so that the later scenarios can
		// address it.
		let mut older_peer_id = None;

		let mut response_receiver = None;
		// Try the new protocol with a fallback.
		loop {
			match swarm.select_next_some().await {
				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
					older_peer_id = Some(peer_id);
					let (sender, receiver) = oneshot::channel();
					swarm.behaviour_mut().send_request(
						&peer_id,
						protocol_name_1.clone(),
						b"request on protocol /test/req-resp/2".to_vec(),
						Some((
							b"request on protocol /test/req-resp/1".to_vec(),
							protocol_config_1_fallback.name.clone(),
						)),
						sender,
						IfDisconnected::ImmediateError,
					);
					response_receiver = Some(receiver);
				},
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					result.unwrap();
					break
				},
				_ => {},
			}
		}
		// The response must be labelled with the fallback protocol, not the one initially
		// requested.
		assert_eq!(
			response_receiver.unwrap().await.unwrap().unwrap(),
			(
				b"this is a response on protocol /test/req-resp/1".to_vec(),
				protocol_name_1_fallback.clone()
			)
		);
		// Try the old protocol with a useless fallback.
		let (sender, response_receiver) = oneshot::channel();
		swarm.behaviour_mut().send_request(
			older_peer_id.as_ref().unwrap(),
			protocol_name_1_fallback.clone(),
			b"request on protocol /test/req-resp/1".to_vec(),
			Some((
				// The handler of the older swarm asserts on the payload it receives, so this
				// fallback request would be detected there if it were ever sent.
				b"dummy request, will fail if processed".to_vec(),
				protocol_config_1_fallback.name.clone(),
			)),
			sender,
			IfDisconnected::ImmediateError,
		);
		loop {
			match swarm.select_next_some().await {
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					result.unwrap();
					break
				},
				_ => {},
			}
		}
		assert_eq!(
			response_receiver.await.unwrap().unwrap(),
			(
				b"this is a response on protocol /test/req-resp/1".to_vec(),
				protocol_name_1_fallback.clone()
			)
		);
		// Try the new protocol with no fallback. Should fail.
		let (sender, response_receiver) = oneshot::channel();
		swarm.behaviour_mut().send_request(
			older_peer_id.as_ref().unwrap(),
			protocol_name_1.clone(),
			b"request on protocol /test/req-resp-2".to_vec(),
			None,
			sender,
			IfDisconnected::ImmediateError,
		);
		loop {
			match swarm.select_next_some().await {
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					assert_matches!(
						result.unwrap_err(),
						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
					);
					break
				},
				_ => {},
			}
		}
		// The failure must also have been delivered to the requester's channel.
		assert!(response_receiver.await.unwrap().is_err());
		// Try the other protocol with no fallback.
		let (sender, response_receiver) = oneshot::channel();
		swarm.behaviour_mut().send_request(
			older_peer_id.as_ref().unwrap(),
			protocol_name_2.clone(),
			b"request on protocol /test/other".to_vec(),
			None,
			sender,
			IfDisconnected::ImmediateError,
		);
		loop {
			match swarm.select_next_some().await {
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					result.unwrap();
					break
				},
				_ => {},
			}
		}
		assert_eq!(
			response_receiver.await.unwrap().unwrap(),
			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
		);
	}
1758
1759	/// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error
1760	/// even if the libp2p component hangs.
1761	///
1762	/// For testing purposes, the communication happens on the `/test/req-resp/1` protocol.
1763	///
1764	/// This is achieved by:
1765	/// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10
1766	///   seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in
1767	///   substrate this is set to 1 second.
1768	///
1769	/// - The first swarm introduces a delay of 2 seconds before responding to the request.
1770	///
1771	/// - The second swarm must enforce the 1 second timeout.
1772	#[tokio::test]
1773	async fn enforce_outbound_timeouts() {
1774		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1775		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1776
		// These swarms only speak `protocol_name`.
1778		let protocol_name = ProtocolName::from("/test/req-resp/1");
1779
1780		let protocol_config = ProtocolConfig {
1781			name: protocol_name.clone(),
1782			fallback_names: Vec::new(),
1783			max_request_size: 1024,
1784			max_response_size: 1024 * 1024,
1785			request_timeout: REQUEST_TIMEOUT, // <-- important for the test
1786			inbound_queue: None,
1787		};
1788
1789		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1790		let (mut first_swarm, _) = {
1791			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1792
1793			tokio::spawn(async move {
1794				if let Some(rq) = rx.next().await {
1795					assert_eq!(rq.payload, b"this is a request");
1796
1797					// Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than
1798					// `REQUEST_TIMEOUT`.
1799					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1800
1801					// By the time the response is sent back, the second swarm
1802					// received Timeout.
1803					let _ = rq.pending_response.send(super::OutgoingResponse {
1804						result: Ok(b"Second swarm already timedout".to_vec()),
1805						reputation_changes: Vec::new(),
1806						sent_feedback: None,
1807					});
1808				}
1809			});
1810
1811			let mut protocol_config = protocol_config.clone();
1812			protocol_config.inbound_queue = Some(tx);
1813
1814			build_swarm(iter::once(protocol_config))
1815		};
1816
1817		let (mut second_swarm, second_address) = {
1818			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1819
1820			tokio::spawn(async move {
1821				while let Some(rq) = rx.next().await {
1822					let _ = rq.pending_response.send(super::OutgoingResponse {
1823						result: Ok(b"This is the response".to_vec()),
1824						reputation_changes: Vec::new(),
1825						sent_feedback: None,
1826					});
1827				}
1828			});
1829			let mut protocol_config = protocol_config.clone();
1830			protocol_config.inbound_queue = Some(tx);
1831
1832			build_swarm(iter::once(protocol_config.clone()))
1833		};
1834		// Modify the second swarm to have a shorter timeout.
1835		second_swarm
1836			.behaviour_mut()
1837			.protocols
1838			.get_mut(&protocol_name)
1839			.unwrap()
1840			.request_timeout = REQUEST_TIMEOUT_SHORT;
1841
1842		// Ask first swarm to dial the second swarm.
1843		{
1844			Swarm::dial(&mut first_swarm, second_address).unwrap();
1845		}
1846
1847		// Running the first swarm in the background until a `InboundRequest` event happens,
1848		// which is a hint about the test having ended.
1849		tokio::spawn(async move {
1850			loop {
1851				let event = first_swarm.select_next_some().await;
1852				match event {
1853					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1854						assert!(result.is_ok());
1855						break;
1856					},
1857					SwarmEvent::ConnectionClosed { .. } => {
1858						break;
1859					},
1860					_ => {},
1861				}
1862			}
1863		});
1864
1865		// Run the second swarm.
1866		// - on connection established send the request to the first swarm
1867		// - expect to receive a timeout
1868		let mut response_receiver = None;
1869		loop {
1870			let event = second_swarm.select_next_some().await;
1871
1872			match event {
1873				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1874					let (sender, receiver) = oneshot::channel();
1875					second_swarm.behaviour_mut().send_request(
1876						&peer_id,
1877						protocol_name.clone(),
1878						b"this is a request".to_vec(),
1879						None,
1880						sender,
1881						IfDisconnected::ImmediateError,
1882					);
1883					assert!(response_receiver.is_none());
1884					response_receiver = Some(receiver);
1885				},
1886				SwarmEvent::ConnectionClosed { .. } => {
1887					break;
1888				},
1889				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1890					assert!(result.is_err());
1891					break
1892				},
1893				_ => {},
1894			}
1895		}
1896
1897		// Expect the timeout.
1898		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1899			RequestFailure::Network(OutboundFailure::Timeout) => {},
1900			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1901		}
1902	}
1903}