1use crate::{
24 error::{Error, NegotiationError, SubstreamError},
25 multistream_select::NegotiationError::Failed as MultistreamFailed,
26 protocol::{
27 request_response::handle::InnerRequestResponseEvent, Direction, TransportEvent,
28 TransportService,
29 },
30 substream::Substream,
31 types::{protocol::ProtocolName, RequestId, SubstreamId},
32 utils::futures_stream::FuturesStream,
33 PeerId,
34};
35
36use bytes::BytesMut;
37use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt};
38use tokio::{
39 sync::{
40 mpsc::{Receiver, Sender},
41 oneshot,
42 },
43 time::sleep,
44};
45
46use std::{
47 collections::{hash_map::Entry, HashMap, HashSet},
48 io::ErrorKind,
49 sync::{
50 atomic::{AtomicUsize, Ordering},
51 Arc,
52 },
53 time::Duration,
54};
55
56pub use config::{Config, ConfigBuilder};
57pub use handle::{
58 DialOptions, RejectReason, RequestResponseCommand, RequestResponseError, RequestResponseEvent,
59 RequestResponseHandle,
60};
61
62mod config;
63mod handle;
64#[cfg(test)]
65mod tests;
66
/// Logging target attached to every `tracing` event emitted from this file.
const LOG_TARGET: &str = "litep2p::request-response::protocol";
69
/// Request timeout of five seconds.
///
/// NOTE(review): not referenced by the code visible in this file; presumably the default picked
/// up by the `config` module — confirm.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
72
73type PendingRequest = (
75 PeerId,
76 RequestId,
77 Option<ProtocolName>,
78 Result<Vec<u8>, RequestResponseError>,
79);
80
81struct RequestContext {
83 peer: PeerId,
85
86 request_id: RequestId,
88
89 request: Vec<u8>,
91
92 fallback: Option<(ProtocolName, Vec<u8>)>,
94}
95
96impl RequestContext {
97 fn new(
99 peer: PeerId,
100 request_id: RequestId,
101 request: Vec<u8>,
102 fallback: Option<(ProtocolName, Vec<u8>)>,
103 ) -> Self {
104 Self {
105 peer,
106 request_id,
107 request,
108 fallback,
109 }
110 }
111}
112
113struct PeerContext {
115 active: HashSet<RequestId>,
117
118 active_inbound: HashMap<RequestId, Option<ProtocolName>>,
120}
121
122impl PeerContext {
123 fn new() -> Self {
125 Self {
126 active: HashSet::new(),
127 active_inbound: HashMap::new(),
128 }
129 }
130}
131
132pub(crate) struct RequestResponseProtocol {
134 service: TransportService,
136
137 protocol: ProtocolName,
139
140 peers: HashMap<PeerId, PeerContext>,
142
143 pending_outbound: HashMap<SubstreamId, RequestContext>,
145
146 pending_outbound_responses: FuturesUnordered<BoxFuture<'static, ()>>,
155
156 pending_outbound_cancels: HashMap<RequestId, oneshot::Sender<()>>,
158
159 pending_inbound: FuturesUnordered<BoxFuture<'static, PendingRequest>>,
161
162 pending_inbound_requests: FuturesStream<
164 BoxFuture<
165 'static,
166 (
167 PeerId,
168 RequestId,
169 Result<BytesMut, SubstreamError>,
170 Substream,
171 ),
172 >,
173 >,
174
175 pending_dials: HashMap<PeerId, RequestContext>,
177
178 event_tx: Sender<InnerRequestResponseEvent>,
180
181 command_rx: Receiver<RequestResponseCommand>,
183
184 next_request_id: Arc<AtomicUsize>,
186
187 timeout: Duration,
189
190 max_concurrent_inbound_requests: Option<usize>,
192}
193
194impl RequestResponseProtocol {
195 pub(crate) fn new(service: TransportService, config: Config) -> Self {
197 Self {
198 service,
199 peers: HashMap::new(),
200 timeout: config.timeout,
201 next_request_id: config.next_request_id,
202 event_tx: config.event_tx,
203 command_rx: config.command_rx,
204 protocol: config.protocol_name,
205 pending_dials: HashMap::new(),
206 pending_outbound: HashMap::new(),
207 pending_inbound: FuturesUnordered::new(),
208 pending_outbound_cancels: HashMap::new(),
209 pending_inbound_requests: FuturesStream::new(),
210 pending_outbound_responses: FuturesUnordered::new(),
211 max_concurrent_inbound_requests: config.max_concurrent_inbound_request,
212 }
213 }
214
215 fn next_request_id(&mut self) -> RequestId {
217 RequestId::from(self.next_request_id.fetch_add(1usize, Ordering::Relaxed))
218 }
219
220 async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
222 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
223
224 let Entry::Vacant(entry) = self.peers.entry(peer) else {
225 tracing::error!(
226 target: LOG_TARGET,
227 ?peer,
228 "state mismatch: peer already exists",
229 );
230 debug_assert!(false);
231 return Err(Error::PeerAlreadyExists(peer));
232 };
233
234 match self.pending_dials.remove(&peer) {
235 None => {
236 tracing::debug!(
237 target: LOG_TARGET,
238 ?peer,
239 protocol = %self.protocol,
240 "peer connected without pending dial",
241 );
242 entry.insert(PeerContext::new());
243 }
244 Some(context) => match self.service.open_substream(peer) {
245 Ok(substream_id) => {
246 tracing::trace!(
247 target: LOG_TARGET,
248 ?peer,
249 protocol = %self.protocol,
250 request_id = ?context.request_id,
251 ?substream_id,
252 "dial succeeded, open substream",
253 );
254
255 entry.insert(PeerContext {
256 active: HashSet::from_iter([context.request_id]),
257 active_inbound: HashMap::new(),
258 });
259 self.pending_outbound.insert(
260 substream_id,
261 RequestContext::new(
262 peer,
263 context.request_id,
264 context.request,
265 context.fallback,
266 ),
267 );
268 }
269 Err(error) => {
274 tracing::warn!(
275 target: LOG_TARGET,
276 ?peer,
277 protocol = %self.protocol,
278 request_id = ?context.request_id,
279 ?error,
280 "failed to open substream",
281 );
282
283 return self
284 .report_request_failure(
285 peer,
286 context.request_id,
287 RequestResponseError::Rejected(error.into()),
288 )
289 .await;
290 }
291 },
292 }
293
294 Ok(())
295 }
296
297 async fn on_connection_closed(&mut self, peer: PeerId) {
299 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");
300
301 self.pending_outbound.retain(|_, context| context.peer != peer);
303
304 let Some(context) = self.peers.remove(&peer) else {
305 tracing::error!(
306 target: LOG_TARGET,
307 ?peer,
308 "Peer does not exist or substream open failed during connection establishment",
309 );
310 return;
311 };
312
313 for request_id in context.active {
315 let _ = self
316 .event_tx
317 .send(InnerRequestResponseEvent::RequestFailed {
318 peer,
319 request_id,
320 error: RequestResponseError::Rejected(RejectReason::ConnectionClosed),
321 })
322 .await;
323 }
324 }
325
326 async fn on_outbound_substream(
328 &mut self,
329 peer: PeerId,
330 substream_id: SubstreamId,
331 mut substream: Substream,
332 fallback_protocol: Option<ProtocolName>,
333 ) -> crate::Result<()> {
334 let Some(RequestContext {
335 request_id,
336 request,
337 fallback,
338 ..
339 }) = self.pending_outbound.remove(&substream_id)
340 else {
341 tracing::error!(
342 target: LOG_TARGET,
343 ?peer,
344 protocol = %self.protocol,
345 ?substream_id,
346 "pending outbound request does not exist",
347 );
348 debug_assert!(false);
349
350 return Err(Error::InvalidState);
351 };
352
353 tracing::trace!(
354 target: LOG_TARGET,
355 ?peer,
356 protocol = %self.protocol,
357 ?substream_id,
358 ?request_id,
359 "substream opened, send request",
360 );
361
362 let request = match (&fallback_protocol, fallback) {
363 (Some(protocol), Some((fallback_protocol, fallback_request)))
364 if protocol == &fallback_protocol =>
365 fallback_request,
366 _ => request,
367 };
368
369 let request_timeout = self.timeout;
370 let protocol = self.protocol.clone();
371 let (tx, rx) = oneshot::channel();
372 self.pending_outbound_cancels.insert(request_id, tx);
373
374 self.pending_inbound.push(Box::pin(async move {
375 match tokio::time::timeout(request_timeout, substream.send_framed(request.into())).await
376 {
377 Err(_) => (
378 peer,
379 request_id,
380 fallback_protocol,
381 Err(RequestResponseError::Timeout),
382 ),
383 Ok(Err(SubstreamError::IoError(ErrorKind::PermissionDenied))) => {
384 tracing::warn!(
385 target: LOG_TARGET,
386 ?peer,
387 %protocol,
388 "tried to send too large request",
389 );
390
391 (
392 peer,
393 request_id,
394 fallback_protocol,
395 Err(RequestResponseError::TooLargePayload),
396 )
397 }
398 Ok(Err(error)) => (
399 peer,
400 request_id,
401 fallback_protocol,
402 Err(RequestResponseError::Rejected(error.into())),
403 ),
404 Ok(Ok(_)) => {
405 tokio::select! {
406 _ = rx => {
407 tracing::debug!(
408 target: LOG_TARGET,
409 ?peer,
410 %protocol,
411 ?request_id,
412 "request canceled",
413 );
414
415 let _ = substream.close().await;
416 (
417 peer,
418 request_id,
419 fallback_protocol,
420 Err(RequestResponseError::Canceled))
421 }
422 _ = sleep(request_timeout) => {
423 tracing::debug!(
424 target: LOG_TARGET,
425 ?peer,
426 %protocol,
427 ?request_id,
428 "request timed out",
429 );
430
431 let _ = substream.close().await;
432 (peer, request_id, fallback_protocol, Err(RequestResponseError::Timeout))
433 }
434 event = substream.next() => match event {
435 Some(Ok(response)) => {
436 (peer, request_id, fallback_protocol, Ok(response.freeze().into()))
437 },
438 Some(Err(error)) => {
439 (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(error.into())))
440 },
441 None => {
442 tracing::debug!(
443 target: LOG_TARGET,
444 ?peer,
445 %protocol,
446 ?request_id,
447 "substream closed",
448 );
449 (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(RejectReason::SubstreamClosed)))
450 }
451 }
452 }
453 }
454 }
455 }));
456
457 Ok(())
458 }
459
460 async fn on_inbound_request(
462 &mut self,
463 peer: PeerId,
464 request_id: RequestId,
465 request: Result<BytesMut, SubstreamError>,
466 mut substream: Substream,
467 ) -> crate::Result<()> {
468 let peer_context = self.peers.get_mut(&peer).ok_or(Error::PeerDoesntExist(peer))?;
470 let fallback = peer_context.active_inbound.remove(&request_id).ok_or_else(|| {
471 tracing::debug!(
472 target: LOG_TARGET,
473 ?peer,
474 protocol = %self.protocol,
475 ?request_id,
476 "no active inbound request",
477 );
478
479 Error::InvalidState
480 })?;
481
482 let protocol = self.protocol.clone();
483
484 tracing::trace!(
485 target: LOG_TARGET,
486 ?peer,
487 %protocol,
488 ?request_id,
489 "inbound request",
490 );
491
492 let Ok(request) = request else {
493 tracing::debug!(
494 target: LOG_TARGET,
495 ?peer,
496 %protocol,
497 ?request_id,
498 ?request,
499 "failed to read request from substream",
500 );
501 return Err(Error::InvalidData);
502 };
503
504 let timeout = self.timeout;
510 let (response_tx, rx): (
511 oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
512 _,
513 ) = oneshot::channel();
514
515 self.pending_outbound_responses.push(Box::pin(async move {
516 match rx.await {
517 Err(_) => {
518 tracing::debug!(
519 target: LOG_TARGET,
520 ?peer,
521 %protocol,
522 ?request_id,
523 "request rejected",
524 );
525 let _ = substream.close().await;
526 }
527 Ok((response, mut feedback)) => {
528 tracing::trace!(
529 target: LOG_TARGET,
530 ?peer,
531 %protocol,
532 ?request_id,
533 "send response",
534 );
535
536 match tokio::time::timeout(timeout, substream.send_framed(response.into()))
537 .await
538 {
539 Err(_) => tracing::debug!(
540 target: LOG_TARGET,
541 ?peer,
542 %protocol,
543 ?request_id,
544 "timed out while sending response",
545 ),
546 Ok(Ok(_)) => feedback.take().map_or((), |feedback| {
547 let _ = feedback.send(());
548 }),
549 Ok(Err(error)) => tracing::trace!(
550 target: LOG_TARGET,
551 ?peer,
552 %protocol,
553 ?request_id,
554 ?error,
555 "failed to send request to peer",
556 ),
557 }
558 }
559 }
560 }));
561
562 self.event_tx
563 .send(InnerRequestResponseEvent::RequestReceived {
564 peer,
565 fallback,
566 request_id,
567 request: request.freeze().into(),
568 response_tx,
569 })
570 .await
571 .map_err(From::from)
572 }
573
574 async fn on_inbound_substream(
576 &mut self,
577 peer: PeerId,
578 fallback: Option<ProtocolName>,
579 mut substream: Substream,
580 ) -> crate::Result<()> {
581 tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "handle inbound substream");
582
583 if let Some(max_requests) = self.max_concurrent_inbound_requests {
584 let num_inbound_requests =
585 self.pending_inbound_requests.len() + self.pending_outbound_responses.len();
586
587 if max_requests <= num_inbound_requests {
588 tracing::debug!(
589 target: LOG_TARGET,
590 ?peer,
591 protocol = %self.protocol,
592 ?fallback,
593 ?max_requests,
594 "rejecting request as already at maximum",
595 );
596
597 let _ = substream.close().await;
598 return Ok(());
599 }
600 }
601
602 let request_id = self.next_request_id();
607 self.peers
608 .get_mut(&peer)
609 .ok_or(Error::PeerDoesntExist(peer))?
610 .active_inbound
611 .insert(request_id, fallback);
612
613 self.pending_inbound_requests.push(Box::pin(async move {
614 let request = match substream.next().await {
615 Some(Ok(request)) => Ok(request),
616 Some(Err(error)) => Err(error),
617 None => Err(SubstreamError::ConnectionClosed),
618 };
619
620 (peer, request_id, request, substream)
621 }));
622
623 Ok(())
624 }
625
626 async fn on_dial_failure(&mut self, peer: PeerId) {
627 if let Some(context) = self.pending_dials.remove(&peer) {
628 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "failed to dial peer");
629
630 let _ = self
631 .peers
632 .get_mut(&peer)
633 .map(|peer_context| peer_context.active.remove(&context.request_id));
634 let _ = self
635 .report_request_failure(
636 peer,
637 context.request_id,
638 RequestResponseError::Rejected(RejectReason::DialFailed(None)),
639 )
640 .await;
641 }
642 }
643
644 async fn on_substream_open_failure(
646 &mut self,
647 substream: SubstreamId,
648 error: SubstreamError,
649 ) -> crate::Result<()> {
650 let Some(RequestContext {
651 request_id, peer, ..
652 }) = self.pending_outbound.remove(&substream)
653 else {
654 tracing::error!(
655 target: LOG_TARGET,
656 protocol = %self.protocol,
657 ?substream,
658 "pending outbound request does not exist",
659 );
660 debug_assert!(false);
661
662 return Err(Error::InvalidState);
663 };
664
665 tracing::debug!(
666 target: LOG_TARGET,
667 ?peer,
668 protocol = %self.protocol,
669 ?request_id,
670 ?substream,
671 ?error,
672 "failed to open substream",
673 );
674
675 let _ = self
676 .peers
677 .get_mut(&peer)
678 .map(|peer_context| peer_context.active.remove(&request_id));
679
680 self.event_tx
681 .send(InnerRequestResponseEvent::RequestFailed {
682 peer,
683 request_id,
684 error: match error {
685 SubstreamError::NegotiationError(NegotiationError::MultistreamSelectError(
686 MultistreamFailed,
687 )) => RequestResponseError::UnsupportedProtocol,
688 _ => RequestResponseError::Rejected(error.into()),
689 },
690 })
691 .await
692 .map_err(From::from)
693 }
694
695 async fn report_request_failure(
697 &mut self,
698 peer: PeerId,
699 request_id: RequestId,
700 error: RequestResponseError,
701 ) -> crate::Result<()> {
702 self.event_tx
703 .send(InnerRequestResponseEvent::RequestFailed {
704 peer,
705 request_id,
706 error,
707 })
708 .await
709 .map_err(From::from)
710 }
711
712 fn on_send_request(
714 &mut self,
715 peer: PeerId,
716 request_id: RequestId,
717 request: Vec<u8>,
718 dial_options: DialOptions,
719 fallback: Option<(ProtocolName, Vec<u8>)>,
720 ) -> Result<(), RequestResponseError> {
721 tracing::trace!(
722 target: LOG_TARGET,
723 ?peer,
724 protocol = %self.protocol,
725 ?request_id,
726 ?dial_options,
727 "send request to remote peer",
728 );
729
730 let Some(context) = self.peers.get_mut(&peer) else {
731 match dial_options {
732 DialOptions::Reject => {
733 tracing::debug!(
734 target: LOG_TARGET,
735 ?peer,
736 protocol = %self.protocol,
737 ?request_id,
738 ?dial_options,
739 "peer not connected and should not dial",
740 );
741
742 return Err(RequestResponseError::NotConnected);
743 }
744 DialOptions::Dial => match self.service.dial(&peer) {
745 Ok(_) => {
746 tracing::trace!(
747 target: LOG_TARGET,
748 ?peer,
749 protocol = %self.protocol,
750 ?request_id,
751 "started dialing peer",
752 );
753
754 self.pending_dials.insert(
755 peer,
756 RequestContext::new(peer, request_id, request, fallback),
757 );
758 return Ok(());
759 }
760 Err(error) => {
761 tracing::debug!(
762 target: LOG_TARGET,
763 ?peer,
764 protocol = %self.protocol,
765 ?error,
766 "failed to dial peer"
767 );
768
769 return Err(RequestResponseError::Rejected(RejectReason::DialFailed(
770 Some(error),
771 )));
772 }
773 },
774 }
775 };
776
777 match self.service.open_substream(peer) {
780 Ok(substream_id) => {
781 let unique_request_id = context.active.insert(request_id);
782 debug_assert!(unique_request_id);
783
784 self.pending_outbound.insert(
785 substream_id,
786 RequestContext::new(peer, request_id, request, fallback),
787 );
788
789 Ok(())
790 }
791 Err(error) => {
792 tracing::debug!(
793 target: LOG_TARGET,
794 ?peer,
795 protocol = %self.protocol,
796 ?request_id,
797 ?error,
798 "failed to open substream",
799 );
800
801 Err(RequestResponseError::Rejected(error.into()))
802 }
803 }
804 }
805
806 async fn on_substream_event(
808 &mut self,
809 peer: PeerId,
810 request_id: RequestId,
811 fallback: Option<ProtocolName>,
812 message: Result<Vec<u8>, RequestResponseError>,
813 ) -> crate::Result<()> {
814 if !self
815 .peers
816 .get_mut(&peer)
817 .ok_or(Error::PeerDoesntExist(peer))?
818 .active
819 .remove(&request_id)
820 {
821 tracing::warn!(
822 target: LOG_TARGET,
823 ?peer,
824 protocol = %self.protocol,
825 ?request_id,
826 "invalid state: received substream event but no active substream",
827 );
828 return Err(Error::InvalidState);
829 }
830
831 let event = match message {
832 Ok(response) => InnerRequestResponseEvent::ResponseReceived {
833 peer,
834 request_id,
835 response,
836 fallback,
837 },
838 Err(error) => match error {
839 RequestResponseError::Canceled => {
840 tracing::debug!(
841 target: LOG_TARGET,
842 ?peer,
843 protocol = %self.protocol,
844 ?request_id,
845 "request canceled by local node",
846 );
847 return Ok(());
848 }
849 error => InnerRequestResponseEvent::RequestFailed {
850 peer,
851 request_id,
852 error,
853 },
854 },
855 };
856
857 self.event_tx.send(event).await.map_err(From::from)
858 }
859
860 fn on_cancel_request(&mut self, request_id: RequestId) -> crate::Result<()> {
862 tracing::trace!(target: LOG_TARGET, protocol = %self.protocol, ?request_id, "cancel outbound request");
863
864 match self.pending_outbound_cancels.remove(&request_id) {
865 Some(tx) => tx.send(()).map_err(|_| Error::SubstreamDoesntExist),
866 None => {
867 tracing::debug!(
868 target: LOG_TARGET,
869 protocol = %self.protocol,
870 ?request_id,
871 "tried to cancel request which doesn't exist",
872 );
873
874 Ok(())
875 }
876 }
877 }
878
879 async fn handle_service_event(&mut self, event: TransportEvent) {
881 match event {
882 TransportEvent::ConnectionEstablished { peer, .. } => {
883 if let Err(error) = self.on_connection_established(peer).await {
884 tracing::debug!(
885 target: LOG_TARGET,
886 ?peer,
887 protocol = %self.protocol,
888 ?error,
889 "failed to handle connection established",
890 );
891 }
892 }
893
894 TransportEvent::ConnectionClosed { peer } => {
895 self.on_connection_closed(peer).await;
896 }
897
898 TransportEvent::SubstreamOpened {
899 peer,
900 substream,
901 direction,
902 fallback,
903 ..
904 } => match direction {
905 Direction::Inbound => {
906 if let Err(error) = self.on_inbound_substream(peer, fallback, substream).await {
907 tracing::debug!(
908 target: LOG_TARGET,
909 ?peer,
910 protocol = %self.protocol,
911 ?error,
912 "failed to handle inbound substream",
913 );
914 }
915 }
916 Direction::Outbound(substream_id) => {
917 let _ =
918 self.on_outbound_substream(peer, substream_id, substream, fallback).await;
919 }
920 },
921
922 TransportEvent::SubstreamOpenFailure { substream, error } => {
923 if let Err(error) = self.on_substream_open_failure(substream, error).await {
924 tracing::warn!(
925 target: LOG_TARGET,
926 protocol = %self.protocol,
927 ?error,
928 "failed to handle substream open failure",
929 );
930 }
931 }
932
933 TransportEvent::DialFailure { peer, .. } => self.on_dial_failure(peer).await,
934 }
935 }
936
937 async fn handle_user_command(&mut self, command: RequestResponseCommand) {
939 match command {
940 RequestResponseCommand::SendRequest {
941 peer,
942 request_id,
943 request,
944 dial_options,
945 } => {
946 if let Err(error) =
947 self.on_send_request(peer, request_id, request, dial_options, None)
948 {
949 tracing::debug!(
950 target: LOG_TARGET,
951 ?peer,
952 protocol = %self.protocol,
953 ?request_id,
954 ?error,
955 "failed to send request",
956 );
957
958 if let Err(error) = self.report_request_failure(peer, request_id, error).await {
959 tracing::debug!(
960 target: LOG_TARGET,
961 ?peer,
962 protocol = %self.protocol,
963 ?request_id,
964 ?error,
965 "failed to report request failure",
966 );
967 }
968 }
969 }
970 RequestResponseCommand::SendRequestWithFallback {
971 peer,
972 request_id,
973 request,
974 fallback,
975 dial_options,
976 } => {
977 if let Err(error) =
978 self.on_send_request(peer, request_id, request, dial_options, Some(fallback))
979 {
980 tracing::debug!(
981 target: LOG_TARGET,
982 ?peer,
983 protocol = %self.protocol,
984 ?request_id,
985 ?error,
986 "failed to send request",
987 );
988
989 if let Err(error) = self.report_request_failure(peer, request_id, error).await {
990 tracing::debug!(
991 target: LOG_TARGET,
992 ?peer,
993 protocol = %self.protocol,
994 ?request_id,
995 ?error,
996 "failed to report request failure",
997 );
998 }
999 }
1000 }
1001 RequestResponseCommand::CancelRequest { request_id } => {
1002 if let Err(error) = self.on_cancel_request(request_id) {
1003 tracing::debug!(
1004 target: LOG_TARGET,
1005 protocol = %self.protocol,
1006 ?request_id,
1007 ?error,
1008 "failed to cancel reqeuest",
1009 );
1010 }
1011 }
1012 }
1013 }
1014
1015 pub async fn run(mut self) {
1017 tracing::debug!(target: LOG_TARGET, "starting request-response event loop");
1018
1019 loop {
1020 tokio::select! {
1021 biased;
1024
1025 event = self.service.next() => match event {
1027 Some(event) => self.handle_service_event(event).await,
1028 None => {
1029 tracing::debug!(target: LOG_TARGET, protocol = %self.protocol, "service has exited, exiting");
1030 return
1031 }
1032 },
1033
1034 event = self.pending_inbound.select_next_some(), if !self.pending_inbound.is_empty() => {
1036 let (peer, request_id, fallback, event) = event;
1037
1038 if let Err(error) = self.on_substream_event(peer, request_id, fallback, event).await {
1039 tracing::debug!(
1040 target: LOG_TARGET,
1041 ?peer,
1042 protocol = %self.protocol,
1043 ?request_id,
1044 ?error,
1045 "failed to handle substream event",
1046 );
1047 }
1048
1049 self.pending_outbound_cancels.remove(&request_id);
1050 }
1051
1052 _ = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => {}
1054
1055 event = self.pending_inbound_requests.next(), if !self.pending_inbound_requests.is_empty() => match event {
1057 Some((peer, request_id, request, substream)) => {
1058 if let Err(error) = self.on_inbound_request(peer, request_id, request, substream).await {
1059 tracing::debug!(
1060 target: LOG_TARGET,
1061 ?peer,
1062 protocol = %self.protocol,
1063 ?request_id,
1064 ?error,
1065 "failed to handle inbound request",
1066 );
1067 }
1068 }
1069 None => return,
1070 },
1071
1072 command = self.command_rx.recv() => match command {
1074 Some(command) => self.handle_user_command(command).await,
1075 None => {
1076 tracing::debug!(target: LOG_TARGET, protocol = %self.protocol, "user protocol has exited, exiting");
1077 return
1078 }
1079 },
1080 }
1081 }
1082 }
1083}