sc_network/
request_responses.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Collection of request-response protocols.
20//!
21//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or
22//! more so-called "request-response" protocols.
23//!
24//! A request-response protocol works in the following way:
25//!
//! - For every emitted request, a new substream is opened and the protocol is negotiated. If the
//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed
//! by the request itself. The remote then sends the size of the response as a LEB128 number,
//! followed by the response.
30//!
31//! - Requests have a certain time limit before they time out. This time includes the time it
32//! takes to send/receive the request and response.
33//!
34//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
35//! is used to handle incoming requests.
36
37use crate::{
38	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39	service::traits::RequestResponseConfig as RequestResponseConfigT,
40	types::ProtocolName,
41	ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46	core::{Endpoint, Multiaddr},
47	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48	swarm::{
49		behaviour::{ConnectionClosed, FromSwarm},
50		handler::multi::MultiHandler,
51		ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler,
52		THandlerInEvent, THandlerOutEvent, ToSwarm,
53	},
54	PeerId,
55};
56
57use std::{
58	collections::{hash_map::Entry, HashMap},
59	io, iter,
60	ops::Deref,
61	pin::Pin,
62	sync::Arc,
63	task::{Context, Poll},
64	time::{Duration, Instant},
65};
66
67pub use libp2p::request_response::{Config, RequestId};
68
/// Possible failures occurring in the context of sending an outbound request and receiving the
/// response.
///
/// Each variant has a same-named counterpart in libp2p's `request_response::OutboundFailure`,
/// from which this type can be converted via `From`.
#[derive(Debug, thiserror::Error)]
pub enum OutboundFailure {
	/// The request could not be sent because a dialing attempt failed.
	#[error("Failed to dial the requested peer")]
	DialFailure,
	/// The request timed out before a response was received.
	#[error("Timeout while waiting for a response")]
	Timeout,
	/// The connection closed before a response was received.
	#[error("Connection was closed before a response was received")]
	ConnectionClosed,
	/// The remote supports none of the requested protocols.
	#[error("The remote supports none of the requested protocols")]
	UnsupportedProtocols,
}
86
87impl From<request_response::OutboundFailure> for OutboundFailure {
88	fn from(out: request_response::OutboundFailure) -> Self {
89		match out {
90			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
91			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
92			request_response::OutboundFailure::ConnectionClosed =>
93				OutboundFailure::ConnectionClosed,
94			request_response::OutboundFailure::UnsupportedProtocols =>
95				OutboundFailure::UnsupportedProtocols,
96		}
97	}
98}
99
/// Possible failures occurring in the context of receiving an inbound request and sending a
/// response.
///
/// Each variant has a same-named counterpart in libp2p's `request_response::InboundFailure`,
/// from which this type can be converted via `From`.
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
	/// The inbound request timed out, either while reading the incoming request or before a
	/// response is sent.
	#[error("Timeout while receiving request or sending response")]
	Timeout,
	/// The connection closed before a response could be sent.
	#[error("Connection was closed before a response could be sent")]
	ConnectionClosed,
	/// The local peer supports none of the protocols requested by the remote.
	#[error("The local peer supports none of the protocols requested by the remote")]
	UnsupportedProtocols,
	/// The local peer failed to respond to an inbound request: the response channel was dropped
	/// without a response being sent.
	#[error("The response channel was dropped without sending a response to the remote")]
	ResponseOmission,
}
118
119impl From<request_response::InboundFailure> for InboundFailure {
120	fn from(out: request_response::InboundFailure) -> Self {
121		match out {
122			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
123			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
124			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
125			request_response::InboundFailure::UnsupportedProtocols =>
126				InboundFailure::UnsupportedProtocols,
127		}
128	}
129}
130
/// Error in a request.
///
/// This is the error type delivered to the caller of
/// [`RequestResponsesBehaviour::send_request`] through the `pending_response` channel.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
	// No connection to the target peer and the caller asked not to dial.
	#[error("We are not currently connected to the requested peer.")]
	NotConnected,
	// The protocol name passed to `send_request` is not among the registered protocols.
	#[error("Given protocol hasn't been registered.")]
	UnknownProtocol,
	// The remote answered with an error payload instead of a response.
	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
	Refused,
	// A response arrived, but the receiving end of the result channel was already dropped.
	#[error("The remote replied, but the local node is no longer interested in the response.")]
	Obsolete,
	// Failure reported by the underlying libp2p request-response behaviour.
	#[error("Problem on the network: {0}")]
	Network(OutboundFailure),
}
146
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
	pub name: ProtocolName,

	/// Additional on-the-wire protocol names to support as fallbacks.
	///
	/// They are registered with the same underlying behaviour as [`ProtocolConfig::name`].
	pub fallback_names: Vec<ProtocolName>,

	/// Maximum allowed size, in bytes, of a request.
	///
	/// Any request larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_request_size: u64,

	/// Maximum allowed size, in bytes, of a response.
	///
	/// Any response larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_response_size: u64,

	/// Duration after which emitted requests are considered timed out.
	///
	/// If you expect the response to come back quickly, you should set this to a smaller duration.
	pub request_timeout: Duration,

	/// Channel on which the networking service will send incoming requests.
	///
	/// Every time a peer sends a request to the local node using this protocol, the networking
	/// service will push an element on this channel. The receiving side of this channel then has
	/// to pull this element, process the request, and send the response back to the peer.
	///
	/// The size of the channel has to be carefully chosen. If the channel is full, the networking
	/// service will discard the incoming request and send back an error to the peer.
	/// Consequently, the channel being full is an indicator that the node is overloaded.
	///
	/// You can typically set the size of the channel to `T / d`, where `T` is the
	/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
	/// build a response.
	///
	/// Can be `None` if the local node does not support answering incoming requests.
	/// If this is `None`, then the local node will not advertise support for this protocol towards
	/// other peers. If this is `Some` but the channel is closed, then the local node will
	/// advertise support for this protocol, but any incoming request will lead to an error being
	/// sent back.
	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
195
196impl RequestResponseConfigT for ProtocolConfig {
197	fn protocol_name(&self) -> &ProtocolName {
198		&self.name
199	}
200}
201
/// A single request received from a peer on a request-response protocol.
///
/// Values of this type are pushed onto [`ProtocolConfig::inbound_queue`] for the protocol's
/// request handler to process.
#[derive(Debug)]
pub struct IncomingRequest {
	/// Who sent the request.
	pub peer: sc_network_types::PeerId,

	/// Request sent by the remote. Will always be smaller than
	/// [`ProtocolConfig::max_request_size`].
	pub payload: Vec<u8>,

	/// Channel to send back the response.
	///
	/// There are two ways to indicate that handling the request failed:
	///
	/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
	///
	/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
	/// the given peer.
	pub pending_response: oneshot::Sender<OutgoingResponse>,
}
222
/// Response to an incoming request, to be sent by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
	/// The payload of the response.
	///
	/// `Err(())` if none is available, e.g. due to an error while handling the request.
	pub result: Result<Vec<u8>, ()>,

	/// Reputation changes accrued while handling the request. To be applied to the reputation of
	/// the peer sending the request.
	pub reputation_changes: Vec<ReputationChange>,

	/// If provided, the `oneshot::Sender` will be notified when the response has been sent to the
	/// peer.
	///
	/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
	/// >			outgoing data for each TCP socket, and it is not possible for a user
	/// >			application to inspect this buffer. This channel here is not actually notified
	/// >			when the response has been fully sent out, but rather when it has fully been
	/// >			written to the buffer managed by the operating system.
	pub sent_feedback: Option<oneshot::Sender<()>>,
}
245
/// Information stored about a pending (outbound) request.
struct PendingRequest {
	/// Moment the request was handed to the underlying behaviour; used to compute the duration
	/// reported in `Event::RequestFinished`.
	started_at: Instant,
	/// Channel over which the response (together with the protocol name it arrived on) or the
	/// failure is delivered to whoever issued the request.
	response_tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
	/// Optional alternative payload and protocol name, tried when the remote reports that the
	/// original protocol is unsupported.
	fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
252
/// Policy applied when sending a request to a recipient that is not currently connected.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
	/// Attempt to connect to the peer so the request can be sent.
	TryConnect,
	/// Fail immediately if the destination is not yet connected.
	ImmediateError,
}

/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
	/// Returns `true` if a disconnected peer should be dialed, `false` if the request should
	/// fail right away.
	pub fn should_connect(self) -> bool {
		matches!(self, Self::TryConnect)
	}
}
272
/// Event generated by the [`RequestResponsesBehaviour`].
#[derive(Debug)]
pub enum Event {
	/// A remote sent a request and either we have successfully answered it or an error happened.
	///
	/// This event is generated for statistics purposes.
	InboundRequest {
		/// Peer which has emitted the request.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Whether handling the request was successful or unsuccessful.
		///
		/// When successful contains the time elapsed between when we received the request and when
		/// we sent back the response. When unsuccessful contains the failure reason.
		result: Result<Duration, ResponseFailure>,
	},

	/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
	/// failed.
	///
	/// This event is generated for statistics purposes.
	RequestFinished {
		/// Peer that we sent the request to.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Duration the request took.
		duration: Duration,
		/// Result of the request.
		result: Result<(), RequestFailure>,
	},

	/// A request protocol handler issued reputation changes for the given peer.
	ReputationChanges {
		/// Peer whose reputation needs to be adjusted.
		peer: PeerId,
		/// Reputation changes.
		changes: Vec<ReputationChange>,
	},
}
314
/// Combination of a protocol name and a request id.
///
/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
/// [`ProtocolRequestId`]s.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId {
	/// Name of the protocol the request belongs to.
	protocol: ProtocolName,
	/// Request id assigned by the underlying libp2p request-response behaviour.
	request_id: RequestId,
}
326
327impl From<(ProtocolName, RequestId)> for ProtocolRequestId {
328	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
329		Self { protocol, request_id }
330	}
331}
332
/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
///
/// It wraps one libp2p request-response [`Behaviour`] per registered protocol and multiplexes
/// them behind a single behaviour.
pub struct RequestResponsesBehaviour {
	/// The multiple sub-protocols, by name.
	///
	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
	/// "response builder" used to build responses for incoming requests.
	protocols: HashMap<
		ProtocolName,
		(Behaviour<GenericCodec>, Option<async_channel::Sender<IncomingRequest>>),
	>,

	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
	pending_requests: HashMap<ProtocolRequestId, PendingRequest>,

	/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
	/// response to send back to the remote, or `None` if no response was produced.
	pending_responses: stream::FuturesUnordered<
		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
	>,

	/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
	pending_responses_arrival_time: HashMap<ProtocolRequestId, Instant>,

	/// Whenever a response is received on `pending_responses`, insert a channel to be notified
	/// when the response has been sent out.
	send_feedback: HashMap<ProtocolRequestId, oneshot::Sender<()>>,

	/// Primarily used to get a reputation of a node.
	peer_store: Arc<dyn PeerStoreProvider>,
}
363
/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
	/// Peer that sent the request being answered.
	peer: PeerId,
	/// Id of the inbound request, as assigned by the underlying behaviour.
	request_id: RequestId,
	/// Name of the protocol the request arrived on.
	protocol: ProtocolName,
	/// libp2p channel through which the response payload is sent back to the remote.
	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
	/// Response produced by the request handler, including reputation changes and the optional
	/// "sent" feedback channel.
	response: OutgoingResponse,
}
372
373impl RequestResponsesBehaviour {
374	/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
375	/// the same protocol is passed twice.
376	pub fn new(
377		list: impl Iterator<Item = ProtocolConfig>,
378		peer_store: Arc<dyn PeerStoreProvider>,
379	) -> Result<Self, RegisterError> {
380		let mut protocols = HashMap::new();
381		for protocol in list {
382			let mut cfg = Config::default();
383			cfg.set_request_timeout(protocol.request_timeout);
384
385			let protocol_support = if protocol.inbound_queue.is_some() {
386				ProtocolSupport::Full
387			} else {
388				ProtocolSupport::Outbound
389			};
390
391			let rq_rp = Behaviour::with_codec(
392				GenericCodec {
393					max_request_size: protocol.max_request_size,
394					max_response_size: protocol.max_response_size,
395				},
396				iter::once(protocol.name.clone())
397					.chain(protocol.fallback_names)
398					.zip(iter::repeat(protocol_support)),
399				cfg,
400			);
401
402			match protocols.entry(protocol.name) {
403				Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)),
404				Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
405			};
406		}
407
408		Ok(Self {
409			protocols,
410			pending_requests: Default::default(),
411			pending_responses: Default::default(),
412			pending_responses_arrival_time: Default::default(),
413			send_feedback: Default::default(),
414			peer_store,
415		})
416	}
417
418	/// Initiates sending a request.
419	///
420	/// If there is no established connection to the target peer, the behavior is determined by the
421	/// choice of `connect`.
422	///
423	/// An error is returned if the protocol doesn't match one that has been registered.
424	pub fn send_request(
425		&mut self,
426		target: &PeerId,
427		protocol_name: ProtocolName,
428		request: Vec<u8>,
429		fallback_request: Option<(Vec<u8>, ProtocolName)>,
430		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
431		connect: IfDisconnected,
432	) {
433		log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len());
434
435		if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) {
436			Self::send_request_inner(
437				protocol,
438				&mut self.pending_requests,
439				target,
440				protocol_name,
441				request,
442				fallback_request,
443				pending_response,
444				connect,
445			)
446		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
447			log::debug!(
448				target: "sub-libp2p",
449				"Unknown protocol {:?}. At the same time local \
450				 node is no longer interested in the result.",
451				protocol_name,
452			);
453		}
454	}
455
456	fn send_request_inner(
457		behaviour: &mut Behaviour<GenericCodec>,
458		pending_requests: &mut HashMap<ProtocolRequestId, PendingRequest>,
459		target: &PeerId,
460		protocol_name: ProtocolName,
461		request: Vec<u8>,
462		fallback_request: Option<(Vec<u8>, ProtocolName)>,
463		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
464		connect: IfDisconnected,
465	) {
466		if behaviour.is_connected(target) || connect.should_connect() {
467			let request_id = behaviour.send_request(target, request);
468			let prev_req_id = pending_requests.insert(
469				(protocol_name.to_string().into(), request_id).into(),
470				PendingRequest {
471					started_at: Instant::now(),
472					response_tx: pending_response,
473					fallback_request,
474				},
475			);
476			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
477		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
478			log::debug!(
479				target: "sub-libp2p",
480				"Not connected to peer {:?}. At the same time local \
481				 node is no longer interested in the result.",
482				target,
483			);
484		}
485	}
486}
487
488impl NetworkBehaviour for RequestResponsesBehaviour {
489	type ConnectionHandler =
490		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
491	type ToSwarm = Event;
492
	// Pending inbound connections are always accepted; the arguments are not inspected.
	fn handle_pending_inbound_connection(
		&mut self,
		_connection_id: ConnectionId,
		_local_addr: &Multiaddr,
		_remote_addr: &Multiaddr,
	) -> Result<(), ConnectionDenied> {
		Ok(())
	}
501
502	fn handle_pending_outbound_connection(
503		&mut self,
504		_connection_id: ConnectionId,
505		_maybe_peer: Option<PeerId>,
506		_addresses: &[Multiaddr],
507		_effective_role: Endpoint,
508	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
509		Ok(Vec::new())
510	}
511
512	fn handle_established_inbound_connection(
513		&mut self,
514		connection_id: ConnectionId,
515		peer: PeerId,
516		local_addr: &Multiaddr,
517		remote_addr: &Multiaddr,
518	) -> Result<THandler<Self>, ConnectionDenied> {
519		let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
520			if let Ok(handler) = r.handle_established_inbound_connection(
521				connection_id,
522				peer,
523				local_addr,
524				remote_addr,
525			) {
526				Some((p.to_string(), handler))
527			} else {
528				None
529			}
530		});
531
532		Ok(MultiHandler::try_from_iter(iter).expect(
533			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
534			 which is the only possible error; qed",
535		))
536	}
537
538	fn handle_established_outbound_connection(
539		&mut self,
540		connection_id: ConnectionId,
541		peer: PeerId,
542		addr: &Multiaddr,
543		role_override: Endpoint,
544	) -> Result<THandler<Self>, ConnectionDenied> {
545		let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
546			if let Ok(handler) =
547				r.handle_established_outbound_connection(connection_id, peer, addr, role_override)
548			{
549				Some((p.to_string(), handler))
550			} else {
551				None
552			}
553		});
554
555		Ok(MultiHandler::try_from_iter(iter).expect(
556			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
557			 which is the only possible error; qed",
558		))
559	}
560
561	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
562		match event {
563			FromSwarm::ConnectionEstablished(e) =>
564				for (p, _) in self.protocols.values_mut() {
565					NetworkBehaviour::on_swarm_event(p, FromSwarm::ConnectionEstablished(e));
566				},
567			FromSwarm::ConnectionClosed(ConnectionClosed {
568				peer_id,
569				connection_id,
570				endpoint,
571				handler,
572				remaining_established,
573			}) =>
574				for (p_name, p_handler) in handler.into_iter() {
575					if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
576						proto.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
577							peer_id,
578							connection_id,
579							endpoint,
580							handler: p_handler,
581							remaining_established,
582						}));
583					} else {
584						log::error!(
585						  target: "sub-libp2p",
586						  "on_swarm_event/connection_closed: no request-response instance registered for protocol {:?}",
587						  p_name,
588						)
589					}
590				},
591			FromSwarm::DialFailure(e) =>
592				for (p, _) in self.protocols.values_mut() {
593					NetworkBehaviour::on_swarm_event(p, FromSwarm::DialFailure(e));
594				},
595			FromSwarm::ListenerClosed(e) =>
596				for (p, _) in self.protocols.values_mut() {
597					NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e));
598				},
599			FromSwarm::ListenFailure(e) =>
600				for (p, _) in self.protocols.values_mut() {
601					NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenFailure(e));
602				},
603			FromSwarm::ListenerError(e) =>
604				for (p, _) in self.protocols.values_mut() {
605					NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerError(e));
606				},
607			FromSwarm::ExternalAddrExpired(e) =>
608				for (p, _) in self.protocols.values_mut() {
609					NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrExpired(e));
610				},
611			FromSwarm::NewListener(e) =>
612				for (p, _) in self.protocols.values_mut() {
613					NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListener(e));
614				},
615			FromSwarm::ExpiredListenAddr(e) =>
616				for (p, _) in self.protocols.values_mut() {
617					NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredListenAddr(e));
618				},
619			FromSwarm::NewExternalAddrCandidate(e) =>
620				for (p, _) in self.protocols.values_mut() {
621					NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddrCandidate(e));
622				},
623			FromSwarm::ExternalAddrConfirmed(e) =>
624				for (p, _) in self.protocols.values_mut() {
625					NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrConfirmed(e));
626				},
627			FromSwarm::AddressChange(e) =>
628				for (p, _) in self.protocols.values_mut() {
629					NetworkBehaviour::on_swarm_event(p, FromSwarm::AddressChange(e));
630				},
631			FromSwarm::NewListenAddr(e) =>
632				for (p, _) in self.protocols.values_mut() {
633					NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListenAddr(e));
634				},
635		}
636	}
637
638	fn on_connection_handler_event(
639		&mut self,
640		peer_id: PeerId,
641		connection_id: ConnectionId,
642		event: THandlerOutEvent<Self>,
643	) {
644		let p_name = event.0;
645		if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
646			return proto.on_connection_handler_event(peer_id, connection_id, event.1)
647		} else {
648			log::warn!(
649				target: "sub-libp2p",
650				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
651				p_name
652			);
653		}
654	}
655
656	fn poll(
657		&mut self,
658		cx: &mut Context,
659		params: &mut impl PollParameters,
660	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
661		'poll_all: loop {
662			// Poll to see if any response is ready to be sent back.
663			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
664				let RequestProcessingOutcome {
665					peer,
666					request_id,
667					protocol: protocol_name,
668					inner_channel,
669					response: OutgoingResponse { result, reputation_changes, sent_feedback },
670				} = match outcome {
671					Some(outcome) => outcome,
672					// The response builder was too busy or handling the request failed. This is
673					// later on reported as a `InboundFailure::Omission`.
674					None => continue,
675				};
676
677				if let Ok(payload) = result {
678					if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
679						log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
680
681						if protocol.send_response(inner_channel, Ok(payload)).is_err() {
682							// Note: Failure is handled further below when receiving
683							// `InboundFailure` event from request-response [`Behaviour`].
684							log::debug!(
685								target: "sub-libp2p",
686								"Failed to send response for {:?} on protocol {:?} due to a \
687								 timeout or due to the connection to the peer being closed. \
688								 Dropping response",
689								request_id, protocol_name,
690							);
691						} else if let Some(sent_feedback) = sent_feedback {
692							self.send_feedback
693								.insert((protocol_name, request_id).into(), sent_feedback);
694						}
695					}
696				}
697
698				if !reputation_changes.is_empty() {
699					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
700						peer,
701						changes: reputation_changes,
702					}))
703				}
704			}
705
706			let mut fallback_requests = vec![];
707
708			// Poll request-responses protocols.
709			for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols {
710				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) {
711					let ev = match ev {
712						// Main events we are interested in.
713						ToSwarm::GenerateEvent(ev) => ev,
714
715						// Other events generated by the underlying behaviour are transparently
716						// passed through.
717						ToSwarm::Dial { opts } => {
718							if opts.get_peer_id().is_none() {
719								log::error!(
720									"The request-response isn't supposed to start dialing addresses"
721								);
722							}
723							return Poll::Ready(ToSwarm::Dial { opts })
724						},
725						ToSwarm::NotifyHandler { peer_id, handler, event } =>
726							return Poll::Ready(ToSwarm::NotifyHandler {
727								peer_id,
728								handler,
729								event: ((*protocol).to_string(), event),
730							}),
731						ToSwarm::CloseConnection { peer_id, connection } =>
732							return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
733						ToSwarm::NewExternalAddrCandidate(observed) =>
734							return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
735						ToSwarm::ExternalAddrConfirmed(addr) =>
736							return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
737						ToSwarm::ExternalAddrExpired(addr) =>
738							return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
739						ToSwarm::ListenOn { opts } =>
740							return Poll::Ready(ToSwarm::ListenOn { opts }),
741						ToSwarm::RemoveListener { id } =>
742							return Poll::Ready(ToSwarm::RemoveListener { id }),
743					};
744
745					match ev {
746						// Received a request from a remote.
747						request_response::Event::Message {
748							peer,
749							message: Message::Request { request_id, request, channel, .. },
750						} => {
751							self.pending_responses_arrival_time
752								.insert((protocol.clone(), request_id).into(), Instant::now());
753
754							let reputation = self.peer_store.peer_reputation(&peer.into());
755
756							if reputation < BANNED_THRESHOLD {
757								log::debug!(
758									target: "sub-libp2p",
759									"Cannot handle requests from a node with a low reputation {}: {}",
760									peer,
761									reputation,
762								);
763								continue 'poll_protocol
764							}
765
766							let (tx, rx) = oneshot::channel();
767
768							// Submit the request to the "response builder" passed by the user at
769							// initialization.
770							if let Some(resp_builder) = resp_builder {
771								// If the response builder is too busy, silently drop `tx`. This
772								// will be reported by the corresponding request-response
773								// [`Behaviour`] through an `InboundFailure::Omission` event.
774								// Note that we use `async_channel::bounded` and not `mpsc::channel`
775								// because the latter allocates an extra slot for every cloned
776								// sender.
777								let _ = resp_builder.try_send(IncomingRequest {
778									peer: peer.into(),
779									payload: request,
780									pending_response: tx,
781								});
782							} else {
783								debug_assert!(false, "Received message on outbound-only protocol.");
784							}
785
786							let protocol = protocol.clone();
787
788							self.pending_responses.push(Box::pin(async move {
789								// The `tx` created above can be dropped if we are not capable of
790								// processing this request, which is reflected as a
791								// `InboundFailure::Omission` event.
792								rx.await.map_or(None, |response| {
793									Some(RequestProcessingOutcome {
794										peer,
795										request_id,
796										protocol,
797										inner_channel: channel,
798										response,
799									})
800								})
801							}));
802
803							// This `continue` makes sure that `pending_responses` gets polled
804							// after we have added the new element.
805							continue 'poll_all
806						},
807
808						// Received a response from a remote to one of our requests.
809						request_response::Event::Message {
810							peer,
811							message: Message::Response { request_id, response },
812							..
813						} => {
814							let (started, delivered) = match self
815								.pending_requests
816								.remove(&(protocol.clone(), request_id).into())
817							{
818								Some(PendingRequest { started_at, response_tx, .. }) => {
819									log::trace!(
820										target: "sub-libp2p",
821										"received response from {peer} ({protocol:?}), {} bytes",
822										response.as_ref().map_or(0usize, |response| response.len()),
823									);
824
825									let delivered = response_tx
826										.send(
827											response
828												.map_err(|()| RequestFailure::Refused)
829												.map(|resp| (resp, protocol.clone())),
830										)
831										.map_err(|_| RequestFailure::Obsolete);
832									(started_at, delivered)
833								},
834								None => {
835									log::warn!(
836										target: "sub-libp2p",
837										"Received `RequestResponseEvent::Message` with unexpected request id {:?}",
838										request_id,
839									);
840									debug_assert!(false);
841									continue
842								},
843							};
844
845							let out = Event::RequestFinished {
846								peer,
847								protocol: protocol.clone(),
848								duration: started.elapsed(),
849								result: delivered,
850							};
851
852							return Poll::Ready(ToSwarm::GenerateEvent(out))
853						},
854
855						// One of our requests has failed.
856						request_response::Event::OutboundFailure {
857							peer,
858							request_id,
859							error,
860							..
861						} => {
862							let started = match self
863								.pending_requests
864								.remove(&(protocol.clone(), request_id).into())
865							{
866								Some(PendingRequest {
867									started_at,
868									response_tx,
869									fallback_request,
870								}) => {
871									// Try using the fallback request if the protocol was not
872									// supported.
873									if let request_response::OutboundFailure::UnsupportedProtocols =
874										error
875									{
876										if let Some((fallback_request, fallback_protocol)) =
877											fallback_request
878										{
879											log::trace!(
880												target: "sub-libp2p",
881												"Request with id {:?} failed. Trying the fallback protocol. {}",
882												request_id,
883												fallback_protocol.deref()
884											);
885											fallback_requests.push((
886												peer,
887												fallback_protocol,
888												fallback_request,
889												response_tx,
890											));
891											continue
892										}
893									}
894
895									if response_tx
896										.send(Err(RequestFailure::Network(error.clone().into())))
897										.is_err()
898									{
899										log::debug!(
900											target: "sub-libp2p",
901											"Request with id {:?} failed. At the same time local \
902											 node is no longer interested in the result.",
903											request_id,
904										);
905									}
906									started_at
907								},
908								None => {
909									log::warn!(
910										target: "sub-libp2p",
911										"Received `RequestResponseEvent::Message` with unexpected request id {:?}",
912										request_id,
913									);
914									debug_assert!(false);
915									continue
916								},
917							};
918
919							let out = Event::RequestFinished {
920								peer,
921								protocol: protocol.clone(),
922								duration: started.elapsed(),
923								result: Err(RequestFailure::Network(error.into())),
924							};
925
926							return Poll::Ready(ToSwarm::GenerateEvent(out))
927						},
928
929						// An inbound request failed, either while reading the request or due to
930						// failing to send a response.
931						request_response::Event::InboundFailure {
932							request_id, peer, error, ..
933						} => {
934							self.pending_responses_arrival_time
935								.remove(&(protocol.clone(), request_id).into());
936							self.send_feedback.remove(&(protocol.clone(), request_id).into());
937							let out = Event::InboundRequest {
938								peer,
939								protocol: protocol.clone(),
940								result: Err(ResponseFailure::Network(error.into())),
941							};
942							return Poll::Ready(ToSwarm::GenerateEvent(out))
943						},
944
945						// A response to an inbound request has been sent.
946						request_response::Event::ResponseSent { request_id, peer } => {
947							let arrival_time = self
948								.pending_responses_arrival_time
949								.remove(&(protocol.clone(), request_id).into())
950								.map(|t| t.elapsed())
951								.expect(
952									"Time is added for each inbound request on arrival and only \
953									 removed on success (`ResponseSent`) or failure \
954									 (`InboundFailure`). One can not receive a success event for a \
955									 request that either never arrived, or that has previously \
956									 failed; qed.",
957								);
958
959							if let Some(send_feedback) =
960								self.send_feedback.remove(&(protocol.clone(), request_id).into())
961							{
962								let _ = send_feedback.send(());
963							}
964
965							let out = Event::InboundRequest {
966								peer,
967								protocol: protocol.clone(),
968								result: Ok(arrival_time),
969							};
970
971							return Poll::Ready(ToSwarm::GenerateEvent(out))
972						},
973					};
974				}
975			}
976
977			// Send out fallback requests.
978			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
979				if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) {
980					Self::send_request_inner(
981						behaviour,
982						&mut self.pending_requests,
983						&peer,
984						protocol,
985						request,
986						None,
987						pending_response,
988						// We can error if not connected because the
989						// previous attempt would have tried to establish a
990						// connection already or errored and we wouldn't have gotten here.
991						IfDisconnected::ImmediateError,
992					);
993				}
994			}
995
996			break Poll::Pending
997		}
998	}
999}
1000
1001/// Error when registering a protocol.
1002#[derive(Debug, thiserror::Error)]
1003pub enum RegisterError {
1004	/// A protocol has been specified multiple times.
1005	#[error("{0}")]
1006	DuplicateProtocol(ProtocolName),
1007}
1008
1009/// Error when processing a request sent by a remote.
1010#[derive(Debug, thiserror::Error)]
1011pub enum ResponseFailure {
1012	/// Problem on the network.
1013	#[error("Problem on the network: {0}")]
1014	Network(InboundFailure),
1015}
1016
/// libp2p [`Codec`] implementation used by this module.
///
/// Requests and responses travel as a LEB128 length prefix followed by the raw payload bytes;
/// this type converts between that wire format and `Vec<u8>` payloads.
#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
#[derive(Clone, Debug)]
pub struct GenericCodec {
	/// Largest request payload, in bytes, accepted when reading a request.
	max_request_size: u64,
	/// Largest response payload, in bytes, accepted when reading a response.
	max_response_size: u64,
}
1025
1026#[async_trait::async_trait]
1027impl Codec for GenericCodec {
1028	type Protocol = ProtocolName;
1029	type Request = Vec<u8>;
1030	type Response = Result<Vec<u8>, ()>;
1031
1032	async fn read_request<T>(
1033		&mut self,
1034		_: &Self::Protocol,
1035		mut io: &mut T,
1036	) -> io::Result<Self::Request>
1037	where
1038		T: AsyncRead + Unpin + Send,
1039	{
1040		// Read the length.
1041		let length = unsigned_varint::aio::read_usize(&mut io)
1042			.await
1043			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1044		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1045			return Err(io::Error::new(
1046				io::ErrorKind::InvalidInput,
1047				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1048			))
1049		}
1050
1051		// Read the payload.
1052		let mut buffer = vec![0; length];
1053		io.read_exact(&mut buffer).await?;
1054		Ok(buffer)
1055	}
1056
1057	async fn read_response<T>(
1058		&mut self,
1059		_: &Self::Protocol,
1060		mut io: &mut T,
1061	) -> io::Result<Self::Response>
1062	where
1063		T: AsyncRead + Unpin + Send,
1064	{
1065		// Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1066		// considered as a protocol error and will result in the entire connection being closed.
1067		// Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1068		// that this response is an error.
1069
1070		// Read the length.
1071		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1072			Ok(l) => l,
1073			Err(unsigned_varint::io::ReadError::Io(err))
1074				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1075				return Ok(Err(())),
1076			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1077		};
1078
1079		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1080			return Err(io::Error::new(
1081				io::ErrorKind::InvalidInput,
1082				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1083			))
1084		}
1085
1086		// Read the payload.
1087		let mut buffer = vec![0; length];
1088		io.read_exact(&mut buffer).await?;
1089		Ok(Ok(buffer))
1090	}
1091
1092	async fn write_request<T>(
1093		&mut self,
1094		_: &Self::Protocol,
1095		io: &mut T,
1096		req: Self::Request,
1097	) -> io::Result<()>
1098	where
1099		T: AsyncWrite + Unpin + Send,
1100	{
1101		// TODO: check the length?
1102		// Write the length.
1103		{
1104			let mut buffer = unsigned_varint::encode::usize_buffer();
1105			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1106		}
1107
1108		// Write the payload.
1109		io.write_all(&req).await?;
1110
1111		io.close().await?;
1112		Ok(())
1113	}
1114
1115	async fn write_response<T>(
1116		&mut self,
1117		_: &Self::Protocol,
1118		io: &mut T,
1119		res: Self::Response,
1120	) -> io::Result<()>
1121	where
1122		T: AsyncWrite + Unpin + Send,
1123	{
1124		// If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1125		if let Ok(res) = res {
1126			// TODO: check the length?
1127			// Write the length.
1128			{
1129				let mut buffer = unsigned_varint::encode::usize_buffer();
1130				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1131			}
1132
1133			// Write the payload.
1134			io.write_all(&res).await?;
1135		}
1136
1137		io.close().await?;
1138		Ok(())
1139	}
1140}
1141
1142#[cfg(test)]
1143mod tests {
1144	use super::*;
1145
1146	use crate::mock::MockPeerStore;
1147	use assert_matches::assert_matches;
1148	use futures::{channel::oneshot, executor::LocalPool, task::Spawn};
1149	use libp2p::{
1150		core::{
1151			transport::{MemoryTransport, Transport},
1152			upgrade,
1153		},
1154		identity::Keypair,
1155		noise,
1156		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1157		Multiaddr,
1158	};
1159	use std::{iter, time::Duration};
1160
1161	struct TokioExecutor(tokio::runtime::Runtime);
1162	impl Executor for TokioExecutor {
1163		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1164			let _ = self.0.spawn(f);
1165		}
1166	}
1167
1168	fn build_swarm(
1169		list: impl Iterator<Item = ProtocolConfig>,
1170	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1171		let keypair = Keypair::generate_ed25519();
1172
1173		let transport = MemoryTransport::new()
1174			.upgrade(upgrade::Version::V1)
1175			.authenticate(noise::Config::new(&keypair).unwrap())
1176			.multiplex(libp2p::yamux::Config::default())
1177			.boxed();
1178
1179		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1180
1181		let runtime = tokio::runtime::Runtime::new().unwrap();
1182
1183		let mut swarm = Swarm::new(
1184			transport,
1185			behaviour,
1186			keypair.public().to_peer_id(),
1187			SwarmConfig::with_executor(TokioExecutor(runtime)),
1188		);
1189
1190		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1191
1192		swarm.listen_on(listen_addr.clone()).unwrap();
1193
1194		(swarm, listen_addr)
1195	}
1196
1197	#[test]
1198	fn basic_request_response_works() {
1199		let protocol_name = ProtocolName::from("/test/req-resp/1");
1200		let mut pool = LocalPool::new();
1201
1202		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1203		let mut swarms = (0..2)
1204			.map(|_| {
1205				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1206
1207				pool.spawner()
1208					.spawn_obj(
1209						async move {
1210							while let Some(rq) = rx.next().await {
1211								let (fb_tx, fb_rx) = oneshot::channel();
1212								assert_eq!(rq.payload, b"this is a request");
1213								let _ = rq.pending_response.send(super::OutgoingResponse {
1214									result: Ok(b"this is a response".to_vec()),
1215									reputation_changes: Vec::new(),
1216									sent_feedback: Some(fb_tx),
1217								});
1218								fb_rx.await.unwrap();
1219							}
1220						}
1221						.boxed()
1222						.into(),
1223					)
1224					.unwrap();
1225
1226				let protocol_config = ProtocolConfig {
1227					name: protocol_name.clone(),
1228					fallback_names: Vec::new(),
1229					max_request_size: 1024,
1230					max_response_size: 1024 * 1024,
1231					request_timeout: Duration::from_secs(30),
1232					inbound_queue: Some(tx),
1233				};
1234
1235				build_swarm(iter::once(protocol_config))
1236			})
1237			.collect::<Vec<_>>();
1238
1239		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1240		// this test, so they wouldn't connect to each other.
1241		{
1242			let dial_addr = swarms[1].1.clone();
1243			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1244		}
1245
1246		let (mut swarm, _) = swarms.remove(0);
1247		// Running `swarm[0]` in the background.
1248		pool.spawner()
1249			.spawn_obj({
1250				async move {
1251					loop {
1252						match swarm.select_next_some().await {
1253							SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1254								result.unwrap();
1255							},
1256							_ => {},
1257						}
1258					}
1259				}
1260				.boxed()
1261				.into()
1262			})
1263			.unwrap();
1264
1265		// Remove and run the remaining swarm.
1266		let (mut swarm, _) = swarms.remove(0);
1267		pool.run_until(async move {
1268			let mut response_receiver = None;
1269
1270			loop {
1271				match swarm.select_next_some().await {
1272					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1273						let (sender, receiver) = oneshot::channel();
1274						swarm.behaviour_mut().send_request(
1275							&peer_id,
1276							protocol_name.clone(),
1277							b"this is a request".to_vec(),
1278							None,
1279							sender,
1280							IfDisconnected::ImmediateError,
1281						);
1282						assert!(response_receiver.is_none());
1283						response_receiver = Some(receiver);
1284					},
1285					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1286						result.unwrap();
1287						break
1288					},
1289					_ => {},
1290				}
1291			}
1292
1293			assert_eq!(
1294				response_receiver.unwrap().await.unwrap().unwrap(),
1295				(b"this is a response".to_vec(), protocol_name)
1296			);
1297		});
1298	}
1299
1300	#[test]
1301	fn max_response_size_exceeded() {
1302		let protocol_name = ProtocolName::from("/test/req-resp/1");
1303		let mut pool = LocalPool::new();
1304
1305		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1306		let mut swarms = (0..2)
1307			.map(|_| {
1308				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1309
1310				pool.spawner()
1311					.spawn_obj(
1312						async move {
1313							while let Some(rq) = rx.next().await {
1314								assert_eq!(rq.payload, b"this is a request");
1315								let _ = rq.pending_response.send(super::OutgoingResponse {
1316									result: Ok(b"this response exceeds the limit".to_vec()),
1317									reputation_changes: Vec::new(),
1318									sent_feedback: None,
1319								});
1320							}
1321						}
1322						.boxed()
1323						.into(),
1324					)
1325					.unwrap();
1326
1327				let protocol_config = ProtocolConfig {
1328					name: protocol_name.clone(),
1329					fallback_names: Vec::new(),
1330					max_request_size: 1024,
1331					max_response_size: 8, // <-- important for the test
1332					request_timeout: Duration::from_secs(30),
1333					inbound_queue: Some(tx),
1334				};
1335
1336				build_swarm(iter::once(protocol_config))
1337			})
1338			.collect::<Vec<_>>();
1339
1340		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1341		// this test, so they wouldn't connect to each other.
1342		{
1343			let dial_addr = swarms[1].1.clone();
1344			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1345		}
1346
1347		// Running `swarm[0]` in the background until a `InboundRequest` event happens,
1348		// which is a hint about the test having ended.
1349		let (mut swarm, _) = swarms.remove(0);
1350		pool.spawner()
1351			.spawn_obj({
1352				async move {
1353					loop {
1354						match swarm.select_next_some().await {
1355							SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1356								assert!(result.is_ok());
1357								break
1358							},
1359							_ => {},
1360						}
1361					}
1362				}
1363				.boxed()
1364				.into()
1365			})
1366			.unwrap();
1367
1368		// Remove and run the remaining swarm.
1369		let (mut swarm, _) = swarms.remove(0);
1370		pool.run_until(async move {
1371			let mut response_receiver = None;
1372
1373			loop {
1374				match swarm.select_next_some().await {
1375					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1376						let (sender, receiver) = oneshot::channel();
1377						swarm.behaviour_mut().send_request(
1378							&peer_id,
1379							protocol_name.clone(),
1380							b"this is a request".to_vec(),
1381							None,
1382							sender,
1383							IfDisconnected::ImmediateError,
1384						);
1385						assert!(response_receiver.is_none());
1386						response_receiver = Some(receiver);
1387					},
1388					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1389						assert!(result.is_err());
1390						break
1391					},
1392					_ => {},
1393				}
1394			}
1395
1396			match response_receiver.unwrap().await.unwrap().unwrap_err() {
1397				RequestFailure::Network(OutboundFailure::ConnectionClosed) => {},
1398				_ => panic!(),
1399			}
1400		});
1401	}
1402
1403	/// A [`RequestId`] is a unique identifier among either all inbound or all outbound requests for
1404	/// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across
1405	/// multiple [`RequestResponsesBehaviour`] behaviours. Thus when handling [`RequestId`] in the
1406	/// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the
1407	/// protocol name with the [`RequestId`] to get a unique request identifier.
1408	///
1409	/// This test ensures that two requests on different protocols can be handled concurrently
1410	/// without a [`RequestId`] collision.
1411	///
1412	/// See [`ProtocolRequestId`] for additional information.
1413	#[test]
1414	fn request_id_collision() {
1415		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1416		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1417		let mut pool = LocalPool::new();
1418
1419		let mut swarm_1 = {
1420			let protocol_configs = vec![
1421				ProtocolConfig {
1422					name: protocol_name_1.clone(),
1423					fallback_names: Vec::new(),
1424					max_request_size: 1024,
1425					max_response_size: 1024 * 1024,
1426					request_timeout: Duration::from_secs(30),
1427					inbound_queue: None,
1428				},
1429				ProtocolConfig {
1430					name: protocol_name_2.clone(),
1431					fallback_names: Vec::new(),
1432					max_request_size: 1024,
1433					max_response_size: 1024 * 1024,
1434					request_timeout: Duration::from_secs(30),
1435					inbound_queue: None,
1436				},
1437			];
1438
1439			build_swarm(protocol_configs.into_iter()).0
1440		};
1441
1442		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1443			let (tx_1, rx_1) = async_channel::bounded(64);
1444			let (tx_2, rx_2) = async_channel::bounded(64);
1445
1446			let protocol_configs = vec![
1447				ProtocolConfig {
1448					name: protocol_name_1.clone(),
1449					fallback_names: Vec::new(),
1450					max_request_size: 1024,
1451					max_response_size: 1024 * 1024,
1452					request_timeout: Duration::from_secs(30),
1453					inbound_queue: Some(tx_1),
1454				},
1455				ProtocolConfig {
1456					name: protocol_name_2.clone(),
1457					fallback_names: Vec::new(),
1458					max_request_size: 1024,
1459					max_response_size: 1024 * 1024,
1460					request_timeout: Duration::from_secs(30),
1461					inbound_queue: Some(tx_2),
1462				},
1463			];
1464
1465			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1466
1467			(swarm, rx_1, rx_2, listen_addr)
1468		};
1469
1470		// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
1471		// so they wouldn't connect to each other.
1472		swarm_1.dial(listen_add_2).unwrap();
1473
1474		// Run swarm 2 in the background, receiving two requests.
1475		pool.spawner()
1476			.spawn_obj(
1477				async move {
1478					loop {
1479						match swarm_2.select_next_some().await {
1480							SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1481								result.unwrap();
1482							},
1483							_ => {},
1484						}
1485					}
1486				}
1487				.boxed()
1488				.into(),
1489			)
1490			.unwrap();
1491
1492		// Handle both requests sent by swarm 1 to swarm 2 in the background.
1493		//
1494		// Make sure both requests overlap, by answering the first only after receiving the
1495		// second.
1496		pool.spawner()
1497			.spawn_obj(
1498				async move {
1499					let protocol_1_request = swarm_2_handler_1.next().await;
1500					let protocol_2_request = swarm_2_handler_2.next().await;
1501
1502					protocol_1_request
1503						.unwrap()
1504						.pending_response
1505						.send(OutgoingResponse {
1506							result: Ok(b"this is a response".to_vec()),
1507							reputation_changes: Vec::new(),
1508							sent_feedback: None,
1509						})
1510						.unwrap();
1511					protocol_2_request
1512						.unwrap()
1513						.pending_response
1514						.send(OutgoingResponse {
1515							result: Ok(b"this is a response".to_vec()),
1516							reputation_changes: Vec::new(),
1517							sent_feedback: None,
1518						})
1519						.unwrap();
1520				}
1521				.boxed()
1522				.into(),
1523			)
1524			.unwrap();
1525
1526		// Have swarm 1 send two requests to swarm 2 and await responses.
1527		pool.run_until(async move {
1528			let mut response_receivers = None;
1529			let mut num_responses = 0;
1530
1531			loop {
1532				match swarm_1.select_next_some().await {
1533					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1534						let (sender_1, receiver_1) = oneshot::channel();
1535						let (sender_2, receiver_2) = oneshot::channel();
1536						swarm_1.behaviour_mut().send_request(
1537							&peer_id,
1538							protocol_name_1.clone(),
1539							b"this is a request".to_vec(),
1540							None,
1541							sender_1,
1542							IfDisconnected::ImmediateError,
1543						);
1544						swarm_1.behaviour_mut().send_request(
1545							&peer_id,
1546							protocol_name_2.clone(),
1547							b"this is a request".to_vec(),
1548							None,
1549							sender_2,
1550							IfDisconnected::ImmediateError,
1551						);
1552						assert!(response_receivers.is_none());
1553						response_receivers = Some((receiver_1, receiver_2));
1554					},
1555					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1556						num_responses += 1;
1557						result.unwrap();
1558						if num_responses == 2 {
1559							break
1560						}
1561					},
1562					_ => {},
1563				}
1564			}
1565			let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1566			assert_eq!(
1567				response_receiver_1.await.unwrap().unwrap(),
1568				(b"this is a response".to_vec(), protocol_name_1)
1569			);
1570			assert_eq!(
1571				response_receiver_2.await.unwrap().unwrap(),
1572				(b"this is a response".to_vec(), protocol_name_2)
1573			);
1574		});
1575	}
1576
1577	#[test]
1578	fn request_fallback() {
1579		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1580		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1581		let protocol_name_2 = ProtocolName::from("/test/another");
1582		let mut pool = LocalPool::new();
1583
1584		let protocol_config_1 = ProtocolConfig {
1585			name: protocol_name_1.clone(),
1586			fallback_names: Vec::new(),
1587			max_request_size: 1024,
1588			max_response_size: 1024 * 1024,
1589			request_timeout: Duration::from_secs(30),
1590			inbound_queue: None,
1591		};
1592		let protocol_config_1_fallback = ProtocolConfig {
1593			name: protocol_name_1_fallback.clone(),
1594			fallback_names: Vec::new(),
1595			max_request_size: 1024,
1596			max_response_size: 1024 * 1024,
1597			request_timeout: Duration::from_secs(30),
1598			inbound_queue: None,
1599		};
1600		let protocol_config_2 = ProtocolConfig {
1601			name: protocol_name_2.clone(),
1602			fallback_names: Vec::new(),
1603			max_request_size: 1024,
1604			max_response_size: 1024 * 1024,
1605			request_timeout: Duration::from_secs(30),
1606			inbound_queue: None,
1607		};
1608
1609		// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
1610		// It only responds to requests.
1611		let mut older_swarm = {
1612			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1613			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1614			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1615			protocol_config_1_fallback.inbound_queue = Some(tx_1);
1616
1617			let mut protocol_config_2 = protocol_config_2.clone();
1618			protocol_config_2.inbound_queue = Some(tx_2);
1619
1620			pool.spawner()
1621				.spawn_obj(
1622					async move {
1623						for _ in 0..2 {
1624							if let Some(rq) = rx_1.next().await {
1625								let (fb_tx, fb_rx) = oneshot::channel();
1626								assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1627								let _ = rq.pending_response.send(super::OutgoingResponse {
1628									result: Ok(
1629										b"this is a response on protocol /test/req-resp/1".to_vec()
1630									),
1631									reputation_changes: Vec::new(),
1632									sent_feedback: Some(fb_tx),
1633								});
1634								fb_rx.await.unwrap();
1635							}
1636						}
1637
1638						if let Some(rq) = rx_2.next().await {
1639							let (fb_tx, fb_rx) = oneshot::channel();
1640							assert_eq!(rq.payload, b"request on protocol /test/other");
1641							let _ = rq.pending_response.send(super::OutgoingResponse {
1642								result: Ok(b"this is a response on protocol /test/other".to_vec()),
1643								reputation_changes: Vec::new(),
1644								sent_feedback: Some(fb_tx),
1645							});
1646							fb_rx.await.unwrap();
1647						}
1648					}
1649					.boxed()
1650					.into(),
1651				)
1652				.unwrap();
1653
1654			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1655		};
1656
1657		// This swarm speaks all protocols.
1658		let mut new_swarm = build_swarm(
1659			vec![
1660				protocol_config_1.clone(),
1661				protocol_config_1_fallback.clone(),
1662				protocol_config_2.clone(),
1663			]
1664			.into_iter(),
1665		);
1666
1667		{
1668			let dial_addr = older_swarm.1.clone();
1669			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1670		}
1671
1672		// Running `older_swarm`` in the background.
1673		pool.spawner()
1674			.spawn_obj({
1675				async move {
1676					loop {
1677						_ = older_swarm.0.select_next_some().await;
1678					}
1679				}
1680				.boxed()
1681				.into()
1682			})
1683			.unwrap();
1684
1685		// Run the newer swarm. Attempt to make requests on all protocols.
1686		let (mut swarm, _) = new_swarm;
1687		let mut older_peer_id = None;
1688
1689		pool.run_until(async move {
1690			let mut response_receiver = None;
1691			// Try the new protocol with a fallback.
1692			loop {
1693				match swarm.select_next_some().await {
1694					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1695						older_peer_id = Some(peer_id);
1696						let (sender, receiver) = oneshot::channel();
1697						swarm.behaviour_mut().send_request(
1698							&peer_id,
1699							protocol_name_1.clone(),
1700							b"request on protocol /test/req-resp/2".to_vec(),
1701							Some((
1702								b"request on protocol /test/req-resp/1".to_vec(),
1703								protocol_config_1_fallback.name.clone(),
1704							)),
1705							sender,
1706							IfDisconnected::ImmediateError,
1707						);
1708						response_receiver = Some(receiver);
1709					},
1710					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1711						result.unwrap();
1712						break
1713					},
1714					_ => {},
1715				}
1716			}
1717			assert_eq!(
1718				response_receiver.unwrap().await.unwrap().unwrap(),
1719				(
1720					b"this is a response on protocol /test/req-resp/1".to_vec(),
1721					protocol_name_1_fallback.clone()
1722				)
1723			);
1724			// Try the old protocol with a useless fallback.
1725			let (sender, response_receiver) = oneshot::channel();
1726			swarm.behaviour_mut().send_request(
1727				older_peer_id.as_ref().unwrap(),
1728				protocol_name_1_fallback.clone(),
1729				b"request on protocol /test/req-resp/1".to_vec(),
1730				Some((
1731					b"dummy request, will fail if processed".to_vec(),
1732					protocol_config_1_fallback.name.clone(),
1733				)),
1734				sender,
1735				IfDisconnected::ImmediateError,
1736			);
1737			loop {
1738				match swarm.select_next_some().await {
1739					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1740						result.unwrap();
1741						break
1742					},
1743					_ => {},
1744				}
1745			}
1746			assert_eq!(
1747				response_receiver.await.unwrap().unwrap(),
1748				(
1749					b"this is a response on protocol /test/req-resp/1".to_vec(),
1750					protocol_name_1_fallback.clone()
1751				)
1752			);
1753			// Try the new protocol with no fallback. Should fail.
1754			let (sender, response_receiver) = oneshot::channel();
1755			swarm.behaviour_mut().send_request(
1756				older_peer_id.as_ref().unwrap(),
1757				protocol_name_1.clone(),
1758				b"request on protocol /test/req-resp-2".to_vec(),
1759				None,
1760				sender,
1761				IfDisconnected::ImmediateError,
1762			);
1763			loop {
1764				match swarm.select_next_some().await {
1765					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1766						assert_matches!(
1767							result.unwrap_err(),
1768							RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1769						);
1770						break
1771					},
1772					_ => {},
1773				}
1774			}
1775			assert!(response_receiver.await.unwrap().is_err());
1776			// Try the other protocol with no fallback.
1777			let (sender, response_receiver) = oneshot::channel();
1778			swarm.behaviour_mut().send_request(
1779				older_peer_id.as_ref().unwrap(),
1780				protocol_name_2.clone(),
1781				b"request on protocol /test/other".to_vec(),
1782				None,
1783				sender,
1784				IfDisconnected::ImmediateError,
1785			);
1786			loop {
1787				match swarm.select_next_some().await {
1788					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1789						result.unwrap();
1790						break
1791					},
1792					_ => {},
1793				}
1794			}
1795			assert_eq!(
1796				response_receiver.await.unwrap().unwrap(),
1797				(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1798			);
1799		});
1800	}
1801}