1use crate::{
24 error::{Error, NegotiationError, SubstreamError},
25 multistream_select::NegotiationError::Failed as MultistreamFailed,
26 protocol::{
27 request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand},
28 Direction, TransportEvent, TransportService,
29 },
30 substream::{Substream, SubstreamSet},
31 types::{protocol::ProtocolName, RequestId, SubstreamId},
32 PeerId,
33};
34
35use bytes::BytesMut;
36use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt};
37use tokio::{
38 sync::{
39 mpsc::{Receiver, Sender},
40 oneshot,
41 },
42 time::sleep,
43};
44
45use std::{
46 collections::{hash_map::Entry, HashMap, HashSet},
47 io::ErrorKind,
48 sync::{
49 atomic::{AtomicUsize, Ordering},
50 Arc,
51 },
52 time::Duration,
53};
54
55pub use config::{Config, ConfigBuilder};
56pub use handle::{
57 DialOptions, RejectReason, RequestResponseError, RequestResponseEvent, RequestResponseHandle,
58};
59
60mod config;
61mod handle;
62#[cfg(test)]
63mod tests;
64
65const LOG_TARGET: &str = "litep2p::request-response::protocol";
71
72const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
74
75type PendingRequest = (
77 PeerId,
78 RequestId,
79 Option<ProtocolName>,
80 Result<Vec<u8>, RequestResponseError>,
81);
82
83struct RequestContext {
85 peer: PeerId,
87
88 request_id: RequestId,
90
91 request: Vec<u8>,
93
94 fallback: Option<(ProtocolName, Vec<u8>)>,
96}
97
98impl RequestContext {
99 fn new(
101 peer: PeerId,
102 request_id: RequestId,
103 request: Vec<u8>,
104 fallback: Option<(ProtocolName, Vec<u8>)>,
105 ) -> Self {
106 Self {
107 peer,
108 request_id,
109 request,
110 fallback,
111 }
112 }
113}
114
115struct PeerContext {
117 active: HashSet<RequestId>,
119
120 active_inbound: HashMap<RequestId, Option<ProtocolName>>,
122}
123
124impl PeerContext {
125 fn new() -> Self {
127 Self {
128 active: HashSet::new(),
129 active_inbound: HashMap::new(),
130 }
131 }
132}
133
134pub(crate) struct RequestResponseProtocol {
136 service: TransportService,
138
139 protocol: ProtocolName,
141
142 peers: HashMap<PeerId, PeerContext>,
144
145 pending_outbound: HashMap<SubstreamId, RequestContext>,
147
148 pending_outbound_responses: FuturesUnordered<BoxFuture<'static, ()>>,
157
158 pending_inbound: FuturesUnordered<BoxFuture<'static, PendingRequest>>,
160
161 pending_outbound_cancels: HashMap<RequestId, oneshot::Sender<()>>,
163
164 pending_inbound_requests: SubstreamSet<(PeerId, RequestId), Substream>,
166
167 pending_dials: HashMap<PeerId, RequestContext>,
169
170 event_tx: Sender<InnerRequestResponseEvent>,
172
173 command_rx: Receiver<RequestResponseCommand>,
175
176 next_request_id: Arc<AtomicUsize>,
180
181 timeout: Duration,
183
184 max_concurrent_inbound_requests: Option<usize>,
186}
187
188impl RequestResponseProtocol {
189 pub(crate) fn new(service: TransportService, config: Config) -> Self {
191 Self {
192 service,
193 peers: HashMap::new(),
194 timeout: config.timeout,
195 next_request_id: config.next_request_id,
196 event_tx: config.event_tx,
197 command_rx: config.command_rx,
198 protocol: config.protocol_name,
199 pending_dials: HashMap::new(),
200 pending_outbound: HashMap::new(),
201 pending_inbound: FuturesUnordered::new(),
202 pending_outbound_cancels: HashMap::new(),
203 pending_inbound_requests: SubstreamSet::new(),
204 pending_outbound_responses: FuturesUnordered::new(),
205 max_concurrent_inbound_requests: config.max_concurrent_inbound_request,
206 }
207 }
208
209 fn next_request_id(&mut self) -> RequestId {
211 RequestId::from(self.next_request_id.fetch_add(1usize, Ordering::Relaxed))
212 }
213
214 async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
216 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
217
218 let Entry::Vacant(entry) = self.peers.entry(peer) else {
219 tracing::error!(
220 target: LOG_TARGET,
221 ?peer,
222 "state mismatch: peer already exists",
223 );
224 debug_assert!(false);
225 return Err(Error::PeerAlreadyExists(peer));
226 };
227
228 match self.pending_dials.remove(&peer) {
229 None => {
230 entry.insert(PeerContext::new());
231 }
232 Some(context) => match self.service.open_substream(peer) {
233 Ok(substream_id) => {
234 tracing::trace!(
235 target: LOG_TARGET,
236 ?peer,
237 protocol = %self.protocol,
238 request_id = ?context.request_id,
239 ?substream_id,
240 "dial succeeded, open substream",
241 );
242
243 entry.insert(PeerContext {
244 active: HashSet::from_iter([context.request_id]),
245 active_inbound: HashMap::new(),
246 });
247 self.pending_outbound.insert(
248 substream_id,
249 RequestContext::new(
250 peer,
251 context.request_id,
252 context.request,
253 context.fallback,
254 ),
255 );
256 }
257 Err(error) => {
262 tracing::warn!(
263 target: LOG_TARGET,
264 ?peer,
265 protocol = %self.protocol,
266 request_id = ?context.request_id,
267 ?error,
268 "failed to open substream",
269 );
270
271 return self
272 .report_request_failure(
273 peer,
274 context.request_id,
275 RequestResponseError::Rejected(error.into()),
276 )
277 .await;
278 }
279 },
280 }
281
282 Ok(())
283 }
284
285 async fn on_connection_closed(&mut self, peer: PeerId) {
287 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");
288
289 let Some(context) = self.peers.remove(&peer) else {
290 tracing::error!(
291 target: LOG_TARGET,
292 ?peer,
293 "state mismatch: peer doesn't exist",
294 );
295 debug_assert!(false);
296 return;
297 };
298
299 for request_id in context.active {
301 let _ = self
302 .event_tx
303 .send(InnerRequestResponseEvent::RequestFailed {
304 peer,
305 request_id,
306 error: RequestResponseError::Rejected(RejectReason::ConnectionClosed),
307 })
308 .await;
309 }
310
311 for (request_id, _) in context.active_inbound {
313 self.pending_inbound_requests.remove(&(peer, request_id));
314 }
315 }
316
317 async fn on_outbound_substream(
319 &mut self,
320 peer: PeerId,
321 substream_id: SubstreamId,
322 mut substream: Substream,
323 fallback_protocol: Option<ProtocolName>,
324 ) -> crate::Result<()> {
325 let Some(RequestContext {
326 request_id,
327 request,
328 fallback,
329 ..
330 }) = self.pending_outbound.remove(&substream_id)
331 else {
332 tracing::error!(
333 target: LOG_TARGET,
334 ?peer,
335 protocol = %self.protocol,
336 ?substream_id,
337 "pending outbound request does not exist",
338 );
339 debug_assert!(false);
340
341 return Err(Error::InvalidState);
342 };
343
344 tracing::trace!(
345 target: LOG_TARGET,
346 ?peer,
347 protocol = %self.protocol,
348 ?substream_id,
349 ?request_id,
350 "substream opened, send request",
351 );
352
353 let request = match (&fallback_protocol, fallback) {
354 (Some(protocol), Some((fallback_protocol, fallback_request)))
355 if protocol == &fallback_protocol =>
356 fallback_request,
357 _ => request,
358 };
359
360 let request_timeout = self.timeout;
361 let protocol = self.protocol.clone();
362 let (tx, rx) = oneshot::channel();
363 self.pending_outbound_cancels.insert(request_id, tx);
364
365 self.pending_inbound.push(Box::pin(async move {
366 match tokio::time::timeout(request_timeout, substream.send_framed(request.into())).await
367 {
368 Err(_) => (
369 peer,
370 request_id,
371 fallback_protocol,
372 Err(RequestResponseError::Timeout),
373 ),
374 Ok(Err(SubstreamError::IoError(ErrorKind::PermissionDenied))) => {
375 tracing::warn!(
376 target: LOG_TARGET,
377 ?peer,
378 %protocol,
379 "tried to send too large request",
380 );
381
382 (
383 peer,
384 request_id,
385 fallback_protocol,
386 Err(RequestResponseError::TooLargePayload),
387 )
388 }
389 Ok(Err(error)) => (
390 peer,
391 request_id,
392 fallback_protocol,
393 Err(RequestResponseError::Rejected(error.into())),
394 ),
395 Ok(Ok(_)) => {
396 tokio::select! {
397 _ = rx => {
398 tracing::debug!(
399 target: LOG_TARGET,
400 ?peer,
401 %protocol,
402 ?request_id,
403 "request canceled",
404 );
405
406 let _ = substream.close().await;
407 (
408 peer,
409 request_id,
410 fallback_protocol,
411 Err(RequestResponseError::Canceled))
412 }
413 _ = sleep(request_timeout) => {
414 tracing::debug!(
415 target: LOG_TARGET,
416 ?peer,
417 %protocol,
418 ?request_id,
419 "request timed out",
420 );
421
422 let _ = substream.close().await;
423 (peer, request_id, fallback_protocol, Err(RequestResponseError::Timeout))
424 }
425 event = substream.next() => match event {
426 Some(Ok(response)) => {
427 (peer, request_id, fallback_protocol, Ok(response.freeze().into()))
428 },
429 Some(Err(error)) => {
430 (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(error.into())))
431 },
432 None => {
433 tracing::debug!(
434 target: LOG_TARGET,
435 ?peer,
436 %protocol,
437 ?request_id,
438 "substream closed",
439 );
440 (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(RejectReason::SubstreamClosed)))
441 }
442 }
443 }
444 }
445 }
446 }));
447
448 Ok(())
449 }
450
451 async fn on_inbound_request(
453 &mut self,
454 peer: PeerId,
455 request_id: RequestId,
456 request: Result<BytesMut, SubstreamError>,
457 ) -> crate::Result<()> {
458 let fallback = self
459 .peers
460 .get_mut(&peer)
461 .ok_or(Error::PeerDoesntExist(peer))?
462 .active_inbound
463 .remove(&request_id)
464 .ok_or_else(|| {
465 tracing::debug!(
466 target: LOG_TARGET,
467 ?peer,
468 protocol = %self.protocol,
469 ?request_id,
470 "no active inbound request",
471 );
472
473 Error::InvalidState
474 })?;
475 let mut substream =
476 self.pending_inbound_requests.remove(&(peer, request_id)).ok_or_else(|| {
477 tracing::debug!(
478 target: LOG_TARGET,
479 ?peer,
480 protocol = %self.protocol,
481 ?request_id,
482 "request doesn't exist in pending requests",
483 );
484
485 Error::InvalidState
486 })?;
487 let protocol = self.protocol.clone();
488
489 tracing::trace!(
490 target: LOG_TARGET,
491 ?peer,
492 %protocol,
493 ?request_id,
494 "inbound request",
495 );
496
497 let Ok(request) = request else {
498 tracing::debug!(
499 target: LOG_TARGET,
500 ?peer,
501 %protocol,
502 ?request_id,
503 ?request,
504 "failed to read request from substream",
505 );
506 return Err(Error::InvalidData);
507 };
508
509 let timeout = self.timeout;
515 let (response_tx, rx): (
516 oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
517 _,
518 ) = oneshot::channel();
519
520 self.pending_outbound_responses.push(Box::pin(async move {
521 match rx.await {
522 Err(_) => {
523 tracing::debug!(
524 target: LOG_TARGET,
525 ?peer,
526 %protocol,
527 ?request_id,
528 "request rejected",
529 );
530 let _ = substream.close().await;
531 }
532 Ok((response, mut feedback)) => {
533 tracing::trace!(
534 target: LOG_TARGET,
535 ?peer,
536 %protocol,
537 ?request_id,
538 "send response",
539 );
540
541 match tokio::time::timeout(timeout, substream.send_framed(response.into()))
542 .await
543 {
544 Err(_) => tracing::debug!(
545 target: LOG_TARGET,
546 ?peer,
547 %protocol,
548 ?request_id,
549 "timed out while sending response",
550 ),
551 Ok(Ok(_)) => feedback.take().map_or((), |feedback| {
552 let _ = feedback.send(());
553 }),
554 Ok(Err(error)) => tracing::trace!(
555 target: LOG_TARGET,
556 ?peer,
557 %protocol,
558 ?request_id,
559 ?error,
560 "failed to send request to peer",
561 ),
562 }
563 }
564 }
565 }));
566
567 self.event_tx
568 .send(InnerRequestResponseEvent::RequestReceived {
569 peer,
570 fallback,
571 request_id,
572 request: request.freeze().into(),
573 response_tx,
574 })
575 .await
576 .map_err(From::from)
577 }
578
579 async fn on_inbound_substream(
581 &mut self,
582 peer: PeerId,
583 fallback: Option<ProtocolName>,
584 substream: Substream,
585 ) -> crate::Result<()> {
586 tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "handle inbound substream");
587
588 if let Some(max_requests) = self.max_concurrent_inbound_requests {
589 let num_inbound_requests =
590 self.pending_inbound_requests.len() + self.pending_outbound_responses.len();
591
592 if max_requests <= num_inbound_requests {
593 tracing::debug!(
594 target: LOG_TARGET,
595 ?peer,
596 protocol = %self.protocol,
597 ?fallback,
598 ?max_requests,
599 "rejecting request as already at maximum",
600 );
601
602 let _ = substream.close().await;
603 return Ok(());
604 }
605 }
606
607 let request_id = self.next_request_id();
612 self.peers
613 .get_mut(&peer)
614 .ok_or(Error::PeerDoesntExist(peer))?
615 .active_inbound
616 .insert(request_id, fallback);
617 self.pending_inbound_requests.insert((peer, request_id), substream);
618
619 Ok(())
620 }
621
622 async fn on_dial_failure(&mut self, peer: PeerId) {
623 if let Some(context) = self.pending_dials.remove(&peer) {
624 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "failed to dial peer");
625
626 let _ = self
627 .peers
628 .get_mut(&peer)
629 .map(|peer_context| peer_context.active.remove(&context.request_id));
630 let _ = self
631 .report_request_failure(
632 peer,
633 context.request_id,
634 RequestResponseError::Rejected(RejectReason::DialFailed(None)),
635 )
636 .await;
637 }
638 }
639
640 async fn on_substream_open_failure(
642 &mut self,
643 substream: SubstreamId,
644 error: SubstreamError,
645 ) -> crate::Result<()> {
646 let Some(RequestContext {
647 request_id, peer, ..
648 }) = self.pending_outbound.remove(&substream)
649 else {
650 tracing::error!(
651 target: LOG_TARGET,
652 protocol = %self.protocol,
653 ?substream,
654 "pending outbound request does not exist",
655 );
656 debug_assert!(false);
657
658 return Err(Error::InvalidState);
659 };
660
661 tracing::debug!(
662 target: LOG_TARGET,
663 ?peer,
664 protocol = %self.protocol,
665 ?request_id,
666 ?substream,
667 ?error,
668 "failed to open substream",
669 );
670
671 let _ = self
672 .peers
673 .get_mut(&peer)
674 .map(|peer_context| peer_context.active.remove(&request_id));
675
676 self.event_tx
677 .send(InnerRequestResponseEvent::RequestFailed {
678 peer,
679 request_id,
680 error: match error {
681 SubstreamError::NegotiationError(NegotiationError::MultistreamSelectError(
682 MultistreamFailed,
683 )) => RequestResponseError::UnsupportedProtocol,
684 _ => RequestResponseError::Rejected(error.into()),
685 },
686 })
687 .await
688 .map_err(From::from)
689 }
690
691 async fn report_request_failure(
693 &mut self,
694 peer: PeerId,
695 request_id: RequestId,
696 error: RequestResponseError,
697 ) -> crate::Result<()> {
698 self.event_tx
699 .send(InnerRequestResponseEvent::RequestFailed {
700 peer,
701 request_id,
702 error,
703 })
704 .await
705 .map_err(From::from)
706 }
707
708 async fn on_send_request(
710 &mut self,
711 peer: PeerId,
712 request_id: RequestId,
713 request: Vec<u8>,
714 dial_options: DialOptions,
715 fallback: Option<(ProtocolName, Vec<u8>)>,
716 ) -> crate::Result<()> {
717 tracing::trace!(
718 target: LOG_TARGET,
719 ?peer,
720 protocol = %self.protocol,
721 ?request_id,
722 ?dial_options,
723 "send request to remote peer",
724 );
725
726 let Some(context) = self.peers.get_mut(&peer) else {
727 match dial_options {
728 DialOptions::Reject => {
729 tracing::debug!(
730 target: LOG_TARGET,
731 ?peer,
732 protocol = %self.protocol,
733 ?request_id,
734 ?dial_options,
735 "peer not connected and should not dial",
736 );
737
738 return self
739 .report_request_failure(
740 peer,
741 request_id,
742 RequestResponseError::NotConnected,
743 )
744 .await;
745 }
746 DialOptions::Dial => match self.service.dial(&peer) {
747 Ok(_) => {
748 tracing::trace!(
749 target: LOG_TARGET,
750 ?peer,
751 protocol = %self.protocol,
752 ?request_id,
753 "started dialing peer",
754 );
755
756 self.pending_dials.insert(
757 peer,
758 RequestContext::new(peer, request_id, request, fallback),
759 );
760 return Ok(());
761 }
762 Err(error) => {
763 tracing::debug!(
764 target: LOG_TARGET,
765 ?peer,
766 protocol = %self.protocol,
767 ?error,
768 "failed to dial peer"
769 );
770
771 return self
772 .report_request_failure(
773 peer,
774 request_id,
775 RequestResponseError::Rejected(RejectReason::DialFailed(Some(
776 error,
777 ))),
778 )
779 .await;
780 }
781 },
782 }
783 };
784
785 match self.service.open_substream(peer) {
788 Ok(substream_id) => {
789 let unique_request_id = context.active.insert(request_id);
790 debug_assert!(unique_request_id);
791
792 self.pending_outbound.insert(
793 substream_id,
794 RequestContext::new(peer, request_id, request, fallback),
795 );
796
797 Ok(())
798 }
799 Err(error) => {
800 tracing::debug!(
801 target: LOG_TARGET,
802 ?peer,
803 protocol = %self.protocol,
804 ?request_id,
805 ?error,
806 "failed to open substream",
807 );
808
809 self.report_request_failure(
810 peer,
811 request_id,
812 RequestResponseError::Rejected(error.into()),
813 )
814 .await
815 }
816 }
817 }
818
819 async fn on_substream_event(
821 &mut self,
822 peer: PeerId,
823 request_id: RequestId,
824 fallback: Option<ProtocolName>,
825 message: Result<Vec<u8>, RequestResponseError>,
826 ) -> crate::Result<()> {
827 if !self
828 .peers
829 .get_mut(&peer)
830 .ok_or(Error::PeerDoesntExist(peer))?
831 .active
832 .remove(&request_id)
833 {
834 tracing::warn!(
835 target: LOG_TARGET,
836 ?peer,
837 protocol = %self.protocol,
838 ?request_id,
839 "invalid state: received substream event but no active substream",
840 );
841 return Err(Error::InvalidState);
842 }
843
844 let event = match message {
845 Ok(response) => InnerRequestResponseEvent::ResponseReceived {
846 peer,
847 request_id,
848 response,
849 fallback,
850 },
851 Err(error) => match error {
852 RequestResponseError::Canceled => {
853 tracing::debug!(
854 target: LOG_TARGET,
855 ?peer,
856 protocol = %self.protocol,
857 ?request_id,
858 "request canceled by local node",
859 );
860 return Ok(());
861 }
862 error => InnerRequestResponseEvent::RequestFailed {
863 peer,
864 request_id,
865 error,
866 },
867 },
868 };
869
870 self.event_tx.send(event).await.map_err(From::from)
871 }
872
873 async fn on_cancel_request(&mut self, request_id: RequestId) -> crate::Result<()> {
875 tracing::trace!(target: LOG_TARGET, protocol = %self.protocol, ?request_id, "cancel outbound request");
876
877 match self.pending_outbound_cancels.remove(&request_id) {
878 Some(tx) => tx.send(()).map_err(|_| Error::SubstreamDoesntExist),
879 None => {
880 tracing::debug!(
881 target: LOG_TARGET,
882 protocol = %self.protocol,
883 ?request_id,
884 "tried to cancel request which doesn't exist",
885 );
886
887 Ok(())
888 }
889 }
890 }
891
892 pub async fn run(mut self) {
894 tracing::debug!(target: LOG_TARGET, "starting request-response event loop");
895
896 loop {
897 tokio::select! {
898 biased;
901
902 event = self.service.next() => match event {
903 Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
904 let _ = self.on_connection_established(peer).await;
905 }
906 Some(TransportEvent::ConnectionClosed { peer }) => {
907 self.on_connection_closed(peer).await;
908 }
909 Some(TransportEvent::SubstreamOpened {
910 peer,
911 substream,
912 direction,
913 fallback,
914 ..
915 }) => match direction {
916 Direction::Inbound => {
917 if let Err(error) = self.on_inbound_substream(peer, fallback, substream).await {
918 tracing::debug!(
919 target: LOG_TARGET,
920 ?peer,
921 protocol = %self.protocol,
922 ?error,
923 "failed to handle inbound substream",
924 );
925 }
926 }
927 Direction::Outbound(substream_id) => {
928 let _ = self.on_outbound_substream(peer, substream_id, substream, fallback).await;
929 }
930 },
931 Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
932 if let Err(error) = self.on_substream_open_failure(substream, error).await {
933 tracing::warn!(
934 target: LOG_TARGET,
935 protocol = %self.protocol,
936 ?error,
937 "failed to handle substream open failure",
938 );
939 }
940 }
941 Some(TransportEvent::DialFailure { peer, .. }) => self.on_dial_failure(peer).await,
942 None => return,
943 },
944 event = self.pending_inbound.select_next_some(), if !self.pending_inbound.is_empty() => {
945 let (peer, request_id, fallback, event) = event;
946
947 if let Err(error) = self.on_substream_event(peer, request_id, fallback, event).await {
948 tracing::debug!(
949 target: LOG_TARGET,
950 ?peer,
951 protocol = %self.protocol,
952 ?request_id,
953 ?error,
954 "failed to handle substream event",
955 );
956 }
957
958 self.pending_outbound_cancels.remove(&request_id);
959 }
960 _ = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => {}
961 event = self.pending_inbound_requests.next() => match event {
962 Some(((peer, request_id), message)) => {
963 if let Err(error) = self.on_inbound_request(peer, request_id, message).await {
964 tracing::debug!(
965 target: LOG_TARGET,
966 ?peer,
967 protocol = %self.protocol,
968 ?request_id,
969 ?error,
970 "failed to handle inbound request",
971 );
972 }
973 }
974 None => return,
975 },
976 command = self.command_rx.recv() => match command {
977 None => {
978 tracing::debug!(target: LOG_TARGET, protocol = %self.protocol, "user protocol has exited, exiting");
979 return
980 }
981 Some(command) => match command {
982 RequestResponseCommand::SendRequest { peer, request_id, request, dial_options } => {
983 if let Err(error) = self.on_send_request(peer, request_id, request, dial_options, None).await {
984 tracing::debug!(
985 target: LOG_TARGET,
986 ?peer,
987 protocol = %self.protocol,
988 ?request_id,
989 ?error,
990 "failed to send request",
991 );
992 }
993 }
994 RequestResponseCommand::CancelRequest { request_id } => {
995 if let Err(error) = self.on_cancel_request(request_id).await {
996 tracing::debug!(
997 target: LOG_TARGET,
998 protocol = %self.protocol,
999 ?request_id,
1000 ?error,
1001 "failed to cancel reqeuest",
1002 );
1003 }
1004 }
1005 RequestResponseCommand::SendRequestWithFallback { peer, request_id, request, fallback, dial_options } => {
1006 if let Err(error) = self.on_send_request(peer, request_id, request, dial_options, Some(fallback)).await {
1007 tracing::debug!(
1008 target: LOG_TARGET,
1009 ?peer,
1010 protocol = %self.protocol,
1011 ?request_id,
1012 ?error,
1013 "failed to send request",
1014 );
1015 }
1016 }
1017 }
1018 },
1019 }
1020 }
1021 }
1022}