1use crate::{
38 peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39 service::traits::RequestResponseConfig as RequestResponseConfigT,
40 types::ProtocolName,
41 ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46 core::{transport::PortUse, Endpoint, Multiaddr},
47 request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48 swarm::{
49 behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50 NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51 },
52 PeerId,
53};
54
55use std::{
56 collections::{hash_map::Entry, HashMap},
57 io, iter,
58 ops::Deref,
59 pin::Pin,
60 sync::Arc,
61 task::{Context, Poll},
62 time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
/// Logging target used by every `log` macro invocation in this file.
const LOG_TARGET: &str = "sub-libp2p::request-response";

/// Interval between two sweeps of the pending outbound requests in
/// [`RequestResponsesBehaviour::poll`]; each sweep fails the requests that have been pending for
/// longer than the `request_timeout` of their protocol.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
/// Failures that can happen while sending an outbound request and waiting for its response.
///
/// Mirrors the variants of `libp2p::request_response::OutboundFailure` (see the `From`
/// conversion in this file). The I/O error is kept behind an `Arc` so that this type can derive
/// `Clone`, which `std::io::Error` itself does not implement.
#[derive(Debug, Clone, thiserror::Error)]
pub enum OutboundFailure {
    /// The request could not be sent because dialing the peer failed.
    #[error("Failed to dial the requested peer")]
    DialFailure,
    /// No response arrived before the request timed out.
    #[error("Timeout while waiting for a response")]
    Timeout,
    /// The connection was closed before a response was received.
    #[error("Connection was closed before a response was received")]
    ConnectionClosed,
    /// The remote supports none of the requested protocols.
    #[error("The remote supports none of the requested protocols")]
    UnsupportedProtocols,
    /// An I/O failure happened on the outbound stream.
    #[error("An IO failure happened on an outbound stream")]
    Io(Arc<io::Error>),
}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95 fn from(out: request_response::OutboundFailure) -> Self {
96 match out {
97 request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98 request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99 request_response::OutboundFailure::ConnectionClosed => {
100 OutboundFailure::ConnectionClosed
101 },
102 request_response::OutboundFailure::UnsupportedProtocols => {
103 OutboundFailure::UnsupportedProtocols
104 },
105 request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
106 }
107 }
108}
109
/// Failures that can happen while receiving an inbound request and sending back its response.
///
/// Mirrors the variants of `libp2p::request_response::InboundFailure` (see the `From`
/// conversion in this file), with the I/O error kept behind an `Arc`.
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
    /// The inbound request timed out, either while reading the request or while sending the
    /// response.
    #[error("Timeout while receiving request or sending response")]
    Timeout,
    /// The connection was closed before a response could be sent.
    #[error("Connection was closed before a response could be sent")]
    ConnectionClosed,
    /// The local peer supports none of the protocols requested by the remote.
    #[error("The local peer supports none of the protocols requested by the remote")]
    UnsupportedProtocols,
    /// The response channel was dropped without a response having been sent.
    #[error("The response channel was dropped without sending a response to the remote")]
    ResponseOmission,
    /// An I/O failure happened on the inbound stream.
    #[error("An IO failure happened on an inbound stream")]
    Io(Arc<io::Error>),
}
131
132impl From<request_response::InboundFailure> for InboundFailure {
133 fn from(out: request_response::InboundFailure) -> Self {
134 match out {
135 request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
136 request_response::InboundFailure::Timeout => InboundFailure::Timeout,
137 request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
138 request_response::InboundFailure::UnsupportedProtocols => {
139 InboundFailure::UnsupportedProtocols
140 },
141 request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
142 }
143 }
144}
145
/// Error reported to the sender of an outbound request, delivered through the oneshot channel
/// passed to `RequestResponsesBehaviour::send_request` and inside `Event::RequestFinished`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
    // Sent when the target is not connected and the caller asked for
    // `IfDisconnected::ImmediateError`.
    #[error("We are not currently connected to the requested peer.")]
    NotConnected,
    // Sent when no protocol with the requested name is registered in the behaviour.
    #[error("Given protocol hasn't been registered.")]
    UnknownProtocol,
    // NOTE(review): not constructed anywhere in the visible part of this file; presumably
    // produced by callers elsewhere in the crate.
    #[error("The outbound request payload or parameters are invalid for the selected protocol.")]
    InvalidRequest,
    // Produced when the codec decoded the response as `Err(())`, which happens when the remote
    // closes the stream before sending a length prefix.
    #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
    Refused,
    // Reported in `Event::RequestFinished` when the receiving half of the response channel was
    // already dropped by the time the response arrived.
    #[error("The remote replied, but the local node is no longer interested in the response.")]
    Obsolete,
    // Wraps a failure reported by the networking layer (or the local forced timeout).
    #[error("Problem on the network: {0}")]
    Network(OutboundFailure),
}
163
/// Configuration of a single request-response protocol, consumed by
/// `RequestResponsesBehaviour::new`.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Main name of the protocol; also the key under which the protocol is registered and the
    /// name to pass to `send_request`.
    pub name: ProtocolName,

    /// Additional names registered for the same protocol instance, after `name`.
    pub fallback_names: Vec<ProtocolName>,

    /// Maximum accepted size of an inbound request payload, in bytes. The codec rejects a
    /// request whose declared length is larger than this.
    pub max_request_size: u64,

    /// Maximum accepted size of a received response payload, in bytes. The codec rejects a
    /// response whose declared length is larger than this.
    pub max_response_size: u64,

    /// Timeout handed to the libp2p request-response configuration and also enforced locally
    /// by the periodic check on pending outbound requests.
    pub request_timeout: Duration,

    /// Channel on which inbound requests are delivered to the code that builds responses.
    ///
    /// When `None`, the protocol is registered as outbound-only. Requests are pushed with
    /// `try_send`, so a request that does not fit in the channel is dropped rather than
    /// awaited.
    pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
212
213impl RequestResponseConfigT for ProtocolConfig {
214 fn protocol_name(&self) -> &ProtocolName {
215 &self.name
216 }
217}
218
/// A request received from a remote peer, as pushed onto `ProtocolConfig::inbound_queue`.
#[derive(Debug)]
pub struct IncomingRequest {
    /// Peer that sent the request.
    pub peer: sc_network_types::PeerId,

    /// Raw request payload as decoded by the codec (without the length prefix).
    pub payload: Vec<u8>,

    /// Channel on which the handler sends back its `OutgoingResponse`.
    ///
    /// If this sender is dropped without sending, the behaviour produces no response for the
    /// request.
    pub pending_response: oneshot::Sender<OutgoingResponse>,
}
239
/// Answer of a request handler to an `IncomingRequest`.
#[derive(Debug)]
pub struct OutgoingResponse {
    /// Payload to send back to the remote.
    ///
    /// On `Err(())` the behaviour sends nothing and simply drops the libp2p response channel.
    pub result: Result<Vec<u8>, ()>,

    /// Reputation changes for the requesting peer; when non-empty they are surfaced as
    /// `Event::ReputationChanges`, regardless of `result`.
    pub reputation_changes: Vec<ReputationChange>,

    /// Optional notification channel. It is fired with `()` once the inner behaviour reports
    /// `ResponseSent` for this request, and dropped without being fired if the response could
    /// not be handed over or an inbound failure is reported.
    pub sent_feedback: Option<oneshot::Sender<()>>,
}
262
/// Book-keeping for an outbound request that has been handed to libp2p and not yet completed.
struct PendingRequest {
    /// When the request was sent; used for the reported duration and the forced timeout.
    started_at: Instant,
    /// Channel on which the outcome is reported to the requester. Wrapped in an `Option` so the
    /// sender can be taken out of a `&mut PendingRequest` inside `HashMap::retain`.
    response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
    /// Request payload and protocol to retry with if the remote reports that it does not
    /// support the primary protocol.
    fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
275
/// What `send_request` should do when there is no connection to the target peer.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Send the request anyway, which lets the networking layer try to connect first.
    TryConnect,
    /// Fail the request right away with a "not connected" error.
    ImmediateError,
}

impl IfDisconnected {
    /// Returns `true` exactly for [`IfDisconnected::TryConnect`].
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
295
/// Event emitted by `RequestResponsesBehaviour` towards the swarm.
#[derive(Debug)]
pub enum Event {
    /// An inbound request has been fully handled, successfully or not.
    InboundRequest {
        /// Peer that sent the request.
        peer: PeerId,
        /// Name of the protocol instance that received the request.
        protocol: ProtocolName,
        /// On success, the time elapsed between the arrival of the request and the moment the
        /// response was reported as sent; otherwise the failure.
        result: Result<Duration, ResponseFailure>,
    },

    /// An outbound request has completed, with a response or with a network failure.
    RequestFinished {
        /// Peer the request was sent to.
        peer: PeerId,
        /// Name of the protocol instance the request was sent on.
        protocol: ProtocolName,
        /// Time elapsed since the request was sent.
        duration: Duration,
        /// `Ok(())` if the outcome was delivered to the requester; `Err(Obsolete)` if the
        /// requester had dropped its receiver; `Err(Network(_))` on a network failure.
        result: Result<(), RequestFailure>,
    },

    /// A request handler asked for reputation changes to be applied to a peer.
    ReputationChanges {
        /// Peer whose reputation should change.
        peer: PeerId,
        /// The changes to apply.
        changes: Vec<ReputationChange>,
    },
}
337
/// Map key combining a protocol name with a request id.
///
/// Every protocol has its own inner `Behaviour`, so request ids are paired with the protocol
/// name before being used as keys in the maps shared across protocols.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId<RequestId> {
    /// Name under which the protocol instance is registered.
    protocol: ProtocolName,
    /// Inbound or outbound request id handed out by that protocol's inner behaviour.
    request_id: RequestId,
}
349
350impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
351 fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
352 Self { protocol, request_id }
353 }
354}
355
/// Per-protocol state held by `RequestResponsesBehaviour`.
struct ProtocolDetails {
    /// The libp2p request-response behaviour dedicated to this protocol.
    behaviour: Behaviour<GenericCodec>,
    /// Where inbound requests are forwarded to; `None` for outbound-only protocols.
    inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
    /// Timeout after which the periodic check fails a pending outbound request.
    request_timeout: Duration,
}
362
/// Network behaviour that multiplexes several request-response protocols, each backed by its
/// own libp2p `Behaviour`.
pub struct RequestResponsesBehaviour {
    /// Registered protocols, keyed by their main name.
    protocols: HashMap<ProtocolName, ProtocolDetails>,

    /// Outbound requests that were sent and whose outcome has not been reported yet.
    pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,

    /// One future per inbound request handed to a handler. Each resolves to `Some(outcome)`
    /// once the handler answered, or to `None` if the handler's sender was dropped.
    pending_responses: stream::FuturesUnordered<
        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
    >,

    /// Arrival time of every inbound request, removed when the inner behaviour reports
    /// `ResponseSent` or `InboundFailure` for it.
    pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,

    /// `sent_feedback` channels of responses that were handed to libp2p and are waiting for
    /// the `ResponseSent` confirmation.
    send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,

    /// Used to look up the reputation of a peer before serving its request.
    peer_store: Arc<dyn PeerStoreProvider>,

    /// Timer driving the periodic timeout sweep over `pending_requests`.
    periodic_request_check: tokio::time::Interval,
}
398
/// Everything needed to send a handler's answer back to the remote; produced by the futures
/// stored in `RequestResponsesBehaviour::pending_responses`.
struct RequestProcessingOutcome {
    /// Peer that sent the request.
    peer: PeerId,
    /// Id of the inbound request inside its protocol's behaviour.
    request_id: InboundRequestId,
    /// Name of the protocol instance that received the request.
    protocol: ProtocolName,
    /// libp2p channel on which the response has to be sent.
    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// The handler's answer.
    response: OutgoingResponse,
}
407
408impl RequestResponsesBehaviour {
409 pub fn new(
412 list: impl Iterator<Item = ProtocolConfig>,
413 peer_store: Arc<dyn PeerStoreProvider>,
414 ) -> Result<Self, RegisterError> {
415 let mut protocols = HashMap::new();
416 for protocol in list {
417 let cfg = Config::default().with_request_timeout(protocol.request_timeout);
418
419 let protocol_support = if protocol.inbound_queue.is_some() {
420 ProtocolSupport::Full
421 } else {
422 ProtocolSupport::Outbound
423 };
424
425 let behaviour = Behaviour::with_codec(
426 GenericCodec {
427 max_request_size: protocol.max_request_size,
428 max_response_size: protocol.max_response_size,
429 },
430 iter::once(protocol.name.clone())
431 .chain(protocol.fallback_names)
432 .zip(iter::repeat(protocol_support)),
433 cfg,
434 );
435
436 match protocols.entry(protocol.name) {
437 Entry::Vacant(e) => e.insert(ProtocolDetails {
438 behaviour,
439 inbound_queue: protocol.inbound_queue,
440 request_timeout: protocol.request_timeout,
441 }),
442 Entry::Occupied(e) => {
443 return Err(RegisterError::DuplicateProtocol(e.key().clone()))
444 },
445 };
446 }
447
448 Ok(Self {
449 protocols,
450 pending_requests: Default::default(),
451 pending_responses: Default::default(),
452 pending_responses_arrival_time: Default::default(),
453 send_feedback: Default::default(),
454 peer_store,
455 periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
456 })
457 }
458
459 pub fn send_request(
466 &mut self,
467 target: &PeerId,
468 protocol_name: ProtocolName,
469 request: Vec<u8>,
470 fallback_request: Option<(Vec<u8>, ProtocolName)>,
471 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
472 connect: IfDisconnected,
473 ) {
474 log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
475
476 if let Some(ProtocolDetails { behaviour, .. }) =
477 self.protocols.get_mut(protocol_name.deref())
478 {
479 Self::send_request_inner(
480 behaviour,
481 &mut self.pending_requests,
482 target,
483 protocol_name,
484 request,
485 fallback_request,
486 pending_response,
487 connect,
488 )
489 } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
490 log::debug!(
491 target: LOG_TARGET,
492 "Unknown protocol {:?}. At the same time local \
493 node is no longer interested in the result.",
494 protocol_name,
495 );
496 }
497 }
498
499 fn send_request_inner(
500 behaviour: &mut Behaviour<GenericCodec>,
501 pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
502 target: &PeerId,
503 protocol_name: ProtocolName,
504 request: Vec<u8>,
505 fallback_request: Option<(Vec<u8>, ProtocolName)>,
506 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
507 connect: IfDisconnected,
508 ) {
509 if behaviour.is_connected(target) || connect.should_connect() {
510 let request_id = behaviour.send_request(target, request);
511 let prev_req_id = pending_requests.insert(
512 (protocol_name.to_string().into(), request_id).into(),
513 PendingRequest {
514 started_at: Instant::now(),
515 response_tx: Some(pending_response),
516 fallback_request,
517 },
518 );
519 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
520 } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
521 log::debug!(
522 target: LOG_TARGET,
523 "Not connected to peer {:?}. At the same time local \
524 node is no longer interested in the result.",
525 target,
526 );
527 }
528 }
529}
530
/// Glue between the swarm and the per-protocol libp2p request-response behaviours: connection
/// handlers of all protocols are bundled in a `MultiHandler` keyed by the protocol name as a
/// `String`, swarm events are fanned out to every protocol, and `poll` translates the inner
/// events into [`Event`]s.
impl NetworkBehaviour for RequestResponsesBehaviour {
    type ConnectionHandler =
        MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
    type ToSwarm = Event;

    /// Accepts every pending inbound connection; this behaviour never denies one.
    fn handle_pending_inbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        _local_addr: &Multiaddr,
        _remote_addr: &Multiaddr,
    ) -> Result<(), ConnectionDenied> {
        Ok(())
    }

    /// Accepts every pending outbound connection and contributes no additional addresses.
    fn handle_pending_outbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        _maybe_peer: Option<PeerId>,
        _addresses: &[Multiaddr],
        _effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        Ok(Vec::new())
    }

    /// Builds the combined handler for a new inbound connection from the handlers of all
    /// protocols. A protocol whose inner behaviour returns an error is left out.
    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let iter =
            self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
                if let Ok(handler) = behaviour.handle_established_inbound_connection(
                    connection_id,
                    peer,
                    local_addr,
                    remote_addr,
                ) {
                    Some((p.to_string(), handler))
                } else {
                    None
                }
            });

        Ok(MultiHandler::try_from_iter(iter).expect(
            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
             which is the only possible error; qed",
        ))
    }

    /// Builds the combined handler for a new outbound connection from the handlers of all
    /// protocols. A protocol whose inner behaviour returns an error is left out.
    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
        port_use: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let iter =
            self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
                if let Ok(handler) = behaviour.handle_established_outbound_connection(
                    connection_id,
                    peer,
                    addr,
                    role_override,
                    port_use,
                ) {
                    Some((p.to_string(), handler))
                } else {
                    None
                }
            });

        Ok(MultiHandler::try_from_iter(iter).expect(
            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
             which is the only possible error; qed",
        ))
    }

    /// Forwards the swarm event to the inner behaviour of every registered protocol.
    fn on_swarm_event(&mut self, event: FromSwarm) {
        for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
            behaviour.on_swarm_event(event);
        }
    }

    /// Routes a handler event to the inner behaviour of the protocol it is tagged with
    /// (`event.0` is the protocol name, `event.1` the inner event). Events for an unknown
    /// protocol are logged and dropped.
    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        let p_name = event.0;
        if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
            return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
        } else {
            log::warn!(
                target: LOG_TARGET,
                "on_connection_handler_event: no request-response instance registered for protocol {:?}",
                p_name
            );
        }
    }

    /// Drives the behaviour. In order, each iteration of the outer loop:
    ///
    /// 1. fails pending outbound requests that exceeded their timeout, when the timer ticks;
    /// 2. sends the responses that request handlers have produced;
    /// 3. polls every protocol's inner behaviour and translates its events;
    /// 4. re-sends the requests for which a fallback protocol must be tried.
    ///
    /// Returns `Poll::Ready` as soon as there is something to report to the swarm and
    /// `Poll::Pending` once nothing is left to do.
    fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        'poll_all: loop {
            // Step 1: periodic sweep of the pending outbound requests. Entries for which the
            // closure returns `false` are removed from the map.
            if self.periodic_request_check.poll_tick(cx).is_ready() {
                self.pending_requests.retain(|id, req| {
                    let Some(ProtocolDetails { request_timeout, .. }) =
                        self.protocols.get(&id.protocol)
                    else {
                        // The request refers to a protocol that is not registered: report it
                        // and forget the request.
                        log::warn!(
                            target: LOG_TARGET,
                            "Request {id:?} has no protocol registered.",
                        );

                        if let Some(response_tx) = req.response_tx.take() {
                            if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
                                log::debug!(
                                    target: LOG_TARGET,
                                    "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
                                );
                            }
                        }
                        return false
                    };

                    // Fail the request locally once it has been pending for longer than the
                    // protocol's timeout. No `Event::RequestFinished` is emitted on this path.
                    let elapsed = req.started_at.elapsed();
                    if elapsed > *request_timeout {
                        log::debug!(
                            target: LOG_TARGET,
                            "Request {id:?} force detected as timeout.",
                        );

                        if let Some(response_tx) = req.response_tx.take() {
                            if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
                                log::debug!(
                                    target: LOG_TARGET,
                                    "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
                                );
                            }
                        }

                        false
                    } else {
                        true
                    }
                });
            }

            // Step 2: send back the responses that handlers have finished building.
            while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
                let RequestProcessingOutcome {
                    peer,
                    request_id,
                    protocol: protocol_name,
                    inner_channel,
                    response: OutgoingResponse { result, reputation_changes, sent_feedback },
                } = match outcome {
                    Some(outcome) => outcome,
                    // The handler's sender was dropped without an answer; the libp2p response
                    // channel captured by the future has been dropped along with it.
                    None => continue,
                };

                // On `Err(())` nothing is sent and `inner_channel` is dropped at the end of
                // this iteration.
                if let Ok(payload) = result {
                    if let Some(ProtocolDetails { behaviour, .. }) =
                        self.protocols.get_mut(&*protocol_name)
                    {
                        log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());

                        if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
                            // Only logged here; `sent_feedback` is dropped without being fired.
                            log::debug!(
                                target: LOG_TARGET,
                                "Failed to send response for {:?} on protocol {:?} due to a \
                                 timeout or due to the connection to the peer being closed. \
                                 Dropping response",
                                request_id, protocol_name,
                            );
                        } else if let Some(sent_feedback) = sent_feedback {
                            // Keep the feedback channel until `ResponseSent` is reported.
                            self.send_feedback
                                .insert((protocol_name, request_id).into(), sent_feedback);
                        }
                    }
                }

                // Reputation changes are reported whether or not a response was sent.
                if !reputation_changes.is_empty() {
                    return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
                        peer,
                        changes: reputation_changes,
                    }));
                }
            }

            // Requests to retry on their fallback protocol. They cannot be sent from inside the
            // loop below because `self.protocols` is mutably borrowed there.
            //
            // NOTE(review): this vector is local to one iteration of `'poll_all`. Every
            // `return Poll::Ready(..)` and the `continue 'poll_all` below leave the iteration
            // before the vector is drained, which appears to drop any fallback queued earlier
            // in the same iteration together with its response sender. Confirm whether the
            // queue should live in `self` instead.
            let mut fallback_requests = vec![];

            // Step 3: poll the inner behaviour of every protocol.
            for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
            {
                'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
                    let ev = match ev {
                        // Events generated by the inner behaviour are translated below.
                        ToSwarm::GenerateEvent(ev) => ev,

                        // Dial requests are passed through; one without a peer id is
                        // unexpected and logged as an error first.
                        ToSwarm::Dial { opts } => {
                            if opts.get_peer_id().is_none() {
                                log::error!(
                                    target: LOG_TARGET,
                                    "The request-response isn't supposed to start dialing addresses"
                                );
                            }
                            return Poll::Ready(ToSwarm::Dial { opts });
                        },
                        // Any other command is forwarded, with handler-bound events tagged
                        // with the protocol name so the `MultiHandler` can route them.
                        event => {
                            return Poll::Ready(
                                event.map_in(|event| ((*protocol).to_string(), event)).map_out(
                                    |_| {
                                        unreachable!(
                                            "`GenerateEvent` is handled in a branch above; qed"
                                        )
                                    },
                                ),
                            );
                        },
                    };

                    match ev {
                        // A remote sent us a request.
                        request_response::Event::Message {
                            peer,
                            message: Message::Request { request_id, request, channel, .. },
                        } => {
                            // Recorded before the reputation check, so the entry also exists
                            // for requests that are refused just below.
                            self.pending_responses_arrival_time
                                .insert((protocol.clone(), request_id).into(), Instant::now());

                            let reputation = self.peer_store.peer_reputation(&peer.into());

                            // Refuse to serve peers below the ban threshold; `channel` is
                            // dropped without a response.
                            if reputation < BANNED_THRESHOLD {
                                log::debug!(
                                    target: LOG_TARGET,
                                    "Cannot handle requests from a node with a low reputation {}: {}",
                                    peer,
                                    reputation,
                                );
                                continue 'poll_protocol;
                            }

                            let (tx, rx) = oneshot::channel();

                            // Hand the request to the handler. The result of `try_send` is
                            // deliberately ignored: on failure the request, and `tx` with it,
                            // is dropped, which makes the future below resolve to `None`.
                            if let Some(resp_builder) = inbound_queue {
                                let _ = resp_builder.try_send(IncomingRequest {
                                    peer: peer.into(),
                                    payload: request,
                                    pending_response: tx,
                                });
                            } else {
                                debug_assert!(false, "Received message on outbound-only protocol.");
                            }

                            let protocol = protocol.clone();

                            // Resolves once the handler answers, or to `None` if `tx` is
                            // dropped.
                            self.pending_responses.push(Box::pin(async move {
                                rx.await.map_or(None, |response| {
                                    Some(RequestProcessingOutcome {
                                        peer,
                                        request_id,
                                        protocol,
                                        inner_channel: channel,
                                        response,
                                    })
                                })
                            }));

                            // Restart from the top so that the future just pushed gets polled.
                            continue 'poll_all;
                        },

                        // A remote answered one of our requests.
                        request_response::Event::Message {
                            peer,
                            message: Message::Response { request_id, response },
                            ..
                        } => {
                            let (started, delivered) = match self
                                .pending_requests
                                .remove(&(protocol.clone(), request_id).into())
                            {
                                Some(PendingRequest {
                                    started_at,
                                    response_tx: Some(response_tx),
                                    ..
                                }) => {
                                    log::trace!(
                                        target: LOG_TARGET,
                                        "received response from {peer} ({protocol:?}), {} bytes",
                                        response.as_ref().map_or(0usize, |response| response.len()),
                                    );

                                    // A response decoded as `Err(())` is reported as `Refused`;
                                    // a requester that dropped its receiver yields `Obsolete`.
                                    let delivered = response_tx
                                        .send(
                                            response
                                                .map_err(|()| RequestFailure::Refused)
                                                .map(|resp| (resp, protocol.clone())),
                                        )
                                        .map_err(|_| RequestFailure::Obsolete);
                                    (started_at, delivered)
                                },
                                // Unknown id, e.g. a request already removed by the periodic
                                // timeout sweep.
                                _ => {
                                    log::debug!(
                                        target: LOG_TARGET,
                                        "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
                                        request_id,
                                        peer,
                                    );
                                    continue;
                                },
                            };

                            let out = Event::RequestFinished {
                                peer,
                                protocol: protocol.clone(),
                                duration: started.elapsed(),
                                result: delivered,
                            };

                            return Poll::Ready(ToSwarm::GenerateEvent(out));
                        },

                        // One of our requests failed.
                        request_response::Event::OutboundFailure {
                            peer,
                            request_id,
                            error,
                            ..
                        } => {
                            let error = OutboundFailure::from(error);
                            let started = match self
                                .pending_requests
                                .remove(&(protocol.clone(), request_id).into())
                            {
                                Some(PendingRequest {
                                    started_at,
                                    response_tx: Some(response_tx),
                                    fallback_request,
                                }) => {
                                    // If the remote does not support the protocol and a
                                    // fallback was supplied, queue the fallback instead of
                                    // reporting the failure.
                                    if matches!(error, OutboundFailure::UnsupportedProtocols) {
                                        if let Some((fallback_request, fallback_protocol)) =
                                            fallback_request
                                        {
                                            log::trace!(
                                                target: LOG_TARGET,
                                                "Request with id {:?} failed. Trying the fallback protocol. {}",
                                                request_id,
                                                fallback_protocol.deref()
                                            );
                                            fallback_requests.push((
                                                peer,
                                                fallback_protocol,
                                                fallback_request,
                                                response_tx,
                                            ));
                                            continue;
                                        }
                                    }

                                    if response_tx
                                        .send(Err(RequestFailure::Network(error.clone())))
                                        .is_err()
                                    {
                                        log::debug!(
                                            target: LOG_TARGET,
                                            "Request with id {:?} failed. At the same time local \
                                             node is no longer interested in the result.",
                                            request_id,
                                        );
                                    }
                                    started_at
                                },
                                // Unknown id, e.g. a request already removed by the periodic
                                // timeout sweep.
                                _ => {
                                    log::debug!(
                                        target: LOG_TARGET,
                                        "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
                                        request_id,
                                        error,
                                        peer
                                    );
                                    continue;
                                },
                            };

                            let out = Event::RequestFinished {
                                peer,
                                protocol: protocol.clone(),
                                duration: started.elapsed(),
                                result: Err(RequestFailure::Network(error)),
                            };

                            return Poll::Ready(ToSwarm::GenerateEvent(out));
                        },

                        // Handling an inbound request failed: clean up its book-keeping.
                        // Removing the feedback sender drops it without firing.
                        request_response::Event::InboundFailure {
                            request_id, peer, error, ..
                        } => {
                            self.pending_responses_arrival_time
                                .remove(&(protocol.clone(), request_id).into());
                            self.send_feedback.remove(&(protocol.clone(), request_id).into());
                            let out = Event::InboundRequest {
                                peer,
                                protocol: protocol.clone(),
                                result: Err(ResponseFailure::Network(error.into())),
                            };
                            return Poll::Ready(ToSwarm::GenerateEvent(out));
                        },

                        // A response was fully sent: report how long serving the request took
                        // and fire the optional feedback channel.
                        request_response::Event::ResponseSent { request_id, peer } => {
                            let arrival_time = self
                                .pending_responses_arrival_time
                                .remove(&(protocol.clone(), request_id).into())
                                .map(|t| t.elapsed())
                                .expect(
                                    "Time is added for each inbound request on arrival and only \
                                     removed on success (`ResponseSent`) or failure \
                                     (`InboundFailure`). One can not receive a success event for a \
                                     request that either never arrived, or that has previously \
                                     failed; qed.",
                                );

                            if let Some(send_feedback) =
                                self.send_feedback.remove(&(protocol.clone(), request_id).into())
                            {
                                let _ = send_feedback.send(());
                            }

                            let out = Event::InboundRequest {
                                peer,
                                protocol: protocol.clone(),
                                result: Ok(arrival_time),
                            };

                            return Poll::Ready(ToSwarm::GenerateEvent(out));
                        },
                    };
                }
            }

            // Step 4: retry on the fallback protocols. No second-level fallback is passed, and
            // the retry fails immediately if the peer is no longer connected.
            //
            // NOTE(review): when the fallback protocol is not registered, the request and its
            // response sender are silently dropped here; confirm this is intended.
            for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
                if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
                    Self::send_request_inner(
                        behaviour,
                        &mut self.pending_requests,
                        &peer,
                        protocol,
                        request,
                        None,
                        pending_response,
                        IfDisconnected::ImmediateError,
                    );
                }
            }

            break Poll::Pending;
        }
    }
}
1021
/// Error returned by `RequestResponsesBehaviour::new`.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// The same protocol name was passed more than once; carries the offending name, which is
    /// also the whole error message.
    #[error("{0}")]
    DuplicateProtocol(ProtocolName),
}
1029
/// Error reported in `Event::InboundRequest` when answering an inbound request failed.
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
    /// Failure reported by the networking layer.
    #[error("Problem on the network: {0}")]
    Network(InboundFailure),
}
1037
/// Codec used for every registered protocol: payloads are raw bytes preceded by an unsigned
/// varint length prefix, and lengths above the configured limits are rejected when reading.
#[derive(Debug, Clone)]
#[doc(hidden)] pub struct GenericCodec {
    /// Largest request payload, in bytes, that `read_request` accepts.
    max_request_size: u64,
    /// Largest response payload, in bytes, that `read_response` accepts.
    max_response_size: u64,
}
1046
1047#[async_trait::async_trait]
1048impl Codec for GenericCodec {
1049 type Protocol = ProtocolName;
1050 type Request = Vec<u8>;
1051 type Response = Result<Vec<u8>, ()>;
1052
1053 async fn read_request<T>(
1054 &mut self,
1055 _: &Self::Protocol,
1056 mut io: &mut T,
1057 ) -> io::Result<Self::Request>
1058 where
1059 T: AsyncRead + Unpin + Send,
1060 {
1061 let length = unsigned_varint::aio::read_usize(&mut io)
1063 .await
1064 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1065 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1066 return Err(io::Error::new(
1067 io::ErrorKind::InvalidInput,
1068 format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1069 ));
1070 }
1071
1072 let mut buffer = vec![0; length];
1074 io.read_exact(&mut buffer).await?;
1075 Ok(buffer)
1076 }
1077
1078 async fn read_response<T>(
1079 &mut self,
1080 _: &Self::Protocol,
1081 mut io: &mut T,
1082 ) -> io::Result<Self::Response>
1083 where
1084 T: AsyncRead + Unpin + Send,
1085 {
1086 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1093 Ok(l) => l,
1094 Err(unsigned_varint::io::ReadError::Io(err))
1095 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1096 {
1097 return Ok(Err(()))
1098 },
1099 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1100 };
1101
1102 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1103 return Err(io::Error::new(
1104 io::ErrorKind::InvalidInput,
1105 format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1106 ));
1107 }
1108
1109 let mut buffer = vec![0; length];
1111 io.read_exact(&mut buffer).await?;
1112 Ok(Ok(buffer))
1113 }
1114
1115 async fn write_request<T>(
1116 &mut self,
1117 _: &Self::Protocol,
1118 io: &mut T,
1119 req: Self::Request,
1120 ) -> io::Result<()>
1121 where
1122 T: AsyncWrite + Unpin + Send,
1123 {
1124 {
1127 let mut buffer = unsigned_varint::encode::usize_buffer();
1128 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1129 }
1130
1131 io.write_all(&req).await?;
1133
1134 io.close().await?;
1135 Ok(())
1136 }
1137
1138 async fn write_response<T>(
1139 &mut self,
1140 _: &Self::Protocol,
1141 io: &mut T,
1142 res: Self::Response,
1143 ) -> io::Result<()>
1144 where
1145 T: AsyncWrite + Unpin + Send,
1146 {
1147 if let Ok(res) = res {
1149 {
1152 let mut buffer = unsigned_varint::encode::usize_buffer();
1153 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1154 }
1155
1156 io.write_all(&res).await?;
1158 }
1159
1160 io.close().await?;
1161 Ok(())
1162 }
1163}
1164
1165#[cfg(test)]
1166mod tests {
1167 use super::*;
1168
1169 use crate::mock::MockPeerStore;
1170 use assert_matches::assert_matches;
1171 use futures::channel::oneshot;
1172 use libp2p::{
1173 core::{
1174 transport::{MemoryTransport, Transport},
1175 upgrade,
1176 },
1177 identity::Keypair,
1178 noise,
1179 swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1180 Multiaddr,
1181 };
1182 use std::{iter, time::Duration};
1183
1184 struct TokioExecutor;
1185 impl Executor for TokioExecutor {
1186 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1187 tokio::spawn(f);
1188 }
1189 }
1190
1191 fn build_swarm(
1192 list: impl Iterator<Item = ProtocolConfig>,
1193 ) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1194 let keypair = Keypair::generate_ed25519();
1195
1196 let transport = MemoryTransport::new()
1197 .upgrade(upgrade::Version::V1)
1198 .authenticate(noise::Config::new(&keypair).unwrap())
1199 .multiplex(libp2p::yamux::Config::default())
1200 .boxed();
1201
1202 let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1203
1204 let mut swarm = Swarm::new(
1205 transport,
1206 behaviour,
1207 keypair.public().to_peer_id(),
1208 SwarmConfig::with_executor(TokioExecutor {})
1209 .with_idle_connection_timeout(Duration::from_secs(10)),
1212 );
1213
1214 let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1215
1216 swarm.listen_on(listen_addr.clone()).unwrap();
1217
1218 (swarm, listen_addr)
1219 }
1220
1221 #[tokio::test]
1222 async fn basic_request_response_works() {
1223 let protocol_name = ProtocolName::from("/test/req-resp/1");
1224
1225 let mut swarms = (0..2)
1227 .map(|_| {
1228 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1229
1230 tokio::spawn(async move {
1231 while let Some(rq) = rx.next().await {
1232 let (fb_tx, fb_rx) = oneshot::channel();
1233 assert_eq!(rq.payload, b"this is a request");
1234 let _ = rq.pending_response.send(super::OutgoingResponse {
1235 result: Ok(b"this is a response".to_vec()),
1236 reputation_changes: Vec::new(),
1237 sent_feedback: Some(fb_tx),
1238 });
1239 fb_rx.await.unwrap();
1240 }
1241 });
1242
1243 let protocol_config = ProtocolConfig {
1244 name: protocol_name.clone(),
1245 fallback_names: Vec::new(),
1246 max_request_size: 1024,
1247 max_response_size: 1024 * 1024,
1248 request_timeout: Duration::from_secs(30),
1249 inbound_queue: Some(tx),
1250 };
1251
1252 build_swarm(iter::once(protocol_config))
1253 })
1254 .collect::<Vec<_>>();
1255
1256 {
1259 let dial_addr = swarms[1].1.clone();
1260 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1261 }
1262
1263 let (mut swarm, _) = swarms.remove(0);
1264 tokio::spawn(async move {
1266 loop {
1267 match swarm.select_next_some().await {
1268 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1269 result.unwrap();
1270 },
1271 _ => {},
1272 }
1273 }
1274 });
1275
1276 let (mut swarm, _) = swarms.remove(0);
1278 let mut response_receiver = None;
1279
1280 loop {
1281 match swarm.select_next_some().await {
1282 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1283 let (sender, receiver) = oneshot::channel();
1284 swarm.behaviour_mut().send_request(
1285 &peer_id,
1286 protocol_name.clone(),
1287 b"this is a request".to_vec(),
1288 None,
1289 sender,
1290 IfDisconnected::ImmediateError,
1291 );
1292 assert!(response_receiver.is_none());
1293 response_receiver = Some(receiver);
1294 },
1295 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1296 result.unwrap();
1297 break;
1298 },
1299 _ => {},
1300 }
1301 }
1302
1303 assert_eq!(
1304 response_receiver.unwrap().await.unwrap().unwrap(),
1305 (b"this is a response".to_vec(), protocol_name)
1306 );
1307 }
1308
1309 #[tokio::test]
1310 async fn max_response_size_exceeded() {
1311 let protocol_name = ProtocolName::from("/test/req-resp/1");
1312
1313 let mut swarms = (0..2)
1315 .map(|_| {
1316 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1317
1318 tokio::spawn(async move {
1319 while let Some(rq) = rx.next().await {
1320 assert_eq!(rq.payload, b"this is a request");
1321 let _ = rq.pending_response.send(super::OutgoingResponse {
1322 result: Ok(b"this response exceeds the limit".to_vec()),
1323 reputation_changes: Vec::new(),
1324 sent_feedback: None,
1325 });
1326 }
1327 });
1328
1329 let protocol_config = ProtocolConfig {
1330 name: protocol_name.clone(),
1331 fallback_names: Vec::new(),
1332 max_request_size: 1024,
1333 max_response_size: 8, request_timeout: Duration::from_secs(30),
1335 inbound_queue: Some(tx),
1336 };
1337
1338 build_swarm(iter::once(protocol_config))
1339 })
1340 .collect::<Vec<_>>();
1341
1342 {
1345 let dial_addr = swarms[1].1.clone();
1346 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1347 }
1348
1349 let (mut swarm, _) = swarms.remove(0);
1352 tokio::spawn(async move {
1353 loop {
1354 match swarm.select_next_some().await {
1355 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1356 assert!(result.is_ok());
1357 },
1358 SwarmEvent::ConnectionClosed { .. } => {
1359 break;
1360 },
1361 _ => {},
1362 }
1363 }
1364 });
1365
1366 let (mut swarm, _) = swarms.remove(0);
1368
1369 let mut response_receiver = None;
1370
1371 loop {
1372 match swarm.select_next_some().await {
1373 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1374 let (sender, receiver) = oneshot::channel();
1375 swarm.behaviour_mut().send_request(
1376 &peer_id,
1377 protocol_name.clone(),
1378 b"this is a request".to_vec(),
1379 None,
1380 sender,
1381 IfDisconnected::ImmediateError,
1382 );
1383 assert!(response_receiver.is_none());
1384 response_receiver = Some(receiver);
1385 },
1386 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1387 assert!(result.is_err());
1388 break;
1389 },
1390 _ => {},
1391 }
1392 }
1393
1394 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1395 RequestFailure::Network(OutboundFailure::Io(_)) => {},
1396 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1397 }
1398 }
1399
1400 #[tokio::test]
1411 async fn request_id_collision() {
1412 let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1413 let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1414
1415 let mut swarm_1 = {
1416 let protocol_configs = vec![
1417 ProtocolConfig {
1418 name: protocol_name_1.clone(),
1419 fallback_names: Vec::new(),
1420 max_request_size: 1024,
1421 max_response_size: 1024 * 1024,
1422 request_timeout: Duration::from_secs(30),
1423 inbound_queue: None,
1424 },
1425 ProtocolConfig {
1426 name: protocol_name_2.clone(),
1427 fallback_names: Vec::new(),
1428 max_request_size: 1024,
1429 max_response_size: 1024 * 1024,
1430 request_timeout: Duration::from_secs(30),
1431 inbound_queue: None,
1432 },
1433 ];
1434
1435 build_swarm(protocol_configs.into_iter()).0
1436 };
1437
1438 let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1439 let (tx_1, rx_1) = async_channel::bounded(64);
1440 let (tx_2, rx_2) = async_channel::bounded(64);
1441
1442 let protocol_configs = vec![
1443 ProtocolConfig {
1444 name: protocol_name_1.clone(),
1445 fallback_names: Vec::new(),
1446 max_request_size: 1024,
1447 max_response_size: 1024 * 1024,
1448 request_timeout: Duration::from_secs(30),
1449 inbound_queue: Some(tx_1),
1450 },
1451 ProtocolConfig {
1452 name: protocol_name_2.clone(),
1453 fallback_names: Vec::new(),
1454 max_request_size: 1024,
1455 max_response_size: 1024 * 1024,
1456 request_timeout: Duration::from_secs(30),
1457 inbound_queue: Some(tx_2),
1458 },
1459 ];
1460
1461 let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1462
1463 (swarm, rx_1, rx_2, listen_addr)
1464 };
1465
1466 swarm_1.dial(listen_add_2).unwrap();
1469
1470 tokio::spawn(async move {
1472 loop {
1473 match swarm_2.select_next_some().await {
1474 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1475 result.unwrap();
1476 },
1477 _ => {},
1478 }
1479 }
1480 });
1481
1482 tokio::spawn(async move {
1487 let protocol_1_request = swarm_2_handler_1.next().await;
1488 let protocol_2_request = swarm_2_handler_2.next().await;
1489
1490 protocol_1_request
1491 .unwrap()
1492 .pending_response
1493 .send(OutgoingResponse {
1494 result: Ok(b"this is a response".to_vec()),
1495 reputation_changes: Vec::new(),
1496 sent_feedback: None,
1497 })
1498 .unwrap();
1499 protocol_2_request
1500 .unwrap()
1501 .pending_response
1502 .send(OutgoingResponse {
1503 result: Ok(b"this is a response".to_vec()),
1504 reputation_changes: Vec::new(),
1505 sent_feedback: None,
1506 })
1507 .unwrap();
1508 });
1509
1510 let mut response_receivers = None;
1513 let mut num_responses = 0;
1514
1515 loop {
1516 match swarm_1.select_next_some().await {
1517 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1518 let (sender_1, receiver_1) = oneshot::channel();
1519 let (sender_2, receiver_2) = oneshot::channel();
1520 swarm_1.behaviour_mut().send_request(
1521 &peer_id,
1522 protocol_name_1.clone(),
1523 b"this is a request".to_vec(),
1524 None,
1525 sender_1,
1526 IfDisconnected::ImmediateError,
1527 );
1528 swarm_1.behaviour_mut().send_request(
1529 &peer_id,
1530 protocol_name_2.clone(),
1531 b"this is a request".to_vec(),
1532 None,
1533 sender_2,
1534 IfDisconnected::ImmediateError,
1535 );
1536 assert!(response_receivers.is_none());
1537 response_receivers = Some((receiver_1, receiver_2));
1538 },
1539 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1540 num_responses += 1;
1541 result.unwrap();
1542 if num_responses == 2 {
1543 break;
1544 }
1545 },
1546 _ => {},
1547 }
1548 }
1549 let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1550 assert_eq!(
1551 response_receiver_1.await.unwrap().unwrap(),
1552 (b"this is a response".to_vec(), protocol_name_1)
1553 );
1554 assert_eq!(
1555 response_receiver_2.await.unwrap().unwrap(),
1556 (b"this is a response".to_vec(), protocol_name_2)
1557 );
1558 }
1559
1560 #[tokio::test]
1561 async fn request_fallback() {
1562 let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1563 let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1564 let protocol_name_2 = ProtocolName::from("/test/another");
1565
1566 let protocol_config_1 = ProtocolConfig {
1567 name: protocol_name_1.clone(),
1568 fallback_names: Vec::new(),
1569 max_request_size: 1024,
1570 max_response_size: 1024 * 1024,
1571 request_timeout: Duration::from_secs(30),
1572 inbound_queue: None,
1573 };
1574 let protocol_config_1_fallback = ProtocolConfig {
1575 name: protocol_name_1_fallback.clone(),
1576 fallback_names: Vec::new(),
1577 max_request_size: 1024,
1578 max_response_size: 1024 * 1024,
1579 request_timeout: Duration::from_secs(30),
1580 inbound_queue: None,
1581 };
1582 let protocol_config_2 = ProtocolConfig {
1583 name: protocol_name_2.clone(),
1584 fallback_names: Vec::new(),
1585 max_request_size: 1024,
1586 max_response_size: 1024 * 1024,
1587 request_timeout: Duration::from_secs(30),
1588 inbound_queue: None,
1589 };
1590
1591 let mut older_swarm = {
1594 let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1595 let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1596 let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1597 protocol_config_1_fallback.inbound_queue = Some(tx_1);
1598
1599 let mut protocol_config_2 = protocol_config_2.clone();
1600 protocol_config_2.inbound_queue = Some(tx_2);
1601
1602 tokio::spawn(async move {
1603 for _ in 0..2 {
1604 if let Some(rq) = rx_1.next().await {
1605 let (fb_tx, fb_rx) = oneshot::channel();
1606 assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1607 let _ = rq.pending_response.send(super::OutgoingResponse {
1608 result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1609 reputation_changes: Vec::new(),
1610 sent_feedback: Some(fb_tx),
1611 });
1612 fb_rx.await.unwrap();
1613 }
1614 }
1615
1616 if let Some(rq) = rx_2.next().await {
1617 let (fb_tx, fb_rx) = oneshot::channel();
1618 assert_eq!(rq.payload, b"request on protocol /test/other");
1619 let _ = rq.pending_response.send(super::OutgoingResponse {
1620 result: Ok(b"this is a response on protocol /test/other".to_vec()),
1621 reputation_changes: Vec::new(),
1622 sent_feedback: Some(fb_tx),
1623 });
1624 fb_rx.await.unwrap();
1625 }
1626 });
1627
1628 build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1629 };
1630
1631 let mut new_swarm = build_swarm(
1633 vec![
1634 protocol_config_1.clone(),
1635 protocol_config_1_fallback.clone(),
1636 protocol_config_2.clone(),
1637 ]
1638 .into_iter(),
1639 );
1640
1641 {
1642 let dial_addr = older_swarm.1.clone();
1643 Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1644 }
1645
1646 tokio::spawn(async move {
1648 loop {
1649 _ = older_swarm.0.select_next_some().await;
1650 }
1651 });
1652
1653 let (mut swarm, _) = new_swarm;
1655 let mut older_peer_id = None;
1656
1657 let mut response_receiver = None;
1658 loop {
1660 match swarm.select_next_some().await {
1661 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1662 older_peer_id = Some(peer_id);
1663 let (sender, receiver) = oneshot::channel();
1664 swarm.behaviour_mut().send_request(
1665 &peer_id,
1666 protocol_name_1.clone(),
1667 b"request on protocol /test/req-resp/2".to_vec(),
1668 Some((
1669 b"request on protocol /test/req-resp/1".to_vec(),
1670 protocol_config_1_fallback.name.clone(),
1671 )),
1672 sender,
1673 IfDisconnected::ImmediateError,
1674 );
1675 response_receiver = Some(receiver);
1676 },
1677 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1678 result.unwrap();
1679 break;
1680 },
1681 _ => {},
1682 }
1683 }
1684 assert_eq!(
1685 response_receiver.unwrap().await.unwrap().unwrap(),
1686 (
1687 b"this is a response on protocol /test/req-resp/1".to_vec(),
1688 protocol_name_1_fallback.clone()
1689 )
1690 );
1691 let (sender, response_receiver) = oneshot::channel();
1693 swarm.behaviour_mut().send_request(
1694 older_peer_id.as_ref().unwrap(),
1695 protocol_name_1_fallback.clone(),
1696 b"request on protocol /test/req-resp/1".to_vec(),
1697 Some((
1698 b"dummy request, will fail if processed".to_vec(),
1699 protocol_config_1_fallback.name.clone(),
1700 )),
1701 sender,
1702 IfDisconnected::ImmediateError,
1703 );
1704 loop {
1705 match swarm.select_next_some().await {
1706 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1707 result.unwrap();
1708 break;
1709 },
1710 _ => {},
1711 }
1712 }
1713 assert_eq!(
1714 response_receiver.await.unwrap().unwrap(),
1715 (
1716 b"this is a response on protocol /test/req-resp/1".to_vec(),
1717 protocol_name_1_fallback.clone()
1718 )
1719 );
1720 let (sender, response_receiver) = oneshot::channel();
1722 swarm.behaviour_mut().send_request(
1723 older_peer_id.as_ref().unwrap(),
1724 protocol_name_1.clone(),
1725 b"request on protocol /test/req-resp-2".to_vec(),
1726 None,
1727 sender,
1728 IfDisconnected::ImmediateError,
1729 );
1730 loop {
1731 match swarm.select_next_some().await {
1732 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1733 assert_matches!(
1734 result.unwrap_err(),
1735 RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1736 );
1737 break;
1738 },
1739 _ => {},
1740 }
1741 }
1742 assert!(response_receiver.await.unwrap().is_err());
1743 let (sender, response_receiver) = oneshot::channel();
1745 swarm.behaviour_mut().send_request(
1746 older_peer_id.as_ref().unwrap(),
1747 protocol_name_2.clone(),
1748 b"request on protocol /test/other".to_vec(),
1749 None,
1750 sender,
1751 IfDisconnected::ImmediateError,
1752 );
1753 loop {
1754 match swarm.select_next_some().await {
1755 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1756 result.unwrap();
1757 break;
1758 },
1759 _ => {},
1760 }
1761 }
1762 assert_eq!(
1763 response_receiver.await.unwrap().unwrap(),
1764 (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1765 );
1766 }
1767
1768 #[tokio::test]
1782 async fn enforce_outbound_timeouts() {
1783 const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1784 const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1785
1786 let protocol_name = ProtocolName::from("/test/req-resp/1");
1788
1789 let protocol_config = ProtocolConfig {
1790 name: protocol_name.clone(),
1791 fallback_names: Vec::new(),
1792 max_request_size: 1024,
1793 max_response_size: 1024 * 1024,
1794 request_timeout: REQUEST_TIMEOUT, inbound_queue: None,
1796 };
1797
1798 let (mut first_swarm, _) = {
1800 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1801
1802 tokio::spawn(async move {
1803 if let Some(rq) = rx.next().await {
1804 assert_eq!(rq.payload, b"this is a request");
1805
1806 tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1809
1810 let _ = rq.pending_response.send(super::OutgoingResponse {
1813 result: Ok(b"Second swarm already timedout".to_vec()),
1814 reputation_changes: Vec::new(),
1815 sent_feedback: None,
1816 });
1817 }
1818 });
1819
1820 let mut protocol_config = protocol_config.clone();
1821 protocol_config.inbound_queue = Some(tx);
1822
1823 build_swarm(iter::once(protocol_config))
1824 };
1825
1826 let (mut second_swarm, second_address) = {
1827 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1828
1829 tokio::spawn(async move {
1830 while let Some(rq) = rx.next().await {
1831 let _ = rq.pending_response.send(super::OutgoingResponse {
1832 result: Ok(b"This is the response".to_vec()),
1833 reputation_changes: Vec::new(),
1834 sent_feedback: None,
1835 });
1836 }
1837 });
1838 let mut protocol_config = protocol_config.clone();
1839 protocol_config.inbound_queue = Some(tx);
1840
1841 build_swarm(iter::once(protocol_config.clone()))
1842 };
1843 second_swarm
1845 .behaviour_mut()
1846 .protocols
1847 .get_mut(&protocol_name)
1848 .unwrap()
1849 .request_timeout = REQUEST_TIMEOUT_SHORT;
1850
1851 {
1853 Swarm::dial(&mut first_swarm, second_address).unwrap();
1854 }
1855
1856 tokio::spawn(async move {
1859 loop {
1860 let event = first_swarm.select_next_some().await;
1861 match event {
1862 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1863 assert!(result.is_ok());
1864 break;
1865 },
1866 SwarmEvent::ConnectionClosed { .. } => {
1867 break;
1868 },
1869 _ => {},
1870 }
1871 }
1872 });
1873
1874 let mut response_receiver = None;
1878 loop {
1879 let event = second_swarm.select_next_some().await;
1880
1881 match event {
1882 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1883 let (sender, receiver) = oneshot::channel();
1884 second_swarm.behaviour_mut().send_request(
1885 &peer_id,
1886 protocol_name.clone(),
1887 b"this is a request".to_vec(),
1888 None,
1889 sender,
1890 IfDisconnected::ImmediateError,
1891 );
1892 assert!(response_receiver.is_none());
1893 response_receiver = Some(receiver);
1894 },
1895 SwarmEvent::ConnectionClosed { .. } => {
1896 break;
1897 },
1898 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1899 assert!(result.is_err());
1900 break;
1901 },
1902 _ => {},
1903 }
1904 }
1905
1906 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1908 RequestFailure::Network(OutboundFailure::Timeout) => {},
1909 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1910 }
1911 }
1912}