1use crate::{
38 peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39 service::traits::RequestResponseConfig as RequestResponseConfigT,
40 types::ProtocolName,
41 ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46 core::{Endpoint, Multiaddr},
47 request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48 swarm::{
49 behaviour::{ConnectionClosed, FromSwarm},
50 handler::multi::MultiHandler,
51 ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler,
52 THandlerInEvent, THandlerOutEvent, ToSwarm,
53 },
54 PeerId,
55};
56
57use std::{
58 collections::{hash_map::Entry, HashMap},
59 io, iter,
60 ops::Deref,
61 pin::Pin,
62 sync::Arc,
63 task::{Context, Poll},
64 time::{Duration, Instant},
65};
66
67pub use libp2p::request_response::{Config, RequestId};
68
/// Possible failures occurring in the context of sending an outbound request and receiving the
/// response.
///
/// Mirrors the variants of libp2p's `request_response::OutboundFailure` (see the `From`
/// conversion in this file).
#[derive(Debug, thiserror::Error)]
pub enum OutboundFailure {
    /// The request could not be sent because dialing the peer failed.
    #[error("Failed to dial the requested peer")]
    DialFailure,
    /// No response arrived before the request timed out.
    #[error("Timeout while waiting for a response")]
    Timeout,
    /// The connection closed before a response was received.
    #[error("Connection was closed before a response was received")]
    ConnectionClosed,
    /// The remote supports none of the requested protocols.
    #[error("The remote supports none of the requested protocols")]
    UnsupportedProtocols,
}
86
87impl From<request_response::OutboundFailure> for OutboundFailure {
88 fn from(out: request_response::OutboundFailure) -> Self {
89 match out {
90 request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
91 request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
92 request_response::OutboundFailure::ConnectionClosed =>
93 OutboundFailure::ConnectionClosed,
94 request_response::OutboundFailure::UnsupportedProtocols =>
95 OutboundFailure::UnsupportedProtocols,
96 }
97 }
98}
99
/// Possible failures occurring in the context of receiving an inbound request and sending a
/// response.
///
/// Mirrors the variants of libp2p's `request_response::InboundFailure` (see the `From`
/// conversion in this file).
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
    /// Receiving the request or sending the response timed out.
    #[error("Timeout while receiving request or sending response")]
    Timeout,
    /// The connection closed before a response could be sent.
    #[error("Connection was closed before a response could be sent")]
    ConnectionClosed,
    /// The local peer supports none of the protocols requested by the remote.
    #[error("The local peer supports none of the protocols requested by the remote")]
    UnsupportedProtocols,
    /// The response channel was dropped without a response having been sent.
    #[error("The response channel was dropped without sending a response to the remote")]
    ResponseOmission,
}
118
119impl From<request_response::InboundFailure> for InboundFailure {
120 fn from(out: request_response::InboundFailure) -> Self {
121 match out {
122 request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
123 request_response::InboundFailure::Timeout => InboundFailure::Timeout,
124 request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
125 request_response::InboundFailure::UnsupportedProtocols =>
126 InboundFailure::UnsupportedProtocols,
127 }
128 }
129}
130
/// Error reported to the local code that issued an outbound request.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
    // Sent when the target is not connected and the caller asked for
    // `IfDisconnected::ImmediateError`.
    #[error("We are not currently connected to the requested peer.")]
    NotConnected,
    // Sent when the protocol name has no entry in the behaviour's protocol map.
    #[error("Given protocol hasn't been registered.")]
    UnknownProtocol,
    // Produced when the decoded response is `Err(())`, which the codec yields when the stream
    // ends before a length prefix could be read.
    #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
    Refused,
    // Reported in `Event::RequestFinished` when the requester's receiving end is already gone.
    #[error("The remote replied, but the local node is no longer interested in the response.")]
    Obsolete,
    // Wraps a failure reported by the underlying libp2p behaviour.
    #[error("Problem on the network: {0}")]
    Network(OutboundFailure),
}
146
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Primary name of the protocol; also the key under which it is registered.
    pub name: ProtocolName,

    /// Additional names registered for this protocol with the same protocol support as `name`.
    pub fallback_names: Vec<ProtocolName>,

    /// Maximum allowed size, in bytes, of a request.
    ///
    /// The codec rejects an incoming request whose announced length exceeds this value.
    pub max_request_size: u64,

    /// Maximum allowed size, in bytes, of a response.
    ///
    /// The codec rejects an incoming response whose announced length exceeds this value.
    pub max_response_size: u64,

    /// Timeout handed to the underlying libp2p request-response behaviour.
    pub request_timeout: Duration,

    /// Channel over which incoming requests are handed to the local handler.
    ///
    /// When `None`, the protocol is registered as outbound-only. When `Some`, each incoming
    /// request is pushed with `try_send`, so a full or closed channel causes that request to go
    /// unanswered.
    pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
195
196impl RequestResponseConfigT for ProtocolConfig {
197 fn protocol_name(&self) -> &ProtocolName {
198 &self.name
199 }
200}
201
/// A single request received from a peer on a request-response protocol.
#[derive(Debug)]
pub struct IncomingRequest {
    /// Who sent the request.
    pub peer: sc_network_types::PeerId,

    /// Request bytes as read from the wire, without the length prefix.
    pub payload: Vec<u8>,

    /// Channel on which the handler sends its [`OutgoingResponse`].
    ///
    /// If the sender is dropped without sending, the behaviour discards the request and no
    /// response is written.
    pub pending_response: oneshot::Sender<OutgoingResponse>,
}
222
/// Response for an incoming request, to be sent back by the behaviour.
#[derive(Debug)]
pub struct OutgoingResponse {
    /// The payload of the response.
    ///
    /// `Ok(bytes)` is sent to the remote; on `Err(())` the behaviour sends nothing and drops the
    /// response channel.
    pub result: Result<Vec<u8>, ()>,

    /// Reputation changes accrued while handling the request.
    ///
    /// If non-empty, they are surfaced as [`Event::ReputationChanges`] for the requesting peer.
    pub reputation_changes: Vec<ReputationChange>,

    /// If provided, `()` is sent on this channel once the underlying behaviour reports the
    /// response as sent. The sender is dropped without firing if the response could not be
    /// handed over or an inbound failure is reported for the request.
    pub sent_feedback: Option<oneshot::Sender<()>>,
}
245
/// Bookkeeping for an outbound request that has not completed yet.
struct PendingRequest {
    /// When the request was handed to the underlying behaviour; used to report its duration.
    started_at: Instant,
    /// Where the response (and the protocol it arrived on) or the failure is delivered.
    response_tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
    /// Request and protocol to retry with if the remote supports none of the original protocols.
    fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
252
/// When sending a request, what to do if the recipient is not connected.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Send the request anyway and let the underlying behaviour try to reach the peer.
    TryConnect,
    /// Fail right away if the destination is not connected.
    ImmediateError,
}

impl IfDisconnected {
    /// Returns `true` only for [`IfDisconnected::TryConnect`].
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
272
/// Event generated by the [`RequestResponsesBehaviour`].
#[derive(Debug)]
pub enum Event {
    /// A remote sent a request; it was either answered or failed.
    InboundRequest {
        /// Peer which sent the request.
        peer: PeerId,
        /// Name of the protocol in question.
        protocol: ProtocolName,
        /// `Ok` carries the time elapsed between the arrival of the request and the moment the
        /// response was reported as sent; `Err` carries the reported inbound failure.
        result: Result<Duration, ResponseFailure>,
    },

    /// A request initiated with [`RequestResponsesBehaviour::send_request`] has completed, either
    /// with a response or with a failure.
    RequestFinished {
        /// Peer the request was sent to.
        peer: PeerId,
        /// Name of the protocol in question.
        protocol: ProtocolName,
        /// Time elapsed since the request was handed to the underlying behaviour.
        duration: Duration,
        /// `Ok(())` if a result was delivered to the local requester,
        /// `Err(RequestFailure::Obsolete)` if the requester was gone, or
        /// `Err(RequestFailure::Network(_))` on an outbound failure.
        result: Result<(), RequestFailure>,
    },

    /// Reputation changes attached by a local request handler to its response.
    ReputationChanges {
        /// Peer whose reputation needs to be adjusted.
        peer: PeerId,
        /// Reputation changes.
        changes: Vec<ReputationChange>,
    },
}
314
/// Combination of a protocol name and a request id.
///
/// Used as the key of the behaviour's request/response bookkeeping maps. Each protocol has its
/// own libp2p behaviour instance producing its own `RequestId`s, so the protocol name is part of
/// the key (NOTE(review): presumably because ids can collide across protocols; confirm).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId {
    /// Primary name of the protocol the request belongs to.
    protocol: ProtocolName,
    /// Id assigned to the request by that protocol's behaviour.
    request_id: RequestId,
}
326
327impl From<(ProtocolName, RequestId)> for ProtocolRequestId {
328 fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
329 Self { protocol, request_id }
330 }
331}
332
/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
/// protocols, each backed by its own libp2p request-response behaviour.
pub struct RequestResponsesBehaviour {
    /// Registered protocols, keyed by primary protocol name.
    ///
    /// Each entry holds the libp2p behaviour for the protocol and, if inbound requests are
    /// supported, the channel used to hand incoming requests to the local handler.
    protocols: HashMap<
        ProtocolName,
        (Behaviour<GenericCodec>, Option<async_channel::Sender<IncomingRequest>>),
    >,

    /// Outbound requests that have been sent and are awaiting a response or a failure.
    pending_requests: HashMap<ProtocolRequestId, PendingRequest>,

    /// One future per inbound request, resolving once the local handler has produced a response,
    /// or to `None` if the handler dropped the request.
    pending_responses: stream::FuturesUnordered<
        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
    >,

    /// Arrival time of each inbound request, used to compute the duration reported in
    /// [`Event::InboundRequest`].
    pending_responses_arrival_time: HashMap<ProtocolRequestId, Instant>,

    /// Feedback senders to fire once the corresponding response is reported as sent.
    send_feedback: HashMap<ProtocolRequestId, oneshot::Sender<()>>,

    /// Source of peer reputations, consulted before handling an inbound request.
    peer_store: Arc<dyn PeerStoreProvider>,
}
363
/// Generated by the response builder and waiting to be processed by `poll`.
struct RequestProcessingOutcome {
    /// Peer that sent the request.
    peer: PeerId,
    /// Id of the inbound request within its protocol.
    request_id: RequestId,
    /// Primary name of the protocol the request arrived on.
    protocol: ProtocolName,
    /// libp2p channel through which the response must be sent.
    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// Response produced by the local handler.
    response: OutgoingResponse,
}
372
373impl RequestResponsesBehaviour {
374 pub fn new(
377 list: impl Iterator<Item = ProtocolConfig>,
378 peer_store: Arc<dyn PeerStoreProvider>,
379 ) -> Result<Self, RegisterError> {
380 let mut protocols = HashMap::new();
381 for protocol in list {
382 let mut cfg = Config::default();
383 cfg.set_request_timeout(protocol.request_timeout);
384
385 let protocol_support = if protocol.inbound_queue.is_some() {
386 ProtocolSupport::Full
387 } else {
388 ProtocolSupport::Outbound
389 };
390
391 let rq_rp = Behaviour::with_codec(
392 GenericCodec {
393 max_request_size: protocol.max_request_size,
394 max_response_size: protocol.max_response_size,
395 },
396 iter::once(protocol.name.clone())
397 .chain(protocol.fallback_names)
398 .zip(iter::repeat(protocol_support)),
399 cfg,
400 );
401
402 match protocols.entry(protocol.name) {
403 Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)),
404 Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
405 };
406 }
407
408 Ok(Self {
409 protocols,
410 pending_requests: Default::default(),
411 pending_responses: Default::default(),
412 pending_responses_arrival_time: Default::default(),
413 send_feedback: Default::default(),
414 peer_store,
415 })
416 }
417
418 pub fn send_request(
425 &mut self,
426 target: &PeerId,
427 protocol_name: ProtocolName,
428 request: Vec<u8>,
429 fallback_request: Option<(Vec<u8>, ProtocolName)>,
430 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
431 connect: IfDisconnected,
432 ) {
433 log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len());
434
435 if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) {
436 Self::send_request_inner(
437 protocol,
438 &mut self.pending_requests,
439 target,
440 protocol_name,
441 request,
442 fallback_request,
443 pending_response,
444 connect,
445 )
446 } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
447 log::debug!(
448 target: "sub-libp2p",
449 "Unknown protocol {:?}. At the same time local \
450 node is no longer interested in the result.",
451 protocol_name,
452 );
453 }
454 }
455
456 fn send_request_inner(
457 behaviour: &mut Behaviour<GenericCodec>,
458 pending_requests: &mut HashMap<ProtocolRequestId, PendingRequest>,
459 target: &PeerId,
460 protocol_name: ProtocolName,
461 request: Vec<u8>,
462 fallback_request: Option<(Vec<u8>, ProtocolName)>,
463 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
464 connect: IfDisconnected,
465 ) {
466 if behaviour.is_connected(target) || connect.should_connect() {
467 let request_id = behaviour.send_request(target, request);
468 let prev_req_id = pending_requests.insert(
469 (protocol_name.to_string().into(), request_id).into(),
470 PendingRequest {
471 started_at: Instant::now(),
472 response_tx: pending_response,
473 fallback_request,
474 },
475 );
476 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
477 } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
478 log::debug!(
479 target: "sub-libp2p",
480 "Not connected to peer {:?}. At the same time local \
481 node is no longer interested in the result.",
482 target,
483 );
484 }
485 }
486}
487
488impl NetworkBehaviour for RequestResponsesBehaviour {
489 type ConnectionHandler =
490 MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
491 type ToSwarm = Event;
492
/// Accepts every pending inbound connection; this behaviour never denies one.
fn handle_pending_inbound_connection(
    &mut self,
    _connection_id: ConnectionId,
    _local_addr: &Multiaddr,
    _remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
    Ok(())
}
501
502 fn handle_pending_outbound_connection(
503 &mut self,
504 _connection_id: ConnectionId,
505 _maybe_peer: Option<PeerId>,
506 _addresses: &[Multiaddr],
507 _effective_role: Endpoint,
508 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
509 Ok(Vec::new())
510 }
511
512 fn handle_established_inbound_connection(
513 &mut self,
514 connection_id: ConnectionId,
515 peer: PeerId,
516 local_addr: &Multiaddr,
517 remote_addr: &Multiaddr,
518 ) -> Result<THandler<Self>, ConnectionDenied> {
519 let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
520 if let Ok(handler) = r.handle_established_inbound_connection(
521 connection_id,
522 peer,
523 local_addr,
524 remote_addr,
525 ) {
526 Some((p.to_string(), handler))
527 } else {
528 None
529 }
530 });
531
532 Ok(MultiHandler::try_from_iter(iter).expect(
533 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
534 which is the only possible error; qed",
535 ))
536 }
537
538 fn handle_established_outbound_connection(
539 &mut self,
540 connection_id: ConnectionId,
541 peer: PeerId,
542 addr: &Multiaddr,
543 role_override: Endpoint,
544 ) -> Result<THandler<Self>, ConnectionDenied> {
545 let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
546 if let Ok(handler) =
547 r.handle_established_outbound_connection(connection_id, peer, addr, role_override)
548 {
549 Some((p.to_string(), handler))
550 } else {
551 None
552 }
553 });
554
555 Ok(MultiHandler::try_from_iter(iter).expect(
556 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
557 which is the only possible error; qed",
558 ))
559 }
560
561 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
562 match event {
563 FromSwarm::ConnectionEstablished(e) =>
564 for (p, _) in self.protocols.values_mut() {
565 NetworkBehaviour::on_swarm_event(p, FromSwarm::ConnectionEstablished(e));
566 },
567 FromSwarm::ConnectionClosed(ConnectionClosed {
568 peer_id,
569 connection_id,
570 endpoint,
571 handler,
572 remaining_established,
573 }) =>
574 for (p_name, p_handler) in handler.into_iter() {
575 if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
576 proto.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
577 peer_id,
578 connection_id,
579 endpoint,
580 handler: p_handler,
581 remaining_established,
582 }));
583 } else {
584 log::error!(
585 target: "sub-libp2p",
586 "on_swarm_event/connection_closed: no request-response instance registered for protocol {:?}",
587 p_name,
588 )
589 }
590 },
591 FromSwarm::DialFailure(e) =>
592 for (p, _) in self.protocols.values_mut() {
593 NetworkBehaviour::on_swarm_event(p, FromSwarm::DialFailure(e));
594 },
595 FromSwarm::ListenerClosed(e) =>
596 for (p, _) in self.protocols.values_mut() {
597 NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e));
598 },
599 FromSwarm::ListenFailure(e) =>
600 for (p, _) in self.protocols.values_mut() {
601 NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenFailure(e));
602 },
603 FromSwarm::ListenerError(e) =>
604 for (p, _) in self.protocols.values_mut() {
605 NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerError(e));
606 },
607 FromSwarm::ExternalAddrExpired(e) =>
608 for (p, _) in self.protocols.values_mut() {
609 NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrExpired(e));
610 },
611 FromSwarm::NewListener(e) =>
612 for (p, _) in self.protocols.values_mut() {
613 NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListener(e));
614 },
615 FromSwarm::ExpiredListenAddr(e) =>
616 for (p, _) in self.protocols.values_mut() {
617 NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredListenAddr(e));
618 },
619 FromSwarm::NewExternalAddrCandidate(e) =>
620 for (p, _) in self.protocols.values_mut() {
621 NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddrCandidate(e));
622 },
623 FromSwarm::ExternalAddrConfirmed(e) =>
624 for (p, _) in self.protocols.values_mut() {
625 NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrConfirmed(e));
626 },
627 FromSwarm::AddressChange(e) =>
628 for (p, _) in self.protocols.values_mut() {
629 NetworkBehaviour::on_swarm_event(p, FromSwarm::AddressChange(e));
630 },
631 FromSwarm::NewListenAddr(e) =>
632 for (p, _) in self.protocols.values_mut() {
633 NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListenAddr(e));
634 },
635 }
636 }
637
638 fn on_connection_handler_event(
639 &mut self,
640 peer_id: PeerId,
641 connection_id: ConnectionId,
642 event: THandlerOutEvent<Self>,
643 ) {
644 let p_name = event.0;
645 if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
646 return proto.on_connection_handler_event(peer_id, connection_id, event.1)
647 } else {
648 log::warn!(
649 target: "sub-libp2p",
650 "on_connection_handler_event: no request-response instance registered for protocol {:?}",
651 p_name
652 );
653 }
654 }
655
656 fn poll(
657 &mut self,
658 cx: &mut Context,
659 params: &mut impl PollParameters,
660 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
661 'poll_all: loop {
662 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
664 let RequestProcessingOutcome {
665 peer,
666 request_id,
667 protocol: protocol_name,
668 inner_channel,
669 response: OutgoingResponse { result, reputation_changes, sent_feedback },
670 } = match outcome {
671 Some(outcome) => outcome,
672 None => continue,
675 };
676
677 if let Ok(payload) = result {
678 if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
679 log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
680
681 if protocol.send_response(inner_channel, Ok(payload)).is_err() {
682 log::debug!(
685 target: "sub-libp2p",
686 "Failed to send response for {:?} on protocol {:?} due to a \
687 timeout or due to the connection to the peer being closed. \
688 Dropping response",
689 request_id, protocol_name,
690 );
691 } else if let Some(sent_feedback) = sent_feedback {
692 self.send_feedback
693 .insert((protocol_name, request_id).into(), sent_feedback);
694 }
695 }
696 }
697
698 if !reputation_changes.is_empty() {
699 return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
700 peer,
701 changes: reputation_changes,
702 }))
703 }
704 }
705
706 let mut fallback_requests = vec![];
707
708 for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols {
710 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) {
711 let ev = match ev {
712 ToSwarm::GenerateEvent(ev) => ev,
714
715 ToSwarm::Dial { opts } => {
718 if opts.get_peer_id().is_none() {
719 log::error!(
720 "The request-response isn't supposed to start dialing addresses"
721 );
722 }
723 return Poll::Ready(ToSwarm::Dial { opts })
724 },
725 ToSwarm::NotifyHandler { peer_id, handler, event } =>
726 return Poll::Ready(ToSwarm::NotifyHandler {
727 peer_id,
728 handler,
729 event: ((*protocol).to_string(), event),
730 }),
731 ToSwarm::CloseConnection { peer_id, connection } =>
732 return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
733 ToSwarm::NewExternalAddrCandidate(observed) =>
734 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
735 ToSwarm::ExternalAddrConfirmed(addr) =>
736 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
737 ToSwarm::ExternalAddrExpired(addr) =>
738 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
739 ToSwarm::ListenOn { opts } =>
740 return Poll::Ready(ToSwarm::ListenOn { opts }),
741 ToSwarm::RemoveListener { id } =>
742 return Poll::Ready(ToSwarm::RemoveListener { id }),
743 };
744
745 match ev {
746 request_response::Event::Message {
748 peer,
749 message: Message::Request { request_id, request, channel, .. },
750 } => {
751 self.pending_responses_arrival_time
752 .insert((protocol.clone(), request_id).into(), Instant::now());
753
754 let reputation = self.peer_store.peer_reputation(&peer.into());
755
756 if reputation < BANNED_THRESHOLD {
757 log::debug!(
758 target: "sub-libp2p",
759 "Cannot handle requests from a node with a low reputation {}: {}",
760 peer,
761 reputation,
762 );
763 continue 'poll_protocol
764 }
765
766 let (tx, rx) = oneshot::channel();
767
768 if let Some(resp_builder) = resp_builder {
771 let _ = resp_builder.try_send(IncomingRequest {
778 peer: peer.into(),
779 payload: request,
780 pending_response: tx,
781 });
782 } else {
783 debug_assert!(false, "Received message on outbound-only protocol.");
784 }
785
786 let protocol = protocol.clone();
787
788 self.pending_responses.push(Box::pin(async move {
789 rx.await.map_or(None, |response| {
793 Some(RequestProcessingOutcome {
794 peer,
795 request_id,
796 protocol,
797 inner_channel: channel,
798 response,
799 })
800 })
801 }));
802
803 continue 'poll_all
806 },
807
808 request_response::Event::Message {
810 peer,
811 message: Message::Response { request_id, response },
812 ..
813 } => {
814 let (started, delivered) = match self
815 .pending_requests
816 .remove(&(protocol.clone(), request_id).into())
817 {
818 Some(PendingRequest { started_at, response_tx, .. }) => {
819 log::trace!(
820 target: "sub-libp2p",
821 "received response from {peer} ({protocol:?}), {} bytes",
822 response.as_ref().map_or(0usize, |response| response.len()),
823 );
824
825 let delivered = response_tx
826 .send(
827 response
828 .map_err(|()| RequestFailure::Refused)
829 .map(|resp| (resp, protocol.clone())),
830 )
831 .map_err(|_| RequestFailure::Obsolete);
832 (started_at, delivered)
833 },
834 None => {
835 log::warn!(
836 target: "sub-libp2p",
837 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
838 request_id,
839 );
840 debug_assert!(false);
841 continue
842 },
843 };
844
845 let out = Event::RequestFinished {
846 peer,
847 protocol: protocol.clone(),
848 duration: started.elapsed(),
849 result: delivered,
850 };
851
852 return Poll::Ready(ToSwarm::GenerateEvent(out))
853 },
854
855 request_response::Event::OutboundFailure {
857 peer,
858 request_id,
859 error,
860 ..
861 } => {
862 let started = match self
863 .pending_requests
864 .remove(&(protocol.clone(), request_id).into())
865 {
866 Some(PendingRequest {
867 started_at,
868 response_tx,
869 fallback_request,
870 }) => {
871 if let request_response::OutboundFailure::UnsupportedProtocols =
874 error
875 {
876 if let Some((fallback_request, fallback_protocol)) =
877 fallback_request
878 {
879 log::trace!(
880 target: "sub-libp2p",
881 "Request with id {:?} failed. Trying the fallback protocol. {}",
882 request_id,
883 fallback_protocol.deref()
884 );
885 fallback_requests.push((
886 peer,
887 fallback_protocol,
888 fallback_request,
889 response_tx,
890 ));
891 continue
892 }
893 }
894
895 if response_tx
896 .send(Err(RequestFailure::Network(error.clone().into())))
897 .is_err()
898 {
899 log::debug!(
900 target: "sub-libp2p",
901 "Request with id {:?} failed. At the same time local \
902 node is no longer interested in the result.",
903 request_id,
904 );
905 }
906 started_at
907 },
908 None => {
909 log::warn!(
910 target: "sub-libp2p",
911 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
912 request_id,
913 );
914 debug_assert!(false);
915 continue
916 },
917 };
918
919 let out = Event::RequestFinished {
920 peer,
921 protocol: protocol.clone(),
922 duration: started.elapsed(),
923 result: Err(RequestFailure::Network(error.into())),
924 };
925
926 return Poll::Ready(ToSwarm::GenerateEvent(out))
927 },
928
929 request_response::Event::InboundFailure {
932 request_id, peer, error, ..
933 } => {
934 self.pending_responses_arrival_time
935 .remove(&(protocol.clone(), request_id).into());
936 self.send_feedback.remove(&(protocol.clone(), request_id).into());
937 let out = Event::InboundRequest {
938 peer,
939 protocol: protocol.clone(),
940 result: Err(ResponseFailure::Network(error.into())),
941 };
942 return Poll::Ready(ToSwarm::GenerateEvent(out))
943 },
944
945 request_response::Event::ResponseSent { request_id, peer } => {
947 let arrival_time = self
948 .pending_responses_arrival_time
949 .remove(&(protocol.clone(), request_id).into())
950 .map(|t| t.elapsed())
951 .expect(
952 "Time is added for each inbound request on arrival and only \
953 removed on success (`ResponseSent`) or failure \
954 (`InboundFailure`). One can not receive a success event for a \
955 request that either never arrived, or that has previously \
956 failed; qed.",
957 );
958
959 if let Some(send_feedback) =
960 self.send_feedback.remove(&(protocol.clone(), request_id).into())
961 {
962 let _ = send_feedback.send(());
963 }
964
965 let out = Event::InboundRequest {
966 peer,
967 protocol: protocol.clone(),
968 result: Ok(arrival_time),
969 };
970
971 return Poll::Ready(ToSwarm::GenerateEvent(out))
972 },
973 };
974 }
975 }
976
977 for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
979 if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) {
980 Self::send_request_inner(
981 behaviour,
982 &mut self.pending_requests,
983 &peer,
984 protocol,
985 request,
986 None,
987 pending_response,
988 IfDisconnected::ImmediateError,
992 );
993 }
994 }
995
996 break Poll::Pending
997 }
998 }
999}
1000
/// Error when registering a protocol.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// A protocol with this primary name has been registered more than once.
    #[error("{0}")]
    DuplicateProtocol(ProtocolName),
}
1008
/// Error when processing a request sent by a remote.
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
    /// Problem on the network, as reported by the underlying behaviour.
    #[error("Problem on the network: {0}")]
    Network(InboundFailure),
}
1016
/// Implements the libp2p `Codec` trait. Defines how streams of bytes are turned into requests
/// and responses and vice-versa.
///
/// NOTE(review): presumably `pub` only because the type appears in the `ConnectionHandler`
/// associated type of the public behaviour; confirm before narrowing visibility.
#[derive(Debug, Clone)]
#[doc(hidden)] pub struct GenericCodec {
    /// Largest request length, in bytes, accepted when reading a request.
    max_request_size: u64,
    /// Largest response length, in bytes, accepted when reading a response.
    max_response_size: u64,
}
1025
1026#[async_trait::async_trait]
1027impl Codec for GenericCodec {
1028 type Protocol = ProtocolName;
1029 type Request = Vec<u8>;
1030 type Response = Result<Vec<u8>, ()>;
1031
1032 async fn read_request<T>(
1033 &mut self,
1034 _: &Self::Protocol,
1035 mut io: &mut T,
1036 ) -> io::Result<Self::Request>
1037 where
1038 T: AsyncRead + Unpin + Send,
1039 {
1040 let length = unsigned_varint::aio::read_usize(&mut io)
1042 .await
1043 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1044 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1045 return Err(io::Error::new(
1046 io::ErrorKind::InvalidInput,
1047 format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1048 ))
1049 }
1050
1051 let mut buffer = vec![0; length];
1053 io.read_exact(&mut buffer).await?;
1054 Ok(buffer)
1055 }
1056
1057 async fn read_response<T>(
1058 &mut self,
1059 _: &Self::Protocol,
1060 mut io: &mut T,
1061 ) -> io::Result<Self::Response>
1062 where
1063 T: AsyncRead + Unpin + Send,
1064 {
1065 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1072 Ok(l) => l,
1073 Err(unsigned_varint::io::ReadError::Io(err))
1074 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1075 return Ok(Err(())),
1076 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1077 };
1078
1079 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1080 return Err(io::Error::new(
1081 io::ErrorKind::InvalidInput,
1082 format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1083 ))
1084 }
1085
1086 let mut buffer = vec![0; length];
1088 io.read_exact(&mut buffer).await?;
1089 Ok(Ok(buffer))
1090 }
1091
1092 async fn write_request<T>(
1093 &mut self,
1094 _: &Self::Protocol,
1095 io: &mut T,
1096 req: Self::Request,
1097 ) -> io::Result<()>
1098 where
1099 T: AsyncWrite + Unpin + Send,
1100 {
1101 {
1104 let mut buffer = unsigned_varint::encode::usize_buffer();
1105 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1106 }
1107
1108 io.write_all(&req).await?;
1110
1111 io.close().await?;
1112 Ok(())
1113 }
1114
1115 async fn write_response<T>(
1116 &mut self,
1117 _: &Self::Protocol,
1118 io: &mut T,
1119 res: Self::Response,
1120 ) -> io::Result<()>
1121 where
1122 T: AsyncWrite + Unpin + Send,
1123 {
1124 if let Ok(res) = res {
1126 {
1129 let mut buffer = unsigned_varint::encode::usize_buffer();
1130 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1131 }
1132
1133 io.write_all(&res).await?;
1135 }
1136
1137 io.close().await?;
1138 Ok(())
1139 }
1140}
1141
1142#[cfg(test)]
1143mod tests {
1144 use super::*;
1145
1146 use crate::mock::MockPeerStore;
1147 use assert_matches::assert_matches;
1148 use futures::{channel::oneshot, executor::LocalPool, task::Spawn};
1149 use libp2p::{
1150 core::{
1151 transport::{MemoryTransport, Transport},
1152 upgrade,
1153 },
1154 identity::Keypair,
1155 noise,
1156 swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1157 Multiaddr,
1158 };
1159 use std::{iter, time::Duration};
1160
1161 struct TokioExecutor(tokio::runtime::Runtime);
1162 impl Executor for TokioExecutor {
1163 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1164 let _ = self.0.spawn(f);
1165 }
1166 }
1167
1168 fn build_swarm(
1169 list: impl Iterator<Item = ProtocolConfig>,
1170 ) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1171 let keypair = Keypair::generate_ed25519();
1172
1173 let transport = MemoryTransport::new()
1174 .upgrade(upgrade::Version::V1)
1175 .authenticate(noise::Config::new(&keypair).unwrap())
1176 .multiplex(libp2p::yamux::Config::default())
1177 .boxed();
1178
1179 let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1180
1181 let runtime = tokio::runtime::Runtime::new().unwrap();
1182
1183 let mut swarm = Swarm::new(
1184 transport,
1185 behaviour,
1186 keypair.public().to_peer_id(),
1187 SwarmConfig::with_executor(TokioExecutor(runtime)),
1188 );
1189
1190 let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1191
1192 swarm.listen_on(listen_addr.clone()).unwrap();
1193
1194 (swarm, listen_addr)
1195 }
1196
/// One node sends a request to the other and must receive the expected response.
#[test]
fn basic_request_response_works() {
    let protocol_name = ProtocolName::from("/test/req-resp/1");
    let mut pool = LocalPool::new();

    // Build two nodes, each with a background task answering incoming requests.
    let mut swarms: Vec<_> = (0..2)
        .map(|_| {
            let (inbound_tx, mut inbound_rx) = async_channel::bounded::<IncomingRequest>(64);

            let responder = async move {
                while let Some(request) = inbound_rx.next().await {
                    let (feedback_tx, feedback_rx) = oneshot::channel();
                    assert_eq!(request.payload, b"this is a request");
                    let _ = request.pending_response.send(super::OutgoingResponse {
                        result: Ok(b"this is a response".to_vec()),
                        reputation_changes: Vec::new(),
                        sent_feedback: Some(feedback_tx),
                    });
                    // Resolves only once the behaviour has reported the response as sent.
                    feedback_rx.await.unwrap();
                }
            };
            pool.spawner().spawn_obj(responder.boxed().into()).unwrap();

            build_swarm(iter::once(ProtocolConfig {
                name: protocol_name.clone(),
                fallback_names: Vec::new(),
                max_request_size: 1024,
                max_response_size: 1024 * 1024,
                request_timeout: Duration::from_secs(30),
                inbound_queue: Some(inbound_tx),
            }))
        })
        .collect();

    // Ask the first swarm to dial the second one. No other mechanism (e.g. discovery) would
    // make them connect on their own.
    let dial_addr = swarms[1].1.clone();
    swarms[0].0.dial(dial_addr).unwrap();

    // The dialing swarm runs in the background and answers the request.
    let (mut dialer, _) = swarms.remove(0);
    let drive_dialer = async move {
        loop {
            if let SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) =
                dialer.select_next_some().await
            {
                result.unwrap();
            }
        }
    };
    pool.spawner().spawn_obj(drive_dialer.boxed().into()).unwrap();

    // The remaining swarm sends the request once connected and then checks the response.
    let (mut requester, _) = swarms.remove(0);
    pool.run_until(async move {
        let mut response_receiver = None;

        loop {
            match requester.select_next_some().await {
                SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                    let (sender, receiver) = oneshot::channel();
                    requester.behaviour_mut().send_request(
                        &peer_id,
                        protocol_name.clone(),
                        b"this is a request".to_vec(),
                        None,
                        sender,
                        IfDisconnected::ImmediateError,
                    );
                    // Only a single request may ever be in flight.
                    let previous = response_receiver.replace(receiver);
                    assert!(previous.is_none());
                },
                SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                    result.unwrap();
                    break
                },
                _ => {},
            }
        }

        let response = response_receiver.unwrap().await.unwrap().unwrap();
        assert_eq!(response, (b"this is a response".to_vec(), protocol_name));
    });
}
1299
/// A response larger than `max_response_size` must surface as a failure on the requesting side.
#[test]
fn max_response_size_exceeded() {
    let protocol_name = ProtocolName::from("/test/req-resp/1");
    let mut pool = LocalPool::new();

    // Build two nodes whose handlers always answer with more bytes than the limit allows.
    let mut swarms: Vec<_> = (0..2)
        .map(|_| {
            let (inbound_tx, mut inbound_rx) = async_channel::bounded::<IncomingRequest>(64);

            let responder = async move {
                while let Some(request) = inbound_rx.next().await {
                    assert_eq!(request.payload, b"this is a request");
                    let _ = request.pending_response.send(super::OutgoingResponse {
                        result: Ok(b"this response exceeds the limit".to_vec()),
                        reputation_changes: Vec::new(),
                        sent_feedback: None,
                    });
                }
            };
            pool.spawner().spawn_obj(responder.boxed().into()).unwrap();

            build_swarm(iter::once(ProtocolConfig {
                name: protocol_name.clone(),
                fallback_names: Vec::new(),
                max_request_size: 1024,
                // Deliberately smaller than the response sent above.
                max_response_size: 8,
                request_timeout: Duration::from_secs(30),
                inbound_queue: Some(inbound_tx),
            }))
        })
        .collect();

    // Ask the first swarm to dial the second one. No other mechanism (e.g. discovery) would
    // make them connect on their own.
    let dial_addr = swarms[1].1.clone();
    swarms[0].0.dial(dial_addr).unwrap();

    // The dialing swarm runs in the background until it has answered one request. Sending the
    // oversized response is expected to succeed on this side.
    let (mut dialer, _) = swarms.remove(0);
    let drive_dialer = async move {
        loop {
            if let SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) =
                dialer.select_next_some().await
            {
                assert!(result.is_ok());
                break
            }
        }
    };
    pool.spawner().spawn_obj(drive_dialer.boxed().into()).unwrap();

    // The remaining swarm sends the request once connected and expects it to fail.
    let (mut requester, _) = swarms.remove(0);
    pool.run_until(async move {
        let mut response_receiver = None;

        loop {
            match requester.select_next_some().await {
                SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                    let (sender, receiver) = oneshot::channel();
                    requester.behaviour_mut().send_request(
                        &peer_id,
                        protocol_name.clone(),
                        b"this is a request".to_vec(),
                        None,
                        sender,
                        IfDisconnected::ImmediateError,
                    );
                    // Only a single request may ever be in flight.
                    let previous = response_receiver.replace(receiver);
                    assert!(previous.is_none());
                },
                SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                    assert!(result.is_err());
                    break
                },
                _ => {},
            }
        }

        let failure = response_receiver.unwrap().await.unwrap().unwrap_err();
        if !matches!(failure, RequestFailure::Network(OutboundFailure::ConnectionClosed)) {
            panic!()
        }
    });
}
1402
#[test]
/// Sends two requests at the same time, one on each of two protocols, to the same peer and
/// checks that each request completes with the response and protocol name belonging to it.
///
/// NOTE(review): judging by the test name, this presumably guards against request
/// identifiers of different protocols being mixed up — confirm against the behaviour's
/// request-id handling.
fn request_id_collision() {
    let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
    let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
    let mut pool = LocalPool::new();

    // Every protocol config in this test is identical apart from its name and inbound queue.
    let make_config = |name: &ProtocolName, inbound_queue| ProtocolConfig {
        name: name.clone(),
        fallback_names: Vec::new(),
        max_request_size: 1024,
        max_response_size: 1024 * 1024,
        request_timeout: Duration::from_secs(30),
        inbound_queue,
    };

    // The requesting swarm: registers both protocols without any inbound queue.
    let (mut swarm_1, _) = build_swarm(
        vec![make_config(&protocol_name_1, None), make_config(&protocol_name_2, None)]
            .into_iter(),
    );

    // The responding swarm: one inbound queue per protocol.
    let (tx_1, mut swarm_2_handler_1) = async_channel::bounded(64);
    let (tx_2, mut swarm_2_handler_2) = async_channel::bounded(64);
    let (mut swarm_2, listen_add_2) = build_swarm(
        vec![
            make_config(&protocol_name_1, Some(tx_1)),
            make_config(&protocol_name_2, Some(tx_2)),
        ]
        .into_iter(),
    );

    swarm_1.dial(listen_add_2).unwrap();

    // Keep the responding swarm running; every inbound request it reports must have succeeded.
    pool.spawner()
        .spawn_obj(
            async move {
                loop {
                    if let SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) =
                        swarm_2.select_next_some().await
                    {
                        result.unwrap();
                    }
                }
            }
            .boxed()
            .into(),
        )
        .unwrap();

    // Request handler of the responding swarm: wait until a request has arrived on both
    // protocols and only then answer them, so that both are in flight at the same time.
    pool.spawner()
        .spawn_obj(
            async move {
                let protocol_1_request = swarm_2_handler_1.next().await;
                let protocol_2_request = swarm_2_handler_2.next().await;

                for request in [protocol_1_request, protocol_2_request] {
                    request
                        .unwrap()
                        .pending_response
                        .send(OutgoingResponse {
                            result: Ok(b"this is a response".to_vec()),
                            reputation_changes: Vec::new(),
                            sent_feedback: None,
                        })
                        .unwrap();
                }
            }
            .boxed()
            .into(),
        )
        .unwrap();

    // Drive the requesting swarm: fire both requests once connected, then wait until both
    // have finished.
    pool.run_until(async move {
        let mut response_receivers = None;
        let mut finished = 0;

        loop {
            match swarm_1.select_next_some().await {
                SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                    let (sender_1, receiver_1) = oneshot::channel();
                    let (sender_2, receiver_2) = oneshot::channel();
                    let outgoing = [(&protocol_name_1, sender_1), (&protocol_name_2, sender_2)];
                    for (protocol, sender) in outgoing {
                        swarm_1.behaviour_mut().send_request(
                            &peer_id,
                            protocol.clone(),
                            b"this is a request".to_vec(),
                            None,
                            sender,
                            IfDisconnected::ImmediateError,
                        );
                    }
                    // Only a single connection is expected, so the requests are sent once.
                    assert!(response_receivers.is_none());
                    response_receivers = Some((receiver_1, receiver_2));
                },
                SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                    finished += 1;
                    result.unwrap();
                    if finished == 2 {
                        break
                    }
                },
                _ => {},
            }
        }

        // Each receiver must yield the response together with its own protocol name.
        let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
        let response_1 = response_receiver_1.await.unwrap().unwrap();
        assert_eq!(response_1, (b"this is a response".to_vec(), protocol_name_1));
        let response_2 = response_receiver_2.await.unwrap().unwrap();
        assert_eq!(response_2, (b"this is a response".to_vec(), protocol_name_2));
    });
}
1576
#[test]
/// Exercises requests with and without a fallback against a peer that lacks the newest
/// protocol.
///
/// The "older" swarm registers only `/test/req-resp/1` and `/test/another`; the "new" swarm
/// additionally registers `/test/req-resp/2`. The new swarm then issues four requests:
///
/// 1. `/test/req-resp/2` with a `/test/req-resp/1` fallback: answered, and the reported
///    protocol is the fallback.
/// 2. `/test/req-resp/1` with a fallback payload that the responder would reject: answered
///    on `/test/req-resp/1`.
/// 3. `/test/req-resp/2` without a fallback: fails with `UnsupportedProtocols`.
/// 4. `/test/another` without a fallback: answered.
fn request_fallback() {
    let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
    let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
    let protocol_name_2 = ProtocolName::from("/test/another");
    let mut pool = LocalPool::new();

    // All base configs are identical apart from the protocol name.
    let make_config = |name: &ProtocolName| ProtocolConfig {
        name: name.clone(),
        fallback_names: Vec::new(),
        max_request_size: 1024,
        max_response_size: 1024 * 1024,
        request_timeout: Duration::from_secs(30),
        inbound_queue: None,
    };
    let protocol_config_1 = make_config(&protocol_name_1);
    let protocol_config_1_fallback = make_config(&protocol_name_1_fallback);
    let protocol_config_2 = make_config(&protocol_name_2);

    // The older swarm registers only the fallback protocol and the second protocol, each with
    // an inbound queue served by the task spawned below.
    let (mut older_swarm, older_addr) = {
        let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
        let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
        let older_config_1 =
            ProtocolConfig { inbound_queue: Some(tx_1), ..protocol_config_1_fallback.clone() };
        let older_config_2 =
            ProtocolConfig { inbound_queue: Some(tx_2), ..protocol_config_2.clone() };

        pool.spawner()
            .spawn_obj(
                async move {
                    // Two requests are expected on the fallback protocol.
                    for _ in 0..2 {
                        if let Some(rq) = rx_1.next().await {
                            let (fb_tx, fb_rx) = oneshot::channel();
                            assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
                            let _ = rq.pending_response.send(super::OutgoingResponse {
                                result: Ok(
                                    b"this is a response on protocol /test/req-resp/1".to_vec()
                                ),
                                reputation_changes: Vec::new(),
                                sent_feedback: Some(fb_tx),
                            });
                            // Wait for the feedback before handling the next request.
                            fb_rx.await.unwrap();
                        }
                    }

                    // Followed by a single request on the second protocol.
                    if let Some(rq) = rx_2.next().await {
                        let (fb_tx, fb_rx) = oneshot::channel();
                        assert_eq!(rq.payload, b"request on protocol /test/other");
                        let _ = rq.pending_response.send(super::OutgoingResponse {
                            result: Ok(b"this is a response on protocol /test/other".to_vec()),
                            reputation_changes: Vec::new(),
                            sent_feedback: Some(fb_tx),
                        });
                        fb_rx.await.unwrap();
                    }
                }
                .boxed()
                .into(),
            )
            .unwrap();

        build_swarm(vec![older_config_1, older_config_2].into_iter())
    };

    // The new swarm registers all three protocols and dials the older one.
    let (mut swarm, _) = build_swarm(
        vec![protocol_config_1, protocol_config_1_fallback.clone(), protocol_config_2]
            .into_iter(),
    );
    swarm.dial(older_addr).unwrap();

    // Keep the older swarm running in the background; its events are not inspected.
    pool.spawner()
        .spawn_obj({
            async move {
                loop {
                    let _ = older_swarm.select_next_some().await;
                }
            }
            .boxed()
            .into()
        })
        .unwrap();

    // Drive the new swarm and issue the four requests one after another.
    pool.run_until(async move {
        let mut older_peer_id = None;
        let mut first_receiver = None;

        // 1. New protocol with a fallback to the old one.
        loop {
            match swarm.select_next_some().await {
                SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                    older_peer_id = Some(peer_id);
                    let (sender, receiver) = oneshot::channel();
                    swarm.behaviour_mut().send_request(
                        &peer_id,
                        protocol_name_1.clone(),
                        b"request on protocol /test/req-resp/2".to_vec(),
                        Some((
                            b"request on protocol /test/req-resp/1".to_vec(),
                            protocol_config_1_fallback.name.clone(),
                        )),
                        sender,
                        IfDisconnected::ImmediateError,
                    );
                    first_receiver = Some(receiver);
                },
                SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                    result.unwrap();
                    break
                },
                _ => {},
            }
        }
        // The response must be attributed to the fallback protocol.
        assert_eq!(
            first_receiver.unwrap().await.unwrap().unwrap(),
            (
                b"this is a response on protocol /test/req-resp/1".to_vec(),
                protocol_name_1_fallback.clone()
            )
        );

        // 2. Old protocol with a fallback payload that the responder's payload assertion
        // would reject if it were ever delivered.
        let (sender, response_receiver) = oneshot::channel();
        swarm.behaviour_mut().send_request(
            older_peer_id.as_ref().unwrap(),
            protocol_name_1_fallback.clone(),
            b"request on protocol /test/req-resp/1".to_vec(),
            Some((
                b"dummy request, will fail if processed".to_vec(),
                protocol_config_1_fallback.name.clone(),
            )),
            sender,
            IfDisconnected::ImmediateError,
        );
        let finished = loop {
            if let SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) =
                swarm.select_next_some().await
            {
                break result
            }
        };
        finished.unwrap();
        assert_eq!(
            response_receiver.await.unwrap().unwrap(),
            (
                b"this is a response on protocol /test/req-resp/1".to_vec(),
                protocol_name_1_fallback.clone()
            )
        );

        // 3. New protocol without a fallback: the older peer does not register it.
        let (sender, response_receiver) = oneshot::channel();
        swarm.behaviour_mut().send_request(
            older_peer_id.as_ref().unwrap(),
            protocol_name_1.clone(),
            b"request on protocol /test/req-resp-2".to_vec(),
            None,
            sender,
            IfDisconnected::ImmediateError,
        );
        let finished = loop {
            if let SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) =
                swarm.select_next_some().await
            {
                break result
            }
        };
        assert_matches!(
            finished.unwrap_err(),
            RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
        );
        assert!(response_receiver.await.unwrap().is_err());

        // 4. Second protocol without a fallback.
        let (sender, response_receiver) = oneshot::channel();
        swarm.behaviour_mut().send_request(
            older_peer_id.as_ref().unwrap(),
            protocol_name_2.clone(),
            b"request on protocol /test/other".to_vec(),
            None,
            sender,
            IfDisconnected::ImmediateError,
        );
        let finished = loop {
            if let SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) =
                swarm.select_next_some().await
            {
                break result
            }
        };
        finished.unwrap();
        assert_eq!(
            response_receiver.await.unwrap().unwrap(),
            (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
        );
    });
}
1801}