1use crate::{
38	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39	service::traits::RequestResponseConfig as RequestResponseConfigT,
40	types::ProtocolName,
41	ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46	core::{transport::PortUse, Endpoint, Multiaddr},
47	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48	swarm::{
49		behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51	},
52	PeerId,
53};
54
55use std::{
56	collections::{hash_map::Entry, HashMap},
57	io, iter,
58	ops::Deref,
59	pin::Pin,
60	sync::Arc,
61	task::{Context, Poll},
62	time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
67const LOG_TARGET: &str = "sub-libp2p::request-response";
69
70const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
73#[derive(Debug, Clone, thiserror::Error)]
76pub enum OutboundFailure {
77	#[error("Failed to dial the requested peer")]
79	DialFailure,
80	#[error("Timeout while waiting for a response")]
82	Timeout,
83	#[error("Connection was closed before a response was received")]
85	ConnectionClosed,
86	#[error("The remote supports none of the requested protocols")]
88	UnsupportedProtocols,
89	#[error("An IO failure happened on an outbound stream")]
91	Io(Arc<io::Error>),
92}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95	fn from(out: request_response::OutboundFailure) -> Self {
96		match out {
97			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99			request_response::OutboundFailure::ConnectionClosed =>
100				OutboundFailure::ConnectionClosed,
101			request_response::OutboundFailure::UnsupportedProtocols =>
102				OutboundFailure::UnsupportedProtocols,
103			request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
104		}
105	}
106}
107
108#[derive(Debug, thiserror::Error)]
111pub enum InboundFailure {
112	#[error("Timeout while receiving request or sending response")]
115	Timeout,
116	#[error("Connection was closed before a response could be sent")]
118	ConnectionClosed,
119	#[error("The local peer supports none of the protocols requested by the remote")]
121	UnsupportedProtocols,
122	#[error("The response channel was dropped without sending a response to the remote")]
124	ResponseOmission,
125	#[error("An IO failure happened on an inbound stream")]
127	Io(Arc<io::Error>),
128}
129
130impl From<request_response::InboundFailure> for InboundFailure {
131	fn from(out: request_response::InboundFailure) -> Self {
132		match out {
133			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
134			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
135			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
136			request_response::InboundFailure::UnsupportedProtocols =>
137				InboundFailure::UnsupportedProtocols,
138			request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
139		}
140	}
141}
142
143#[derive(Debug, thiserror::Error)]
145#[allow(missing_docs)]
146pub enum RequestFailure {
147	#[error("We are not currently connected to the requested peer.")]
148	NotConnected,
149	#[error("Given protocol hasn't been registered.")]
150	UnknownProtocol,
151	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
152	Refused,
153	#[error("The remote replied, but the local node is no longer interested in the response.")]
154	Obsolete,
155	#[error("Problem on the network: {0}")]
156	Network(OutboundFailure),
157}
158
159#[derive(Debug, Clone)]
161pub struct ProtocolConfig {
162	pub name: ProtocolName,
164
165	pub fallback_names: Vec<ProtocolName>,
167
168	pub max_request_size: u64,
173
174	pub max_response_size: u64,
179
180	pub request_timeout: Duration,
184
185	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
206}
207
208impl RequestResponseConfigT for ProtocolConfig {
209	fn protocol_name(&self) -> &ProtocolName {
210		&self.name
211	}
212}
213
214#[derive(Debug)]
216pub struct IncomingRequest {
217	pub peer: sc_network_types::PeerId,
219
220	pub payload: Vec<u8>,
223
224	pub pending_response: oneshot::Sender<OutgoingResponse>,
233}
234
235#[derive(Debug)]
237pub struct OutgoingResponse {
238	pub result: Result<Vec<u8>, ()>,
242
243	pub reputation_changes: Vec<ReputationChange>,
246
247	pub sent_feedback: Option<oneshot::Sender<()>>,
256}
257
258struct PendingRequest {
260	started_at: Instant,
262	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
267	fallback_request: Option<(Vec<u8>, ProtocolName)>,
269}
270
271#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
273pub enum IfDisconnected {
274	TryConnect,
276	ImmediateError,
278}
279
280impl IfDisconnected {
282	pub fn should_connect(self) -> bool {
284		match self {
285			Self::TryConnect => true,
286			Self::ImmediateError => false,
287		}
288	}
289}
290
291#[derive(Debug)]
293pub enum Event {
294	InboundRequest {
298		peer: PeerId,
300		protocol: ProtocolName,
302		result: Result<Duration, ResponseFailure>,
307	},
308
309	RequestFinished {
314		peer: PeerId,
316		protocol: ProtocolName,
318		duration: Duration,
320		result: Result<(), RequestFailure>,
322	},
323
324	ReputationChanges {
326		peer: PeerId,
328		changes: Vec<ReputationChange>,
330	},
331}
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
340struct ProtocolRequestId<RequestId> {
341	protocol: ProtocolName,
342	request_id: RequestId,
343}
344
345impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
346	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
347		Self { protocol, request_id }
348	}
349}
350
351struct ProtocolDetails {
353	behaviour: Behaviour<GenericCodec>,
354	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
355	request_timeout: Duration,
356}
357
358pub struct RequestResponsesBehaviour {
360	protocols: HashMap<ProtocolName, ProtocolDetails>,
365
366	pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
368
369	pending_responses: stream::FuturesUnordered<
372		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
373	>,
374
375	pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,
377
378	send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,
381
382	peer_store: Arc<dyn PeerStoreProvider>,
384
385	periodic_request_check: tokio::time::Interval,
392}
393
394struct RequestProcessingOutcome {
396	peer: PeerId,
397	request_id: InboundRequestId,
398	protocol: ProtocolName,
399	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
400	response: OutgoingResponse,
401}
402
403impl RequestResponsesBehaviour {
404	pub fn new(
407		list: impl Iterator<Item = ProtocolConfig>,
408		peer_store: Arc<dyn PeerStoreProvider>,
409	) -> Result<Self, RegisterError> {
410		let mut protocols = HashMap::new();
411		for protocol in list {
412			let cfg = Config::default().with_request_timeout(protocol.request_timeout);
413
414			let protocol_support = if protocol.inbound_queue.is_some() {
415				ProtocolSupport::Full
416			} else {
417				ProtocolSupport::Outbound
418			};
419
420			let behaviour = Behaviour::with_codec(
421				GenericCodec {
422					max_request_size: protocol.max_request_size,
423					max_response_size: protocol.max_response_size,
424				},
425				iter::once(protocol.name.clone())
426					.chain(protocol.fallback_names)
427					.zip(iter::repeat(protocol_support)),
428				cfg,
429			);
430
431			match protocols.entry(protocol.name) {
432				Entry::Vacant(e) => e.insert(ProtocolDetails {
433					behaviour,
434					inbound_queue: protocol.inbound_queue,
435					request_timeout: protocol.request_timeout,
436				}),
437				Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
438			};
439		}
440
441		Ok(Self {
442			protocols,
443			pending_requests: Default::default(),
444			pending_responses: Default::default(),
445			pending_responses_arrival_time: Default::default(),
446			send_feedback: Default::default(),
447			peer_store,
448			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
449		})
450	}
451
452	pub fn send_request(
459		&mut self,
460		target: &PeerId,
461		protocol_name: ProtocolName,
462		request: Vec<u8>,
463		fallback_request: Option<(Vec<u8>, ProtocolName)>,
464		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
465		connect: IfDisconnected,
466	) {
467		log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
468
469		if let Some(ProtocolDetails { behaviour, .. }) =
470			self.protocols.get_mut(protocol_name.deref())
471		{
472			Self::send_request_inner(
473				behaviour,
474				&mut self.pending_requests,
475				target,
476				protocol_name,
477				request,
478				fallback_request,
479				pending_response,
480				connect,
481			)
482		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
483			log::debug!(
484				target: LOG_TARGET,
485				"Unknown protocol {:?}. At the same time local \
486				 node is no longer interested in the result.",
487				protocol_name,
488			);
489		}
490	}
491
492	fn send_request_inner(
493		behaviour: &mut Behaviour<GenericCodec>,
494		pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
495		target: &PeerId,
496		protocol_name: ProtocolName,
497		request: Vec<u8>,
498		fallback_request: Option<(Vec<u8>, ProtocolName)>,
499		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
500		connect: IfDisconnected,
501	) {
502		if behaviour.is_connected(target) || connect.should_connect() {
503			let request_id = behaviour.send_request(target, request);
504			let prev_req_id = pending_requests.insert(
505				(protocol_name.to_string().into(), request_id).into(),
506				PendingRequest {
507					started_at: Instant::now(),
508					response_tx: Some(pending_response),
509					fallback_request,
510				},
511			);
512			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
513		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
514			log::debug!(
515				target: LOG_TARGET,
516				"Not connected to peer {:?}. At the same time local \
517				 node is no longer interested in the result.",
518				target,
519			);
520		}
521	}
522}
523
524impl NetworkBehaviour for RequestResponsesBehaviour {
525	type ConnectionHandler =
526		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
527	type ToSwarm = Event;
528
529	fn handle_pending_inbound_connection(
530		&mut self,
531		_connection_id: ConnectionId,
532		_local_addr: &Multiaddr,
533		_remote_addr: &Multiaddr,
534	) -> Result<(), ConnectionDenied> {
535		Ok(())
536	}
537
538	fn handle_pending_outbound_connection(
539		&mut self,
540		_connection_id: ConnectionId,
541		_maybe_peer: Option<PeerId>,
542		_addresses: &[Multiaddr],
543		_effective_role: Endpoint,
544	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
545		Ok(Vec::new())
546	}
547
548	fn handle_established_inbound_connection(
549		&mut self,
550		connection_id: ConnectionId,
551		peer: PeerId,
552		local_addr: &Multiaddr,
553		remote_addr: &Multiaddr,
554	) -> Result<THandler<Self>, ConnectionDenied> {
555		let iter =
556			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
557				if let Ok(handler) = behaviour.handle_established_inbound_connection(
558					connection_id,
559					peer,
560					local_addr,
561					remote_addr,
562				) {
563					Some((p.to_string(), handler))
564				} else {
565					None
566				}
567			});
568
569		Ok(MultiHandler::try_from_iter(iter).expect(
570			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
571			 which is the only possible error; qed",
572		))
573	}
574
575	fn handle_established_outbound_connection(
576		&mut self,
577		connection_id: ConnectionId,
578		peer: PeerId,
579		addr: &Multiaddr,
580		role_override: Endpoint,
581		port_use: PortUse,
582	) -> Result<THandler<Self>, ConnectionDenied> {
583		let iter =
584			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
585				if let Ok(handler) = behaviour.handle_established_outbound_connection(
586					connection_id,
587					peer,
588					addr,
589					role_override,
590					port_use,
591				) {
592					Some((p.to_string(), handler))
593				} else {
594					None
595				}
596			});
597
598		Ok(MultiHandler::try_from_iter(iter).expect(
599			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
600			 which is the only possible error; qed",
601		))
602	}
603
604	fn on_swarm_event(&mut self, event: FromSwarm) {
605		for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
606			behaviour.on_swarm_event(event);
607		}
608	}
609
610	fn on_connection_handler_event(
611		&mut self,
612		peer_id: PeerId,
613		connection_id: ConnectionId,
614		event: THandlerOutEvent<Self>,
615	) {
616		let p_name = event.0;
617		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
618			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1)
619		} else {
620			log::warn!(
621				target: LOG_TARGET,
622				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
623				p_name
624			);
625		}
626	}
627
628	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
629		'poll_all: loop {
630			if self.periodic_request_check.poll_tick(cx).is_ready() {
632				self.pending_requests.retain(|id, req| {
633					let Some(ProtocolDetails { request_timeout, .. }) =
634						self.protocols.get(&id.protocol)
635					else {
636						log::warn!(
637							target: LOG_TARGET,
638							"Request {id:?} has no protocol registered.",
639						);
640
641						if let Some(response_tx) = req.response_tx.take() {
642							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
643								log::debug!(
644									target: LOG_TARGET,
645									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
646								);
647							}
648						}
649						return false
650					};
651
652					let elapsed = req.started_at.elapsed();
653					if elapsed > *request_timeout {
654						log::debug!(
655							target: LOG_TARGET,
656							"Request {id:?} force detected as timeout.",
657						);
658
659						if let Some(response_tx) = req.response_tx.take() {
660							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
661								log::debug!(
662									target: LOG_TARGET,
663									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
664								);
665							}
666						}
667
668						false
669					} else {
670						true
671					}
672				});
673			}
674
675			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
677				let RequestProcessingOutcome {
678					peer,
679					request_id,
680					protocol: protocol_name,
681					inner_channel,
682					response: OutgoingResponse { result, reputation_changes, sent_feedback },
683				} = match outcome {
684					Some(outcome) => outcome,
685					None => continue,
688				};
689
690				if let Ok(payload) = result {
691					if let Some(ProtocolDetails { behaviour, .. }) =
692						self.protocols.get_mut(&*protocol_name)
693					{
694						log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
695
696						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
697							log::debug!(
700								target: LOG_TARGET,
701								"Failed to send response for {:?} on protocol {:?} due to a \
702								 timeout or due to the connection to the peer being closed. \
703								 Dropping response",
704								request_id, protocol_name,
705							);
706						} else if let Some(sent_feedback) = sent_feedback {
707							self.send_feedback
708								.insert((protocol_name, request_id).into(), sent_feedback);
709						}
710					}
711				}
712
713				if !reputation_changes.is_empty() {
714					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
715						peer,
716						changes: reputation_changes,
717					}))
718				}
719			}
720
721			let mut fallback_requests = vec![];
722
723			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
725			{
726				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
727					let ev = match ev {
728						ToSwarm::GenerateEvent(ev) => ev,
730
731						ToSwarm::Dial { opts } => {
734							if opts.get_peer_id().is_none() {
735								log::error!(
736									target: LOG_TARGET,
737									"The request-response isn't supposed to start dialing addresses"
738								);
739							}
740							return Poll::Ready(ToSwarm::Dial { opts })
741						},
742						event => {
743							return Poll::Ready(
744								event.map_in(|event| ((*protocol).to_string(), event)).map_out(
745									|_| {
746										unreachable!(
747											"`GenerateEvent` is handled in a branch above; qed"
748										)
749									},
750								),
751							);
752						},
753					};
754
755					match ev {
756						request_response::Event::Message {
758							peer,
759							message: Message::Request { request_id, request, channel, .. },
760						} => {
761							self.pending_responses_arrival_time
762								.insert((protocol.clone(), request_id).into(), Instant::now());
763
764							let reputation = self.peer_store.peer_reputation(&peer.into());
765
766							if reputation < BANNED_THRESHOLD {
767								log::debug!(
768									target: LOG_TARGET,
769									"Cannot handle requests from a node with a low reputation {}: {}",
770									peer,
771									reputation,
772								);
773								continue 'poll_protocol
774							}
775
776							let (tx, rx) = oneshot::channel();
777
778							if let Some(resp_builder) = inbound_queue {
781								let _ = resp_builder.try_send(IncomingRequest {
788									peer: peer.into(),
789									payload: request,
790									pending_response: tx,
791								});
792							} else {
793								debug_assert!(false, "Received message on outbound-only protocol.");
794							}
795
796							let protocol = protocol.clone();
797
798							self.pending_responses.push(Box::pin(async move {
799								rx.await.map_or(None, |response| {
803									Some(RequestProcessingOutcome {
804										peer,
805										request_id,
806										protocol,
807										inner_channel: channel,
808										response,
809									})
810								})
811							}));
812
813							continue 'poll_all
816						},
817
818						request_response::Event::Message {
820							peer,
821							message: Message::Response { request_id, response },
822							..
823						} => {
824							let (started, delivered) = match self
825								.pending_requests
826								.remove(&(protocol.clone(), request_id).into())
827							{
828								Some(PendingRequest {
829									started_at,
830									response_tx: Some(response_tx),
831									..
832								}) => {
833									log::trace!(
834										target: LOG_TARGET,
835										"received response from {peer} ({protocol:?}), {} bytes",
836										response.as_ref().map_or(0usize, |response| response.len()),
837									);
838
839									let delivered = response_tx
840										.send(
841											response
842												.map_err(|()| RequestFailure::Refused)
843												.map(|resp| (resp, protocol.clone())),
844										)
845										.map_err(|_| RequestFailure::Obsolete);
846									(started_at, delivered)
847								},
848								_ => {
849									log::debug!(
850										target: LOG_TARGET,
851										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
852										request_id,
853										peer,
854									);
855									continue
856								},
857							};
858
859							let out = Event::RequestFinished {
860								peer,
861								protocol: protocol.clone(),
862								duration: started.elapsed(),
863								result: delivered,
864							};
865
866							return Poll::Ready(ToSwarm::GenerateEvent(out))
867						},
868
869						request_response::Event::OutboundFailure {
871							peer,
872							request_id,
873							error,
874							..
875						} => {
876							let error = OutboundFailure::from(error);
877							let started = match self
878								.pending_requests
879								.remove(&(protocol.clone(), request_id).into())
880							{
881								Some(PendingRequest {
882									started_at,
883									response_tx: Some(response_tx),
884									fallback_request,
885								}) => {
886									if matches!(error, OutboundFailure::UnsupportedProtocols) {
889										if let Some((fallback_request, fallback_protocol)) =
890											fallback_request
891										{
892											log::trace!(
893												target: LOG_TARGET,
894												"Request with id {:?} failed. Trying the fallback protocol. {}",
895												request_id,
896												fallback_protocol.deref()
897											);
898											fallback_requests.push((
899												peer,
900												fallback_protocol,
901												fallback_request,
902												response_tx,
903											));
904											continue
905										}
906									}
907
908									if response_tx
909										.send(Err(RequestFailure::Network(error.clone())))
910										.is_err()
911									{
912										log::debug!(
913											target: LOG_TARGET,
914											"Request with id {:?} failed. At the same time local \
915											 node is no longer interested in the result.",
916											request_id,
917										);
918									}
919									started_at
920								},
921								_ => {
922									log::debug!(
923										target: LOG_TARGET,
924										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
925										request_id,
926										error,
927										peer
928									);
929									continue
930								},
931							};
932
933							let out = Event::RequestFinished {
934								peer,
935								protocol: protocol.clone(),
936								duration: started.elapsed(),
937								result: Err(RequestFailure::Network(error)),
938							};
939
940							return Poll::Ready(ToSwarm::GenerateEvent(out))
941						},
942
943						request_response::Event::InboundFailure {
946							request_id, peer, error, ..
947						} => {
948							self.pending_responses_arrival_time
949								.remove(&(protocol.clone(), request_id).into());
950							self.send_feedback.remove(&(protocol.clone(), request_id).into());
951							let out = Event::InboundRequest {
952								peer,
953								protocol: protocol.clone(),
954								result: Err(ResponseFailure::Network(error.into())),
955							};
956							return Poll::Ready(ToSwarm::GenerateEvent(out))
957						},
958
959						request_response::Event::ResponseSent { request_id, peer } => {
961							let arrival_time = self
962								.pending_responses_arrival_time
963								.remove(&(protocol.clone(), request_id).into())
964								.map(|t| t.elapsed())
965								.expect(
966									"Time is added for each inbound request on arrival and only \
967									 removed on success (`ResponseSent`) or failure \
968									 (`InboundFailure`). One can not receive a success event for a \
969									 request that either never arrived, or that has previously \
970									 failed; qed.",
971								);
972
973							if let Some(send_feedback) =
974								self.send_feedback.remove(&(protocol.clone(), request_id).into())
975							{
976								let _ = send_feedback.send(());
977							}
978
979							let out = Event::InboundRequest {
980								peer,
981								protocol: protocol.clone(),
982								result: Ok(arrival_time),
983							};
984
985							return Poll::Ready(ToSwarm::GenerateEvent(out))
986						},
987					};
988				}
989			}
990
991			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
993				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
994					Self::send_request_inner(
995						behaviour,
996						&mut self.pending_requests,
997						&peer,
998						protocol,
999						request,
1000						None,
1001						pending_response,
1002						IfDisconnected::ImmediateError,
1006					);
1007				}
1008			}
1009
1010			break Poll::Pending
1011		}
1012	}
1013}
1014
1015#[derive(Debug, thiserror::Error)]
1017pub enum RegisterError {
1018	#[error("{0}")]
1020	DuplicateProtocol(ProtocolName),
1021}
1022
1023#[derive(Debug, thiserror::Error)]
1025pub enum ResponseFailure {
1026	#[error("Problem on the network: {0}")]
1028	Network(InboundFailure),
1029}
1030
1031#[derive(Debug, Clone)]
1034#[doc(hidden)] pub struct GenericCodec {
1036	max_request_size: u64,
1037	max_response_size: u64,
1038}
1039
1040#[async_trait::async_trait]
1041impl Codec for GenericCodec {
1042	type Protocol = ProtocolName;
1043	type Request = Vec<u8>;
1044	type Response = Result<Vec<u8>, ()>;
1045
1046	async fn read_request<T>(
1047		&mut self,
1048		_: &Self::Protocol,
1049		mut io: &mut T,
1050	) -> io::Result<Self::Request>
1051	where
1052		T: AsyncRead + Unpin + Send,
1053	{
1054		let length = unsigned_varint::aio::read_usize(&mut io)
1056			.await
1057			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1058		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1059			return Err(io::Error::new(
1060				io::ErrorKind::InvalidInput,
1061				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1062			))
1063		}
1064
1065		let mut buffer = vec![0; length];
1067		io.read_exact(&mut buffer).await?;
1068		Ok(buffer)
1069	}
1070
1071	async fn read_response<T>(
1072		&mut self,
1073		_: &Self::Protocol,
1074		mut io: &mut T,
1075	) -> io::Result<Self::Response>
1076	where
1077		T: AsyncRead + Unpin + Send,
1078	{
1079		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1086			Ok(l) => l,
1087			Err(unsigned_varint::io::ReadError::Io(err))
1088				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1089				return Ok(Err(())),
1090			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1091		};
1092
1093		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1094			return Err(io::Error::new(
1095				io::ErrorKind::InvalidInput,
1096				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1097			))
1098		}
1099
1100		let mut buffer = vec![0; length];
1102		io.read_exact(&mut buffer).await?;
1103		Ok(Ok(buffer))
1104	}
1105
1106	async fn write_request<T>(
1107		&mut self,
1108		_: &Self::Protocol,
1109		io: &mut T,
1110		req: Self::Request,
1111	) -> io::Result<()>
1112	where
1113		T: AsyncWrite + Unpin + Send,
1114	{
1115		{
1118			let mut buffer = unsigned_varint::encode::usize_buffer();
1119			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1120		}
1121
1122		io.write_all(&req).await?;
1124
1125		io.close().await?;
1126		Ok(())
1127	}
1128
1129	async fn write_response<T>(
1130		&mut self,
1131		_: &Self::Protocol,
1132		io: &mut T,
1133		res: Self::Response,
1134	) -> io::Result<()>
1135	where
1136		T: AsyncWrite + Unpin + Send,
1137	{
1138		if let Ok(res) = res {
1140			{
1143				let mut buffer = unsigned_varint::encode::usize_buffer();
1144				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1145			}
1146
1147			io.write_all(&res).await?;
1149		}
1150
1151		io.close().await?;
1152		Ok(())
1153	}
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158	use super::*;
1159
1160	use crate::mock::MockPeerStore;
1161	use assert_matches::assert_matches;
1162	use futures::channel::oneshot;
1163	use libp2p::{
1164		core::{
1165			transport::{MemoryTransport, Transport},
1166			upgrade,
1167		},
1168		identity::Keypair,
1169		noise,
1170		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1171		Multiaddr,
1172	};
1173	use std::{iter, time::Duration};
1174
1175	struct TokioExecutor;
1176	impl Executor for TokioExecutor {
1177		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1178			tokio::spawn(f);
1179		}
1180	}
1181
1182	fn build_swarm(
1183		list: impl Iterator<Item = ProtocolConfig>,
1184	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1185		let keypair = Keypair::generate_ed25519();
1186
1187		let transport = MemoryTransport::new()
1188			.upgrade(upgrade::Version::V1)
1189			.authenticate(noise::Config::new(&keypair).unwrap())
1190			.multiplex(libp2p::yamux::Config::default())
1191			.boxed();
1192
1193		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1194
1195		let mut swarm = Swarm::new(
1196			transport,
1197			behaviour,
1198			keypair.public().to_peer_id(),
1199			SwarmConfig::with_executor(TokioExecutor {})
1200				.with_idle_connection_timeout(Duration::from_secs(10)),
1203		);
1204
1205		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1206
1207		swarm.listen_on(listen_addr.clone()).unwrap();
1208
1209		(swarm, listen_addr)
1210	}
1211
1212	#[tokio::test]
1213	async fn basic_request_response_works() {
1214		let protocol_name = ProtocolName::from("/test/req-resp/1");
1215
1216		let mut swarms = (0..2)
1218			.map(|_| {
1219				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1220
1221				tokio::spawn(async move {
1222					while let Some(rq) = rx.next().await {
1223						let (fb_tx, fb_rx) = oneshot::channel();
1224						assert_eq!(rq.payload, b"this is a request");
1225						let _ = rq.pending_response.send(super::OutgoingResponse {
1226							result: Ok(b"this is a response".to_vec()),
1227							reputation_changes: Vec::new(),
1228							sent_feedback: Some(fb_tx),
1229						});
1230						fb_rx.await.unwrap();
1231					}
1232				});
1233
1234				let protocol_config = ProtocolConfig {
1235					name: protocol_name.clone(),
1236					fallback_names: Vec::new(),
1237					max_request_size: 1024,
1238					max_response_size: 1024 * 1024,
1239					request_timeout: Duration::from_secs(30),
1240					inbound_queue: Some(tx),
1241				};
1242
1243				build_swarm(iter::once(protocol_config))
1244			})
1245			.collect::<Vec<_>>();
1246
1247		{
1250			let dial_addr = swarms[1].1.clone();
1251			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1252		}
1253
1254		let (mut swarm, _) = swarms.remove(0);
1255		tokio::spawn(async move {
1257			loop {
1258				match swarm.select_next_some().await {
1259					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1260						result.unwrap();
1261					},
1262					_ => {},
1263				}
1264			}
1265		});
1266
1267		let (mut swarm, _) = swarms.remove(0);
1269		let mut response_receiver = None;
1270
1271		loop {
1272			match swarm.select_next_some().await {
1273				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1274					let (sender, receiver) = oneshot::channel();
1275					swarm.behaviour_mut().send_request(
1276						&peer_id,
1277						protocol_name.clone(),
1278						b"this is a request".to_vec(),
1279						None,
1280						sender,
1281						IfDisconnected::ImmediateError,
1282					);
1283					assert!(response_receiver.is_none());
1284					response_receiver = Some(receiver);
1285				},
1286				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1287					result.unwrap();
1288					break
1289				},
1290				_ => {},
1291			}
1292		}
1293
1294		assert_eq!(
1295			response_receiver.unwrap().await.unwrap().unwrap(),
1296			(b"this is a response".to_vec(), protocol_name)
1297		);
1298	}
1299
1300	#[tokio::test]
1301	async fn max_response_size_exceeded() {
1302		let protocol_name = ProtocolName::from("/test/req-resp/1");
1303
1304		let mut swarms = (0..2)
1306			.map(|_| {
1307				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1308
1309				tokio::spawn(async move {
1310					while let Some(rq) = rx.next().await {
1311						assert_eq!(rq.payload, b"this is a request");
1312						let _ = rq.pending_response.send(super::OutgoingResponse {
1313							result: Ok(b"this response exceeds the limit".to_vec()),
1314							reputation_changes: Vec::new(),
1315							sent_feedback: None,
1316						});
1317					}
1318				});
1319
1320				let protocol_config = ProtocolConfig {
1321					name: protocol_name.clone(),
1322					fallback_names: Vec::new(),
1323					max_request_size: 1024,
1324					max_response_size: 8, request_timeout: Duration::from_secs(30),
1326					inbound_queue: Some(tx),
1327				};
1328
1329				build_swarm(iter::once(protocol_config))
1330			})
1331			.collect::<Vec<_>>();
1332
1333		{
1336			let dial_addr = swarms[1].1.clone();
1337			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1338		}
1339
1340		let (mut swarm, _) = swarms.remove(0);
1343		tokio::spawn(async move {
1344			loop {
1345				match swarm.select_next_some().await {
1346					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1347						assert!(result.is_ok());
1348					},
1349					SwarmEvent::ConnectionClosed { .. } => {
1350						break;
1351					},
1352					_ => {},
1353				}
1354			}
1355		});
1356
1357		let (mut swarm, _) = swarms.remove(0);
1359
1360		let mut response_receiver = None;
1361
1362		loop {
1363			match swarm.select_next_some().await {
1364				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1365					let (sender, receiver) = oneshot::channel();
1366					swarm.behaviour_mut().send_request(
1367						&peer_id,
1368						protocol_name.clone(),
1369						b"this is a request".to_vec(),
1370						None,
1371						sender,
1372						IfDisconnected::ImmediateError,
1373					);
1374					assert!(response_receiver.is_none());
1375					response_receiver = Some(receiver);
1376				},
1377				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1378					assert!(result.is_err());
1379					break
1380				},
1381				_ => {},
1382			}
1383		}
1384
1385		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1386			RequestFailure::Network(OutboundFailure::Io(_)) => {},
1387			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1388		}
1389	}
1390
1391	#[tokio::test]
1402	async fn request_id_collision() {
1403		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1404		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1405
1406		let mut swarm_1 = {
1407			let protocol_configs = vec![
1408				ProtocolConfig {
1409					name: protocol_name_1.clone(),
1410					fallback_names: Vec::new(),
1411					max_request_size: 1024,
1412					max_response_size: 1024 * 1024,
1413					request_timeout: Duration::from_secs(30),
1414					inbound_queue: None,
1415				},
1416				ProtocolConfig {
1417					name: protocol_name_2.clone(),
1418					fallback_names: Vec::new(),
1419					max_request_size: 1024,
1420					max_response_size: 1024 * 1024,
1421					request_timeout: Duration::from_secs(30),
1422					inbound_queue: None,
1423				},
1424			];
1425
1426			build_swarm(protocol_configs.into_iter()).0
1427		};
1428
1429		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1430			let (tx_1, rx_1) = async_channel::bounded(64);
1431			let (tx_2, rx_2) = async_channel::bounded(64);
1432
1433			let protocol_configs = vec![
1434				ProtocolConfig {
1435					name: protocol_name_1.clone(),
1436					fallback_names: Vec::new(),
1437					max_request_size: 1024,
1438					max_response_size: 1024 * 1024,
1439					request_timeout: Duration::from_secs(30),
1440					inbound_queue: Some(tx_1),
1441				},
1442				ProtocolConfig {
1443					name: protocol_name_2.clone(),
1444					fallback_names: Vec::new(),
1445					max_request_size: 1024,
1446					max_response_size: 1024 * 1024,
1447					request_timeout: Duration::from_secs(30),
1448					inbound_queue: Some(tx_2),
1449				},
1450			];
1451
1452			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1453
1454			(swarm, rx_1, rx_2, listen_addr)
1455		};
1456
1457		swarm_1.dial(listen_add_2).unwrap();
1460
1461		tokio::spawn(async move {
1463			loop {
1464				match swarm_2.select_next_some().await {
1465					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1466						result.unwrap();
1467					},
1468					_ => {},
1469				}
1470			}
1471		});
1472
1473		tokio::spawn(async move {
1478			let protocol_1_request = swarm_2_handler_1.next().await;
1479			let protocol_2_request = swarm_2_handler_2.next().await;
1480
1481			protocol_1_request
1482				.unwrap()
1483				.pending_response
1484				.send(OutgoingResponse {
1485					result: Ok(b"this is a response".to_vec()),
1486					reputation_changes: Vec::new(),
1487					sent_feedback: None,
1488				})
1489				.unwrap();
1490			protocol_2_request
1491				.unwrap()
1492				.pending_response
1493				.send(OutgoingResponse {
1494					result: Ok(b"this is a response".to_vec()),
1495					reputation_changes: Vec::new(),
1496					sent_feedback: None,
1497				})
1498				.unwrap();
1499		});
1500
1501		let mut response_receivers = None;
1504		let mut num_responses = 0;
1505
1506		loop {
1507			match swarm_1.select_next_some().await {
1508				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1509					let (sender_1, receiver_1) = oneshot::channel();
1510					let (sender_2, receiver_2) = oneshot::channel();
1511					swarm_1.behaviour_mut().send_request(
1512						&peer_id,
1513						protocol_name_1.clone(),
1514						b"this is a request".to_vec(),
1515						None,
1516						sender_1,
1517						IfDisconnected::ImmediateError,
1518					);
1519					swarm_1.behaviour_mut().send_request(
1520						&peer_id,
1521						protocol_name_2.clone(),
1522						b"this is a request".to_vec(),
1523						None,
1524						sender_2,
1525						IfDisconnected::ImmediateError,
1526					);
1527					assert!(response_receivers.is_none());
1528					response_receivers = Some((receiver_1, receiver_2));
1529				},
1530				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1531					num_responses += 1;
1532					result.unwrap();
1533					if num_responses == 2 {
1534						break
1535					}
1536				},
1537				_ => {},
1538			}
1539		}
1540		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1541		assert_eq!(
1542			response_receiver_1.await.unwrap().unwrap(),
1543			(b"this is a response".to_vec(), protocol_name_1)
1544		);
1545		assert_eq!(
1546			response_receiver_2.await.unwrap().unwrap(),
1547			(b"this is a response".to_vec(), protocol_name_2)
1548		);
1549	}
1550
1551	#[tokio::test]
1552	async fn request_fallback() {
1553		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1554		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1555		let protocol_name_2 = ProtocolName::from("/test/another");
1556
1557		let protocol_config_1 = ProtocolConfig {
1558			name: protocol_name_1.clone(),
1559			fallback_names: Vec::new(),
1560			max_request_size: 1024,
1561			max_response_size: 1024 * 1024,
1562			request_timeout: Duration::from_secs(30),
1563			inbound_queue: None,
1564		};
1565		let protocol_config_1_fallback = ProtocolConfig {
1566			name: protocol_name_1_fallback.clone(),
1567			fallback_names: Vec::new(),
1568			max_request_size: 1024,
1569			max_response_size: 1024 * 1024,
1570			request_timeout: Duration::from_secs(30),
1571			inbound_queue: None,
1572		};
1573		let protocol_config_2 = ProtocolConfig {
1574			name: protocol_name_2.clone(),
1575			fallback_names: Vec::new(),
1576			max_request_size: 1024,
1577			max_response_size: 1024 * 1024,
1578			request_timeout: Duration::from_secs(30),
1579			inbound_queue: None,
1580		};
1581
1582		let mut older_swarm = {
1585			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1586			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1587			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1588			protocol_config_1_fallback.inbound_queue = Some(tx_1);
1589
1590			let mut protocol_config_2 = protocol_config_2.clone();
1591			protocol_config_2.inbound_queue = Some(tx_2);
1592
1593			tokio::spawn(async move {
1594				for _ in 0..2 {
1595					if let Some(rq) = rx_1.next().await {
1596						let (fb_tx, fb_rx) = oneshot::channel();
1597						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1598						let _ = rq.pending_response.send(super::OutgoingResponse {
1599							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1600							reputation_changes: Vec::new(),
1601							sent_feedback: Some(fb_tx),
1602						});
1603						fb_rx.await.unwrap();
1604					}
1605				}
1606
1607				if let Some(rq) = rx_2.next().await {
1608					let (fb_tx, fb_rx) = oneshot::channel();
1609					assert_eq!(rq.payload, b"request on protocol /test/other");
1610					let _ = rq.pending_response.send(super::OutgoingResponse {
1611						result: Ok(b"this is a response on protocol /test/other".to_vec()),
1612						reputation_changes: Vec::new(),
1613						sent_feedback: Some(fb_tx),
1614					});
1615					fb_rx.await.unwrap();
1616				}
1617			});
1618
1619			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1620		};
1621
1622		let mut new_swarm = build_swarm(
1624			vec![
1625				protocol_config_1.clone(),
1626				protocol_config_1_fallback.clone(),
1627				protocol_config_2.clone(),
1628			]
1629			.into_iter(),
1630		);
1631
1632		{
1633			let dial_addr = older_swarm.1.clone();
1634			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1635		}
1636
1637		tokio::spawn(async move {
1639			loop {
1640				_ = older_swarm.0.select_next_some().await;
1641			}
1642		});
1643
1644		let (mut swarm, _) = new_swarm;
1646		let mut older_peer_id = None;
1647
1648		let mut response_receiver = None;
1649		loop {
1651			match swarm.select_next_some().await {
1652				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1653					older_peer_id = Some(peer_id);
1654					let (sender, receiver) = oneshot::channel();
1655					swarm.behaviour_mut().send_request(
1656						&peer_id,
1657						protocol_name_1.clone(),
1658						b"request on protocol /test/req-resp/2".to_vec(),
1659						Some((
1660							b"request on protocol /test/req-resp/1".to_vec(),
1661							protocol_config_1_fallback.name.clone(),
1662						)),
1663						sender,
1664						IfDisconnected::ImmediateError,
1665					);
1666					response_receiver = Some(receiver);
1667				},
1668				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1669					result.unwrap();
1670					break
1671				},
1672				_ => {},
1673			}
1674		}
1675		assert_eq!(
1676			response_receiver.unwrap().await.unwrap().unwrap(),
1677			(
1678				b"this is a response on protocol /test/req-resp/1".to_vec(),
1679				protocol_name_1_fallback.clone()
1680			)
1681		);
1682		let (sender, response_receiver) = oneshot::channel();
1684		swarm.behaviour_mut().send_request(
1685			older_peer_id.as_ref().unwrap(),
1686			protocol_name_1_fallback.clone(),
1687			b"request on protocol /test/req-resp/1".to_vec(),
1688			Some((
1689				b"dummy request, will fail if processed".to_vec(),
1690				protocol_config_1_fallback.name.clone(),
1691			)),
1692			sender,
1693			IfDisconnected::ImmediateError,
1694		);
1695		loop {
1696			match swarm.select_next_some().await {
1697				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1698					result.unwrap();
1699					break
1700				},
1701				_ => {},
1702			}
1703		}
1704		assert_eq!(
1705			response_receiver.await.unwrap().unwrap(),
1706			(
1707				b"this is a response on protocol /test/req-resp/1".to_vec(),
1708				protocol_name_1_fallback.clone()
1709			)
1710		);
1711		let (sender, response_receiver) = oneshot::channel();
1713		swarm.behaviour_mut().send_request(
1714			older_peer_id.as_ref().unwrap(),
1715			protocol_name_1.clone(),
1716			b"request on protocol /test/req-resp-2".to_vec(),
1717			None,
1718			sender,
1719			IfDisconnected::ImmediateError,
1720		);
1721		loop {
1722			match swarm.select_next_some().await {
1723				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1724					assert_matches!(
1725						result.unwrap_err(),
1726						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1727					);
1728					break
1729				},
1730				_ => {},
1731			}
1732		}
1733		assert!(response_receiver.await.unwrap().is_err());
1734		let (sender, response_receiver) = oneshot::channel();
1736		swarm.behaviour_mut().send_request(
1737			older_peer_id.as_ref().unwrap(),
1738			protocol_name_2.clone(),
1739			b"request on protocol /test/other".to_vec(),
1740			None,
1741			sender,
1742			IfDisconnected::ImmediateError,
1743		);
1744		loop {
1745			match swarm.select_next_some().await {
1746				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1747					result.unwrap();
1748					break
1749				},
1750				_ => {},
1751			}
1752		}
1753		assert_eq!(
1754			response_receiver.await.unwrap().unwrap(),
1755			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1756		);
1757	}
1758
1759	#[tokio::test]
1773	async fn enforce_outbound_timeouts() {
1774		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1775		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1776
1777		let protocol_name = ProtocolName::from("/test/req-resp/1");
1779
1780		let protocol_config = ProtocolConfig {
1781			name: protocol_name.clone(),
1782			fallback_names: Vec::new(),
1783			max_request_size: 1024,
1784			max_response_size: 1024 * 1024,
1785			request_timeout: REQUEST_TIMEOUT, inbound_queue: None,
1787		};
1788
1789		let (mut first_swarm, _) = {
1791			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1792
1793			tokio::spawn(async move {
1794				if let Some(rq) = rx.next().await {
1795					assert_eq!(rq.payload, b"this is a request");
1796
1797					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1800
1801					let _ = rq.pending_response.send(super::OutgoingResponse {
1804						result: Ok(b"Second swarm already timedout".to_vec()),
1805						reputation_changes: Vec::new(),
1806						sent_feedback: None,
1807					});
1808				}
1809			});
1810
1811			let mut protocol_config = protocol_config.clone();
1812			protocol_config.inbound_queue = Some(tx);
1813
1814			build_swarm(iter::once(protocol_config))
1815		};
1816
1817		let (mut second_swarm, second_address) = {
1818			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1819
1820			tokio::spawn(async move {
1821				while let Some(rq) = rx.next().await {
1822					let _ = rq.pending_response.send(super::OutgoingResponse {
1823						result: Ok(b"This is the response".to_vec()),
1824						reputation_changes: Vec::new(),
1825						sent_feedback: None,
1826					});
1827				}
1828			});
1829			let mut protocol_config = protocol_config.clone();
1830			protocol_config.inbound_queue = Some(tx);
1831
1832			build_swarm(iter::once(protocol_config.clone()))
1833		};
1834		second_swarm
1836			.behaviour_mut()
1837			.protocols
1838			.get_mut(&protocol_name)
1839			.unwrap()
1840			.request_timeout = REQUEST_TIMEOUT_SHORT;
1841
1842		{
1844			Swarm::dial(&mut first_swarm, second_address).unwrap();
1845		}
1846
1847		tokio::spawn(async move {
1850			loop {
1851				let event = first_swarm.select_next_some().await;
1852				match event {
1853					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1854						assert!(result.is_ok());
1855						break;
1856					},
1857					SwarmEvent::ConnectionClosed { .. } => {
1858						break;
1859					},
1860					_ => {},
1861				}
1862			}
1863		});
1864
1865		let mut response_receiver = None;
1869		loop {
1870			let event = second_swarm.select_next_some().await;
1871
1872			match event {
1873				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1874					let (sender, receiver) = oneshot::channel();
1875					second_swarm.behaviour_mut().send_request(
1876						&peer_id,
1877						protocol_name.clone(),
1878						b"this is a request".to_vec(),
1879						None,
1880						sender,
1881						IfDisconnected::ImmediateError,
1882					);
1883					assert!(response_receiver.is_none());
1884					response_receiver = Some(receiver);
1885				},
1886				SwarmEvent::ConnectionClosed { .. } => {
1887					break;
1888				},
1889				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1890					assert!(result.is_err());
1891					break
1892				},
1893				_ => {},
1894			}
1895		}
1896
1897		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1899			RequestFailure::Network(OutboundFailure::Timeout) => {},
1900			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1901		}
1902	}
1903}