1use crate::{
38 peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39 service::traits::RequestResponseConfig as RequestResponseConfigT,
40 types::ProtocolName,
41 ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46 core::{transport::PortUse, Endpoint, Multiaddr},
47 request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48 swarm::{
49 behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50 NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51 },
52 PeerId,
53};
54
55use std::{
56 collections::{hash_map::Entry, HashMap},
57 io, iter,
58 ops::Deref,
59 pin::Pin,
60 sync::Arc,
61 task::{Context, Poll},
62 time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
67const LOG_TARGET: &str = "sub-libp2p::request-response";
69
70const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
73#[derive(Debug, Clone, thiserror::Error)]
76pub enum OutboundFailure {
77 #[error("Failed to dial the requested peer")]
79 DialFailure,
80 #[error("Timeout while waiting for a response")]
82 Timeout,
83 #[error("Connection was closed before a response was received")]
85 ConnectionClosed,
86 #[error("The remote supports none of the requested protocols")]
88 UnsupportedProtocols,
89 #[error("An IO failure happened on an outbound stream")]
91 Io(Arc<io::Error>),
92}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95 fn from(out: request_response::OutboundFailure) -> Self {
96 match out {
97 request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98 request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99 request_response::OutboundFailure::ConnectionClosed =>
100 OutboundFailure::ConnectionClosed,
101 request_response::OutboundFailure::UnsupportedProtocols =>
102 OutboundFailure::UnsupportedProtocols,
103 request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
104 }
105 }
106}
107
108#[derive(Debug, thiserror::Error)]
111pub enum InboundFailure {
112 #[error("Timeout while receiving request or sending response")]
115 Timeout,
116 #[error("Connection was closed before a response could be sent")]
118 ConnectionClosed,
119 #[error("The local peer supports none of the protocols requested by the remote")]
121 UnsupportedProtocols,
122 #[error("The response channel was dropped without sending a response to the remote")]
124 ResponseOmission,
125 #[error("An IO failure happened on an inbound stream")]
127 Io(Arc<io::Error>),
128}
129
130impl From<request_response::InboundFailure> for InboundFailure {
131 fn from(out: request_response::InboundFailure) -> Self {
132 match out {
133 request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
134 request_response::InboundFailure::Timeout => InboundFailure::Timeout,
135 request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
136 request_response::InboundFailure::UnsupportedProtocols =>
137 InboundFailure::UnsupportedProtocols,
138 request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
139 }
140 }
141}
142
143#[derive(Debug, thiserror::Error)]
145#[allow(missing_docs)]
146pub enum RequestFailure {
147 #[error("We are not currently connected to the requested peer.")]
148 NotConnected,
149 #[error("Given protocol hasn't been registered.")]
150 UnknownProtocol,
151 #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
152 Refused,
153 #[error("The remote replied, but the local node is no longer interested in the response.")]
154 Obsolete,
155 #[error("Problem on the network: {0}")]
156 Network(OutboundFailure),
157}
158
159#[derive(Debug, Clone)]
161pub struct ProtocolConfig {
162 pub name: ProtocolName,
164
165 pub fallback_names: Vec<ProtocolName>,
167
168 pub max_request_size: u64,
173
174 pub max_response_size: u64,
179
180 pub request_timeout: Duration,
184
185 pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
206}
207
208impl RequestResponseConfigT for ProtocolConfig {
209 fn protocol_name(&self) -> &ProtocolName {
210 &self.name
211 }
212}
213
214#[derive(Debug)]
216pub struct IncomingRequest {
217 pub peer: sc_network_types::PeerId,
219
220 pub payload: Vec<u8>,
223
224 pub pending_response: oneshot::Sender<OutgoingResponse>,
233}
234
235#[derive(Debug)]
237pub struct OutgoingResponse {
238 pub result: Result<Vec<u8>, ()>,
242
243 pub reputation_changes: Vec<ReputationChange>,
246
247 pub sent_feedback: Option<oneshot::Sender<()>>,
256}
257
258struct PendingRequest {
260 started_at: Instant,
262 response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
267 fallback_request: Option<(Vec<u8>, ProtocolName)>,
269}
270
271#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
273pub enum IfDisconnected {
274 TryConnect,
276 ImmediateError,
278}
279
280impl IfDisconnected {
282 pub fn should_connect(self) -> bool {
284 match self {
285 Self::TryConnect => true,
286 Self::ImmediateError => false,
287 }
288 }
289}
290
291#[derive(Debug)]
293pub enum Event {
294 InboundRequest {
298 peer: PeerId,
300 protocol: ProtocolName,
302 result: Result<Duration, ResponseFailure>,
307 },
308
309 RequestFinished {
314 peer: PeerId,
316 protocol: ProtocolName,
318 duration: Duration,
320 result: Result<(), RequestFailure>,
322 },
323
324 ReputationChanges {
326 peer: PeerId,
328 changes: Vec<ReputationChange>,
330 },
331}
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
340struct ProtocolRequestId<RequestId> {
341 protocol: ProtocolName,
342 request_id: RequestId,
343}
344
345impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
346 fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
347 Self { protocol, request_id }
348 }
349}
350
351struct ProtocolDetails {
353 behaviour: Behaviour<GenericCodec>,
354 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
355 request_timeout: Duration,
356}
357
358pub struct RequestResponsesBehaviour {
360 protocols: HashMap<ProtocolName, ProtocolDetails>,
365
366 pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
368
369 pending_responses: stream::FuturesUnordered<
372 Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
373 >,
374
375 pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,
377
378 send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,
381
382 peer_store: Arc<dyn PeerStoreProvider>,
384
385 periodic_request_check: tokio::time::Interval,
392}
393
394struct RequestProcessingOutcome {
396 peer: PeerId,
397 request_id: InboundRequestId,
398 protocol: ProtocolName,
399 inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
400 response: OutgoingResponse,
401}
402
403impl RequestResponsesBehaviour {
404 pub fn new(
407 list: impl Iterator<Item = ProtocolConfig>,
408 peer_store: Arc<dyn PeerStoreProvider>,
409 ) -> Result<Self, RegisterError> {
410 let mut protocols = HashMap::new();
411 for protocol in list {
412 let cfg = Config::default().with_request_timeout(protocol.request_timeout);
413
414 let protocol_support = if protocol.inbound_queue.is_some() {
415 ProtocolSupport::Full
416 } else {
417 ProtocolSupport::Outbound
418 };
419
420 let behaviour = Behaviour::with_codec(
421 GenericCodec {
422 max_request_size: protocol.max_request_size,
423 max_response_size: protocol.max_response_size,
424 },
425 iter::once(protocol.name.clone())
426 .chain(protocol.fallback_names)
427 .zip(iter::repeat(protocol_support)),
428 cfg,
429 );
430
431 match protocols.entry(protocol.name) {
432 Entry::Vacant(e) => e.insert(ProtocolDetails {
433 behaviour,
434 inbound_queue: protocol.inbound_queue,
435 request_timeout: protocol.request_timeout,
436 }),
437 Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
438 };
439 }
440
441 Ok(Self {
442 protocols,
443 pending_requests: Default::default(),
444 pending_responses: Default::default(),
445 pending_responses_arrival_time: Default::default(),
446 send_feedback: Default::default(),
447 peer_store,
448 periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
449 })
450 }
451
452 pub fn send_request(
459 &mut self,
460 target: &PeerId,
461 protocol_name: ProtocolName,
462 request: Vec<u8>,
463 fallback_request: Option<(Vec<u8>, ProtocolName)>,
464 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
465 connect: IfDisconnected,
466 ) {
467 log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
468
469 if let Some(ProtocolDetails { behaviour, .. }) =
470 self.protocols.get_mut(protocol_name.deref())
471 {
472 Self::send_request_inner(
473 behaviour,
474 &mut self.pending_requests,
475 target,
476 protocol_name,
477 request,
478 fallback_request,
479 pending_response,
480 connect,
481 )
482 } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
483 log::debug!(
484 target: LOG_TARGET,
485 "Unknown protocol {:?}. At the same time local \
486 node is no longer interested in the result.",
487 protocol_name,
488 );
489 }
490 }
491
492 fn send_request_inner(
493 behaviour: &mut Behaviour<GenericCodec>,
494 pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
495 target: &PeerId,
496 protocol_name: ProtocolName,
497 request: Vec<u8>,
498 fallback_request: Option<(Vec<u8>, ProtocolName)>,
499 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
500 connect: IfDisconnected,
501 ) {
502 if behaviour.is_connected(target) || connect.should_connect() {
503 let request_id = behaviour.send_request(target, request);
504 let prev_req_id = pending_requests.insert(
505 (protocol_name.to_string().into(), request_id).into(),
506 PendingRequest {
507 started_at: Instant::now(),
508 response_tx: Some(pending_response),
509 fallback_request,
510 },
511 );
512 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
513 } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
514 log::debug!(
515 target: LOG_TARGET,
516 "Not connected to peer {:?}. At the same time local \
517 node is no longer interested in the result.",
518 target,
519 );
520 }
521 }
522}
523
524impl NetworkBehaviour for RequestResponsesBehaviour {
525 type ConnectionHandler =
526 MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
527 type ToSwarm = Event;
528
529 fn handle_pending_inbound_connection(
530 &mut self,
531 _connection_id: ConnectionId,
532 _local_addr: &Multiaddr,
533 _remote_addr: &Multiaddr,
534 ) -> Result<(), ConnectionDenied> {
535 Ok(())
536 }
537
538 fn handle_pending_outbound_connection(
539 &mut self,
540 _connection_id: ConnectionId,
541 _maybe_peer: Option<PeerId>,
542 _addresses: &[Multiaddr],
543 _effective_role: Endpoint,
544 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
545 Ok(Vec::new())
546 }
547
548 fn handle_established_inbound_connection(
549 &mut self,
550 connection_id: ConnectionId,
551 peer: PeerId,
552 local_addr: &Multiaddr,
553 remote_addr: &Multiaddr,
554 ) -> Result<THandler<Self>, ConnectionDenied> {
555 let iter =
556 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
557 if let Ok(handler) = behaviour.handle_established_inbound_connection(
558 connection_id,
559 peer,
560 local_addr,
561 remote_addr,
562 ) {
563 Some((p.to_string(), handler))
564 } else {
565 None
566 }
567 });
568
569 Ok(MultiHandler::try_from_iter(iter).expect(
570 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
571 which is the only possible error; qed",
572 ))
573 }
574
575 fn handle_established_outbound_connection(
576 &mut self,
577 connection_id: ConnectionId,
578 peer: PeerId,
579 addr: &Multiaddr,
580 role_override: Endpoint,
581 port_use: PortUse,
582 ) -> Result<THandler<Self>, ConnectionDenied> {
583 let iter =
584 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
585 if let Ok(handler) = behaviour.handle_established_outbound_connection(
586 connection_id,
587 peer,
588 addr,
589 role_override,
590 port_use,
591 ) {
592 Some((p.to_string(), handler))
593 } else {
594 None
595 }
596 });
597
598 Ok(MultiHandler::try_from_iter(iter).expect(
599 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
600 which is the only possible error; qed",
601 ))
602 }
603
604 fn on_swarm_event(&mut self, event: FromSwarm) {
605 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
606 behaviour.on_swarm_event(event);
607 }
608 }
609
610 fn on_connection_handler_event(
611 &mut self,
612 peer_id: PeerId,
613 connection_id: ConnectionId,
614 event: THandlerOutEvent<Self>,
615 ) {
616 let p_name = event.0;
617 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
618 return behaviour.on_connection_handler_event(peer_id, connection_id, event.1)
619 } else {
620 log::warn!(
621 target: LOG_TARGET,
622 "on_connection_handler_event: no request-response instance registered for protocol {:?}",
623 p_name
624 );
625 }
626 }
627
628 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
629 'poll_all: loop {
630 if self.periodic_request_check.poll_tick(cx).is_ready() {
632 self.pending_requests.retain(|id, req| {
633 let Some(ProtocolDetails { request_timeout, .. }) =
634 self.protocols.get(&id.protocol)
635 else {
636 log::warn!(
637 target: LOG_TARGET,
638 "Request {id:?} has no protocol registered.",
639 );
640
641 if let Some(response_tx) = req.response_tx.take() {
642 if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
643 log::debug!(
644 target: LOG_TARGET,
645 "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
646 );
647 }
648 }
649 return false
650 };
651
652 let elapsed = req.started_at.elapsed();
653 if elapsed > *request_timeout {
654 log::debug!(
655 target: LOG_TARGET,
656 "Request {id:?} force detected as timeout.",
657 );
658
659 if let Some(response_tx) = req.response_tx.take() {
660 if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
661 log::debug!(
662 target: LOG_TARGET,
663 "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
664 );
665 }
666 }
667
668 false
669 } else {
670 true
671 }
672 });
673 }
674
675 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
677 let RequestProcessingOutcome {
678 peer,
679 request_id,
680 protocol: protocol_name,
681 inner_channel,
682 response: OutgoingResponse { result, reputation_changes, sent_feedback },
683 } = match outcome {
684 Some(outcome) => outcome,
685 None => continue,
688 };
689
690 if let Ok(payload) = result {
691 if let Some(ProtocolDetails { behaviour, .. }) =
692 self.protocols.get_mut(&*protocol_name)
693 {
694 log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
695
696 if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
697 log::debug!(
700 target: LOG_TARGET,
701 "Failed to send response for {:?} on protocol {:?} due to a \
702 timeout or due to the connection to the peer being closed. \
703 Dropping response",
704 request_id, protocol_name,
705 );
706 } else if let Some(sent_feedback) = sent_feedback {
707 self.send_feedback
708 .insert((protocol_name, request_id).into(), sent_feedback);
709 }
710 }
711 }
712
713 if !reputation_changes.is_empty() {
714 return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
715 peer,
716 changes: reputation_changes,
717 }))
718 }
719 }
720
721 let mut fallback_requests = vec![];
722
723 for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
725 {
726 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
727 let ev = match ev {
728 ToSwarm::GenerateEvent(ev) => ev,
730
731 ToSwarm::Dial { opts } => {
734 if opts.get_peer_id().is_none() {
735 log::error!(
736 target: LOG_TARGET,
737 "The request-response isn't supposed to start dialing addresses"
738 );
739 }
740 return Poll::Ready(ToSwarm::Dial { opts })
741 },
742 event => {
743 return Poll::Ready(
744 event.map_in(|event| ((*protocol).to_string(), event)).map_out(
745 |_| {
746 unreachable!(
747 "`GenerateEvent` is handled in a branch above; qed"
748 )
749 },
750 ),
751 );
752 },
753 };
754
755 match ev {
756 request_response::Event::Message {
758 peer,
759 message: Message::Request { request_id, request, channel, .. },
760 } => {
761 self.pending_responses_arrival_time
762 .insert((protocol.clone(), request_id).into(), Instant::now());
763
764 let reputation = self.peer_store.peer_reputation(&peer.into());
765
766 if reputation < BANNED_THRESHOLD {
767 log::debug!(
768 target: LOG_TARGET,
769 "Cannot handle requests from a node with a low reputation {}: {}",
770 peer,
771 reputation,
772 );
773 continue 'poll_protocol
774 }
775
776 let (tx, rx) = oneshot::channel();
777
778 if let Some(resp_builder) = inbound_queue {
781 let _ = resp_builder.try_send(IncomingRequest {
788 peer: peer.into(),
789 payload: request,
790 pending_response: tx,
791 });
792 } else {
793 debug_assert!(false, "Received message on outbound-only protocol.");
794 }
795
796 let protocol = protocol.clone();
797
798 self.pending_responses.push(Box::pin(async move {
799 rx.await.map_or(None, |response| {
803 Some(RequestProcessingOutcome {
804 peer,
805 request_id,
806 protocol,
807 inner_channel: channel,
808 response,
809 })
810 })
811 }));
812
813 continue 'poll_all
816 },
817
818 request_response::Event::Message {
820 peer,
821 message: Message::Response { request_id, response },
822 ..
823 } => {
824 let (started, delivered) = match self
825 .pending_requests
826 .remove(&(protocol.clone(), request_id).into())
827 {
828 Some(PendingRequest {
829 started_at,
830 response_tx: Some(response_tx),
831 ..
832 }) => {
833 log::trace!(
834 target: LOG_TARGET,
835 "received response from {peer} ({protocol:?}), {} bytes",
836 response.as_ref().map_or(0usize, |response| response.len()),
837 );
838
839 let delivered = response_tx
840 .send(
841 response
842 .map_err(|()| RequestFailure::Refused)
843 .map(|resp| (resp, protocol.clone())),
844 )
845 .map_err(|_| RequestFailure::Obsolete);
846 (started_at, delivered)
847 },
848 _ => {
849 log::debug!(
850 target: LOG_TARGET,
851 "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
852 request_id,
853 peer,
854 );
855 continue
856 },
857 };
858
859 let out = Event::RequestFinished {
860 peer,
861 protocol: protocol.clone(),
862 duration: started.elapsed(),
863 result: delivered,
864 };
865
866 return Poll::Ready(ToSwarm::GenerateEvent(out))
867 },
868
869 request_response::Event::OutboundFailure {
871 peer,
872 request_id,
873 error,
874 ..
875 } => {
876 let error = OutboundFailure::from(error);
877 let started = match self
878 .pending_requests
879 .remove(&(protocol.clone(), request_id).into())
880 {
881 Some(PendingRequest {
882 started_at,
883 response_tx: Some(response_tx),
884 fallback_request,
885 }) => {
886 if matches!(error, OutboundFailure::UnsupportedProtocols) {
889 if let Some((fallback_request, fallback_protocol)) =
890 fallback_request
891 {
892 log::trace!(
893 target: LOG_TARGET,
894 "Request with id {:?} failed. Trying the fallback protocol. {}",
895 request_id,
896 fallback_protocol.deref()
897 );
898 fallback_requests.push((
899 peer,
900 fallback_protocol,
901 fallback_request,
902 response_tx,
903 ));
904 continue
905 }
906 }
907
908 if response_tx
909 .send(Err(RequestFailure::Network(error.clone())))
910 .is_err()
911 {
912 log::debug!(
913 target: LOG_TARGET,
914 "Request with id {:?} failed. At the same time local \
915 node is no longer interested in the result.",
916 request_id,
917 );
918 }
919 started_at
920 },
921 _ => {
922 log::debug!(
923 target: LOG_TARGET,
924 "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
925 request_id,
926 error,
927 peer
928 );
929 continue
930 },
931 };
932
933 let out = Event::RequestFinished {
934 peer,
935 protocol: protocol.clone(),
936 duration: started.elapsed(),
937 result: Err(RequestFailure::Network(error)),
938 };
939
940 return Poll::Ready(ToSwarm::GenerateEvent(out))
941 },
942
943 request_response::Event::InboundFailure {
946 request_id, peer, error, ..
947 } => {
948 self.pending_responses_arrival_time
949 .remove(&(protocol.clone(), request_id).into());
950 self.send_feedback.remove(&(protocol.clone(), request_id).into());
951 let out = Event::InboundRequest {
952 peer,
953 protocol: protocol.clone(),
954 result: Err(ResponseFailure::Network(error.into())),
955 };
956 return Poll::Ready(ToSwarm::GenerateEvent(out))
957 },
958
959 request_response::Event::ResponseSent { request_id, peer } => {
961 let arrival_time = self
962 .pending_responses_arrival_time
963 .remove(&(protocol.clone(), request_id).into())
964 .map(|t| t.elapsed())
965 .expect(
966 "Time is added for each inbound request on arrival and only \
967 removed on success (`ResponseSent`) or failure \
968 (`InboundFailure`). One can not receive a success event for a \
969 request that either never arrived, or that has previously \
970 failed; qed.",
971 );
972
973 if let Some(send_feedback) =
974 self.send_feedback.remove(&(protocol.clone(), request_id).into())
975 {
976 let _ = send_feedback.send(());
977 }
978
979 let out = Event::InboundRequest {
980 peer,
981 protocol: protocol.clone(),
982 result: Ok(arrival_time),
983 };
984
985 return Poll::Ready(ToSwarm::GenerateEvent(out))
986 },
987 };
988 }
989 }
990
991 for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
993 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
994 Self::send_request_inner(
995 behaviour,
996 &mut self.pending_requests,
997 &peer,
998 protocol,
999 request,
1000 None,
1001 pending_response,
1002 IfDisconnected::ImmediateError,
1006 );
1007 }
1008 }
1009
1010 break Poll::Pending
1011 }
1012 }
1013}
1014
1015#[derive(Debug, thiserror::Error)]
1017pub enum RegisterError {
1018 #[error("{0}")]
1020 DuplicateProtocol(ProtocolName),
1021}
1022
1023#[derive(Debug, thiserror::Error)]
1025pub enum ResponseFailure {
1026 #[error("Problem on the network: {0}")]
1028 Network(InboundFailure),
1029}
1030
1031#[derive(Debug, Clone)]
1034#[doc(hidden)] pub struct GenericCodec {
1036 max_request_size: u64,
1037 max_response_size: u64,
1038}
1039
1040#[async_trait::async_trait]
1041impl Codec for GenericCodec {
1042 type Protocol = ProtocolName;
1043 type Request = Vec<u8>;
1044 type Response = Result<Vec<u8>, ()>;
1045
1046 async fn read_request<T>(
1047 &mut self,
1048 _: &Self::Protocol,
1049 mut io: &mut T,
1050 ) -> io::Result<Self::Request>
1051 where
1052 T: AsyncRead + Unpin + Send,
1053 {
1054 let length = unsigned_varint::aio::read_usize(&mut io)
1056 .await
1057 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1058 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1059 return Err(io::Error::new(
1060 io::ErrorKind::InvalidInput,
1061 format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1062 ))
1063 }
1064
1065 let mut buffer = vec![0; length];
1067 io.read_exact(&mut buffer).await?;
1068 Ok(buffer)
1069 }
1070
1071 async fn read_response<T>(
1072 &mut self,
1073 _: &Self::Protocol,
1074 mut io: &mut T,
1075 ) -> io::Result<Self::Response>
1076 where
1077 T: AsyncRead + Unpin + Send,
1078 {
1079 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1086 Ok(l) => l,
1087 Err(unsigned_varint::io::ReadError::Io(err))
1088 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1089 return Ok(Err(())),
1090 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1091 };
1092
1093 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1094 return Err(io::Error::new(
1095 io::ErrorKind::InvalidInput,
1096 format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1097 ))
1098 }
1099
1100 let mut buffer = vec![0; length];
1102 io.read_exact(&mut buffer).await?;
1103 Ok(Ok(buffer))
1104 }
1105
1106 async fn write_request<T>(
1107 &mut self,
1108 _: &Self::Protocol,
1109 io: &mut T,
1110 req: Self::Request,
1111 ) -> io::Result<()>
1112 where
1113 T: AsyncWrite + Unpin + Send,
1114 {
1115 {
1118 let mut buffer = unsigned_varint::encode::usize_buffer();
1119 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1120 }
1121
1122 io.write_all(&req).await?;
1124
1125 io.close().await?;
1126 Ok(())
1127 }
1128
1129 async fn write_response<T>(
1130 &mut self,
1131 _: &Self::Protocol,
1132 io: &mut T,
1133 res: Self::Response,
1134 ) -> io::Result<()>
1135 where
1136 T: AsyncWrite + Unpin + Send,
1137 {
1138 if let Ok(res) = res {
1140 {
1143 let mut buffer = unsigned_varint::encode::usize_buffer();
1144 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1145 }
1146
1147 io.write_all(&res).await?;
1149 }
1150
1151 io.close().await?;
1152 Ok(())
1153 }
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158 use super::*;
1159
1160 use crate::mock::MockPeerStore;
1161 use assert_matches::assert_matches;
1162 use futures::channel::oneshot;
1163 use libp2p::{
1164 core::{
1165 transport::{MemoryTransport, Transport},
1166 upgrade,
1167 },
1168 identity::Keypair,
1169 noise,
1170 swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1171 Multiaddr,
1172 };
1173 use std::{iter, time::Duration};
1174
1175 struct TokioExecutor;
1176 impl Executor for TokioExecutor {
1177 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1178 tokio::spawn(f);
1179 }
1180 }
1181
1182 fn build_swarm(
1183 list: impl Iterator<Item = ProtocolConfig>,
1184 ) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1185 let keypair = Keypair::generate_ed25519();
1186
1187 let transport = MemoryTransport::new()
1188 .upgrade(upgrade::Version::V1)
1189 .authenticate(noise::Config::new(&keypair).unwrap())
1190 .multiplex(libp2p::yamux::Config::default())
1191 .boxed();
1192
1193 let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1194
1195 let mut swarm = Swarm::new(
1196 transport,
1197 behaviour,
1198 keypair.public().to_peer_id(),
1199 SwarmConfig::with_executor(TokioExecutor {})
1200 .with_idle_connection_timeout(Duration::from_secs(10)),
1203 );
1204
1205 let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1206
1207 swarm.listen_on(listen_addr.clone()).unwrap();
1208
1209 (swarm, listen_addr)
1210 }
1211
1212 #[tokio::test]
1213 async fn basic_request_response_works() {
1214 let protocol_name = ProtocolName::from("/test/req-resp/1");
1215
1216 let mut swarms = (0..2)
1218 .map(|_| {
1219 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1220
1221 tokio::spawn(async move {
1222 while let Some(rq) = rx.next().await {
1223 let (fb_tx, fb_rx) = oneshot::channel();
1224 assert_eq!(rq.payload, b"this is a request");
1225 let _ = rq.pending_response.send(super::OutgoingResponse {
1226 result: Ok(b"this is a response".to_vec()),
1227 reputation_changes: Vec::new(),
1228 sent_feedback: Some(fb_tx),
1229 });
1230 fb_rx.await.unwrap();
1231 }
1232 });
1233
1234 let protocol_config = ProtocolConfig {
1235 name: protocol_name.clone(),
1236 fallback_names: Vec::new(),
1237 max_request_size: 1024,
1238 max_response_size: 1024 * 1024,
1239 request_timeout: Duration::from_secs(30),
1240 inbound_queue: Some(tx),
1241 };
1242
1243 build_swarm(iter::once(protocol_config))
1244 })
1245 .collect::<Vec<_>>();
1246
1247 {
1250 let dial_addr = swarms[1].1.clone();
1251 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1252 }
1253
1254 let (mut swarm, _) = swarms.remove(0);
1255 tokio::spawn(async move {
1257 loop {
1258 match swarm.select_next_some().await {
1259 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1260 result.unwrap();
1261 },
1262 _ => {},
1263 }
1264 }
1265 });
1266
1267 let (mut swarm, _) = swarms.remove(0);
1269 let mut response_receiver = None;
1270
1271 loop {
1272 match swarm.select_next_some().await {
1273 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1274 let (sender, receiver) = oneshot::channel();
1275 swarm.behaviour_mut().send_request(
1276 &peer_id,
1277 protocol_name.clone(),
1278 b"this is a request".to_vec(),
1279 None,
1280 sender,
1281 IfDisconnected::ImmediateError,
1282 );
1283 assert!(response_receiver.is_none());
1284 response_receiver = Some(receiver);
1285 },
1286 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1287 result.unwrap();
1288 break
1289 },
1290 _ => {},
1291 }
1292 }
1293
1294 assert_eq!(
1295 response_receiver.unwrap().await.unwrap().unwrap(),
1296 (b"this is a response".to_vec(), protocol_name)
1297 );
1298 }
1299
1300 #[tokio::test]
1301 async fn max_response_size_exceeded() {
1302 let protocol_name = ProtocolName::from("/test/req-resp/1");
1303
1304 let mut swarms = (0..2)
1306 .map(|_| {
1307 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1308
1309 tokio::spawn(async move {
1310 while let Some(rq) = rx.next().await {
1311 assert_eq!(rq.payload, b"this is a request");
1312 let _ = rq.pending_response.send(super::OutgoingResponse {
1313 result: Ok(b"this response exceeds the limit".to_vec()),
1314 reputation_changes: Vec::new(),
1315 sent_feedback: None,
1316 });
1317 }
1318 });
1319
1320 let protocol_config = ProtocolConfig {
1321 name: protocol_name.clone(),
1322 fallback_names: Vec::new(),
1323 max_request_size: 1024,
1324 max_response_size: 8, request_timeout: Duration::from_secs(30),
1326 inbound_queue: Some(tx),
1327 };
1328
1329 build_swarm(iter::once(protocol_config))
1330 })
1331 .collect::<Vec<_>>();
1332
1333 {
1336 let dial_addr = swarms[1].1.clone();
1337 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1338 }
1339
1340 let (mut swarm, _) = swarms.remove(0);
1343 tokio::spawn(async move {
1344 loop {
1345 match swarm.select_next_some().await {
1346 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1347 assert!(result.is_ok());
1348 },
1349 SwarmEvent::ConnectionClosed { .. } => {
1350 break;
1351 },
1352 _ => {},
1353 }
1354 }
1355 });
1356
1357 let (mut swarm, _) = swarms.remove(0);
1359
1360 let mut response_receiver = None;
1361
1362 loop {
1363 match swarm.select_next_some().await {
1364 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1365 let (sender, receiver) = oneshot::channel();
1366 swarm.behaviour_mut().send_request(
1367 &peer_id,
1368 protocol_name.clone(),
1369 b"this is a request".to_vec(),
1370 None,
1371 sender,
1372 IfDisconnected::ImmediateError,
1373 );
1374 assert!(response_receiver.is_none());
1375 response_receiver = Some(receiver);
1376 },
1377 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1378 assert!(result.is_err());
1379 break
1380 },
1381 _ => {},
1382 }
1383 }
1384
1385 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1386 RequestFailure::Network(OutboundFailure::Io(_)) => {},
1387 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1388 }
1389 }
1390
1391 #[tokio::test]
1402 async fn request_id_collision() {
1403 let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1404 let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1405
1406 let mut swarm_1 = {
1407 let protocol_configs = vec![
1408 ProtocolConfig {
1409 name: protocol_name_1.clone(),
1410 fallback_names: Vec::new(),
1411 max_request_size: 1024,
1412 max_response_size: 1024 * 1024,
1413 request_timeout: Duration::from_secs(30),
1414 inbound_queue: None,
1415 },
1416 ProtocolConfig {
1417 name: protocol_name_2.clone(),
1418 fallback_names: Vec::new(),
1419 max_request_size: 1024,
1420 max_response_size: 1024 * 1024,
1421 request_timeout: Duration::from_secs(30),
1422 inbound_queue: None,
1423 },
1424 ];
1425
1426 build_swarm(protocol_configs.into_iter()).0
1427 };
1428
1429 let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1430 let (tx_1, rx_1) = async_channel::bounded(64);
1431 let (tx_2, rx_2) = async_channel::bounded(64);
1432
1433 let protocol_configs = vec![
1434 ProtocolConfig {
1435 name: protocol_name_1.clone(),
1436 fallback_names: Vec::new(),
1437 max_request_size: 1024,
1438 max_response_size: 1024 * 1024,
1439 request_timeout: Duration::from_secs(30),
1440 inbound_queue: Some(tx_1),
1441 },
1442 ProtocolConfig {
1443 name: protocol_name_2.clone(),
1444 fallback_names: Vec::new(),
1445 max_request_size: 1024,
1446 max_response_size: 1024 * 1024,
1447 request_timeout: Duration::from_secs(30),
1448 inbound_queue: Some(tx_2),
1449 },
1450 ];
1451
1452 let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1453
1454 (swarm, rx_1, rx_2, listen_addr)
1455 };
1456
1457 swarm_1.dial(listen_add_2).unwrap();
1460
1461 tokio::spawn(async move {
1463 loop {
1464 match swarm_2.select_next_some().await {
1465 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1466 result.unwrap();
1467 },
1468 _ => {},
1469 }
1470 }
1471 });
1472
1473 tokio::spawn(async move {
1478 let protocol_1_request = swarm_2_handler_1.next().await;
1479 let protocol_2_request = swarm_2_handler_2.next().await;
1480
1481 protocol_1_request
1482 .unwrap()
1483 .pending_response
1484 .send(OutgoingResponse {
1485 result: Ok(b"this is a response".to_vec()),
1486 reputation_changes: Vec::new(),
1487 sent_feedback: None,
1488 })
1489 .unwrap();
1490 protocol_2_request
1491 .unwrap()
1492 .pending_response
1493 .send(OutgoingResponse {
1494 result: Ok(b"this is a response".to_vec()),
1495 reputation_changes: Vec::new(),
1496 sent_feedback: None,
1497 })
1498 .unwrap();
1499 });
1500
1501 let mut response_receivers = None;
1504 let mut num_responses = 0;
1505
1506 loop {
1507 match swarm_1.select_next_some().await {
1508 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1509 let (sender_1, receiver_1) = oneshot::channel();
1510 let (sender_2, receiver_2) = oneshot::channel();
1511 swarm_1.behaviour_mut().send_request(
1512 &peer_id,
1513 protocol_name_1.clone(),
1514 b"this is a request".to_vec(),
1515 None,
1516 sender_1,
1517 IfDisconnected::ImmediateError,
1518 );
1519 swarm_1.behaviour_mut().send_request(
1520 &peer_id,
1521 protocol_name_2.clone(),
1522 b"this is a request".to_vec(),
1523 None,
1524 sender_2,
1525 IfDisconnected::ImmediateError,
1526 );
1527 assert!(response_receivers.is_none());
1528 response_receivers = Some((receiver_1, receiver_2));
1529 },
1530 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1531 num_responses += 1;
1532 result.unwrap();
1533 if num_responses == 2 {
1534 break
1535 }
1536 },
1537 _ => {},
1538 }
1539 }
1540 let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1541 assert_eq!(
1542 response_receiver_1.await.unwrap().unwrap(),
1543 (b"this is a response".to_vec(), protocol_name_1)
1544 );
1545 assert_eq!(
1546 response_receiver_2.await.unwrap().unwrap(),
1547 (b"this is a response".to_vec(), protocol_name_2)
1548 );
1549 }
1550
1551 #[tokio::test]
1552 async fn request_fallback() {
1553 let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1554 let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1555 let protocol_name_2 = ProtocolName::from("/test/another");
1556
1557 let protocol_config_1 = ProtocolConfig {
1558 name: protocol_name_1.clone(),
1559 fallback_names: Vec::new(),
1560 max_request_size: 1024,
1561 max_response_size: 1024 * 1024,
1562 request_timeout: Duration::from_secs(30),
1563 inbound_queue: None,
1564 };
1565 let protocol_config_1_fallback = ProtocolConfig {
1566 name: protocol_name_1_fallback.clone(),
1567 fallback_names: Vec::new(),
1568 max_request_size: 1024,
1569 max_response_size: 1024 * 1024,
1570 request_timeout: Duration::from_secs(30),
1571 inbound_queue: None,
1572 };
1573 let protocol_config_2 = ProtocolConfig {
1574 name: protocol_name_2.clone(),
1575 fallback_names: Vec::new(),
1576 max_request_size: 1024,
1577 max_response_size: 1024 * 1024,
1578 request_timeout: Duration::from_secs(30),
1579 inbound_queue: None,
1580 };
1581
1582 let mut older_swarm = {
1585 let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1586 let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1587 let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1588 protocol_config_1_fallback.inbound_queue = Some(tx_1);
1589
1590 let mut protocol_config_2 = protocol_config_2.clone();
1591 protocol_config_2.inbound_queue = Some(tx_2);
1592
1593 tokio::spawn(async move {
1594 for _ in 0..2 {
1595 if let Some(rq) = rx_1.next().await {
1596 let (fb_tx, fb_rx) = oneshot::channel();
1597 assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1598 let _ = rq.pending_response.send(super::OutgoingResponse {
1599 result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1600 reputation_changes: Vec::new(),
1601 sent_feedback: Some(fb_tx),
1602 });
1603 fb_rx.await.unwrap();
1604 }
1605 }
1606
1607 if let Some(rq) = rx_2.next().await {
1608 let (fb_tx, fb_rx) = oneshot::channel();
1609 assert_eq!(rq.payload, b"request on protocol /test/other");
1610 let _ = rq.pending_response.send(super::OutgoingResponse {
1611 result: Ok(b"this is a response on protocol /test/other".to_vec()),
1612 reputation_changes: Vec::new(),
1613 sent_feedback: Some(fb_tx),
1614 });
1615 fb_rx.await.unwrap();
1616 }
1617 });
1618
1619 build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1620 };
1621
1622 let mut new_swarm = build_swarm(
1624 vec![
1625 protocol_config_1.clone(),
1626 protocol_config_1_fallback.clone(),
1627 protocol_config_2.clone(),
1628 ]
1629 .into_iter(),
1630 );
1631
1632 {
1633 let dial_addr = older_swarm.1.clone();
1634 Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1635 }
1636
1637 tokio::spawn(async move {
1639 loop {
1640 _ = older_swarm.0.select_next_some().await;
1641 }
1642 });
1643
1644 let (mut swarm, _) = new_swarm;
1646 let mut older_peer_id = None;
1647
1648 let mut response_receiver = None;
1649 loop {
1651 match swarm.select_next_some().await {
1652 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1653 older_peer_id = Some(peer_id);
1654 let (sender, receiver) = oneshot::channel();
1655 swarm.behaviour_mut().send_request(
1656 &peer_id,
1657 protocol_name_1.clone(),
1658 b"request on protocol /test/req-resp/2".to_vec(),
1659 Some((
1660 b"request on protocol /test/req-resp/1".to_vec(),
1661 protocol_config_1_fallback.name.clone(),
1662 )),
1663 sender,
1664 IfDisconnected::ImmediateError,
1665 );
1666 response_receiver = Some(receiver);
1667 },
1668 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1669 result.unwrap();
1670 break
1671 },
1672 _ => {},
1673 }
1674 }
1675 assert_eq!(
1676 response_receiver.unwrap().await.unwrap().unwrap(),
1677 (
1678 b"this is a response on protocol /test/req-resp/1".to_vec(),
1679 protocol_name_1_fallback.clone()
1680 )
1681 );
1682 let (sender, response_receiver) = oneshot::channel();
1684 swarm.behaviour_mut().send_request(
1685 older_peer_id.as_ref().unwrap(),
1686 protocol_name_1_fallback.clone(),
1687 b"request on protocol /test/req-resp/1".to_vec(),
1688 Some((
1689 b"dummy request, will fail if processed".to_vec(),
1690 protocol_config_1_fallback.name.clone(),
1691 )),
1692 sender,
1693 IfDisconnected::ImmediateError,
1694 );
1695 loop {
1696 match swarm.select_next_some().await {
1697 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1698 result.unwrap();
1699 break
1700 },
1701 _ => {},
1702 }
1703 }
1704 assert_eq!(
1705 response_receiver.await.unwrap().unwrap(),
1706 (
1707 b"this is a response on protocol /test/req-resp/1".to_vec(),
1708 protocol_name_1_fallback.clone()
1709 )
1710 );
1711 let (sender, response_receiver) = oneshot::channel();
1713 swarm.behaviour_mut().send_request(
1714 older_peer_id.as_ref().unwrap(),
1715 protocol_name_1.clone(),
1716 b"request on protocol /test/req-resp-2".to_vec(),
1717 None,
1718 sender,
1719 IfDisconnected::ImmediateError,
1720 );
1721 loop {
1722 match swarm.select_next_some().await {
1723 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1724 assert_matches!(
1725 result.unwrap_err(),
1726 RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1727 );
1728 break
1729 },
1730 _ => {},
1731 }
1732 }
1733 assert!(response_receiver.await.unwrap().is_err());
1734 let (sender, response_receiver) = oneshot::channel();
1736 swarm.behaviour_mut().send_request(
1737 older_peer_id.as_ref().unwrap(),
1738 protocol_name_2.clone(),
1739 b"request on protocol /test/other".to_vec(),
1740 None,
1741 sender,
1742 IfDisconnected::ImmediateError,
1743 );
1744 loop {
1745 match swarm.select_next_some().await {
1746 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1747 result.unwrap();
1748 break
1749 },
1750 _ => {},
1751 }
1752 }
1753 assert_eq!(
1754 response_receiver.await.unwrap().unwrap(),
1755 (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1756 );
1757 }
1758
1759 #[tokio::test]
1773 async fn enforce_outbound_timeouts() {
1774 const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1775 const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1776
1777 let protocol_name = ProtocolName::from("/test/req-resp/1");
1779
1780 let protocol_config = ProtocolConfig {
1781 name: protocol_name.clone(),
1782 fallback_names: Vec::new(),
1783 max_request_size: 1024,
1784 max_response_size: 1024 * 1024,
1785 request_timeout: REQUEST_TIMEOUT, inbound_queue: None,
1787 };
1788
1789 let (mut first_swarm, _) = {
1791 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1792
1793 tokio::spawn(async move {
1794 if let Some(rq) = rx.next().await {
1795 assert_eq!(rq.payload, b"this is a request");
1796
1797 tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1800
1801 let _ = rq.pending_response.send(super::OutgoingResponse {
1804 result: Ok(b"Second swarm already timedout".to_vec()),
1805 reputation_changes: Vec::new(),
1806 sent_feedback: None,
1807 });
1808 }
1809 });
1810
1811 let mut protocol_config = protocol_config.clone();
1812 protocol_config.inbound_queue = Some(tx);
1813
1814 build_swarm(iter::once(protocol_config))
1815 };
1816
1817 let (mut second_swarm, second_address) = {
1818 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1819
1820 tokio::spawn(async move {
1821 while let Some(rq) = rx.next().await {
1822 let _ = rq.pending_response.send(super::OutgoingResponse {
1823 result: Ok(b"This is the response".to_vec()),
1824 reputation_changes: Vec::new(),
1825 sent_feedback: None,
1826 });
1827 }
1828 });
1829 let mut protocol_config = protocol_config.clone();
1830 protocol_config.inbound_queue = Some(tx);
1831
1832 build_swarm(iter::once(protocol_config.clone()))
1833 };
1834 second_swarm
1836 .behaviour_mut()
1837 .protocols
1838 .get_mut(&protocol_name)
1839 .unwrap()
1840 .request_timeout = REQUEST_TIMEOUT_SHORT;
1841
1842 {
1844 Swarm::dial(&mut first_swarm, second_address).unwrap();
1845 }
1846
1847 tokio::spawn(async move {
1850 loop {
1851 let event = first_swarm.select_next_some().await;
1852 match event {
1853 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1854 assert!(result.is_ok());
1855 break;
1856 },
1857 SwarmEvent::ConnectionClosed { .. } => {
1858 break;
1859 },
1860 _ => {},
1861 }
1862 }
1863 });
1864
1865 let mut response_receiver = None;
1869 loop {
1870 let event = second_swarm.select_next_some().await;
1871
1872 match event {
1873 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1874 let (sender, receiver) = oneshot::channel();
1875 second_swarm.behaviour_mut().send_request(
1876 &peer_id,
1877 protocol_name.clone(),
1878 b"this is a request".to_vec(),
1879 None,
1880 sender,
1881 IfDisconnected::ImmediateError,
1882 );
1883 assert!(response_receiver.is_none());
1884 response_receiver = Some(receiver);
1885 },
1886 SwarmEvent::ConnectionClosed { .. } => {
1887 break;
1888 },
1889 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1890 assert!(result.is_err());
1891 break
1892 },
1893 _ => {},
1894 }
1895 }
1896
1897 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1899 RequestFailure::Network(OutboundFailure::Timeout) => {},
1900 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1901 }
1902 }
1903}