litep2p/protocol/request_response/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Request-response protocol implementation.
22
23use crate::{
24    error::{Error, NegotiationError, SubstreamError},
25    multistream_select::NegotiationError::Failed as MultistreamFailed,
26    protocol::{
27        request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand},
28        Direction, TransportEvent, TransportService,
29    },
30    substream::{Substream, SubstreamSet},
31    types::{protocol::ProtocolName, RequestId, SubstreamId},
32    PeerId,
33};
34
35use bytes::BytesMut;
36use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt};
37use tokio::{
38    sync::{
39        mpsc::{Receiver, Sender},
40        oneshot,
41    },
42    time::sleep,
43};
44
45use std::{
46    collections::{hash_map::Entry, HashMap, HashSet},
47    io::ErrorKind,
48    sync::{
49        atomic::{AtomicUsize, Ordering},
50        Arc,
51    },
52    time::Duration,
53};
54
55pub use config::{Config, ConfigBuilder};
56pub use handle::{
57    DialOptions, RejectReason, RequestResponseError, RequestResponseEvent, RequestResponseHandle,
58};
59
60mod config;
61mod handle;
62#[cfg(test)]
63mod tests;
64
65// TODO: add ability to specify limit for inbound requests?
// TODO: convert inbound/outbound substreams to use `oneshot::Sender<()>` for sending/rejecting
// responses. This way, the code dealing with rejecting/responding doesn't have to block.
68
/// Logging target for the file.
///
/// Passed as `target:` to every `tracing` macro in this module.
const LOG_TARGET: &str = "litep2p::request-response::protocol";

/// Default request timeout (5 seconds).
///
/// The timeout the protocol actually applies is read from `Config::timeout` in
/// `RequestResponseProtocol::new()`.
// NOTE(review): presumably this is the default used by the config builder -- confirm.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
74
/// Outcome of an outbound request, as produced by the futures stored in
/// `RequestResponseProtocol::pending_inbound`.
///
/// The tuple fields are, in order:
///  - the remote peer the request was sent to
///  - the ID of the request
///  - the fallback protocol name reported when the outbound substream was opened, if any
///  - the response bytes, or the error the request failed with
type PendingRequest = (
    PeerId,
    RequestId,
    Option<ProtocolName>,
    Result<Vec<u8>, RequestResponseError>,
);
82
/// Context of an outbound request which has not been written to a substream yet.
///
/// Kept in `pending_dials` while the remote peer is being dialed and in `pending_outbound`
/// while the outbound substream is being opened.
struct RequestContext {
    /// Remote peer the request is sent to.
    peer: PeerId,

    /// ID of the request.
    request_id: RequestId,

    /// Request payload.
    request: Vec<u8>,

    /// Fallback protocol name and the payload which is sent instead of `request` if the
    /// outbound substream was opened over that fallback protocol.
    fallback: Option<(ProtocolName, Vec<u8>)>,
}
97
98impl RequestContext {
99    /// Create new [`RequestContext`].
100    fn new(
101        peer: PeerId,
102        request_id: RequestId,
103        request: Vec<u8>,
104        fallback: Option<(ProtocolName, Vec<u8>)>,
105    ) -> Self {
106        Self {
107            peer,
108            request_id,
109            request,
110            fallback,
111        }
112    }
113}
114
/// State tracked for a connected peer.
struct PeerContext {
    /// IDs of outbound requests sent to the peer which have not completed yet.
    active: HashSet<RequestId>,

    /// Inbound requests from the peer which have not been handed to the user yet, mapped to
    /// the fallback protocol name reported for the inbound substream, if any.
    active_inbound: HashMap<RequestId, Option<ProtocolName>>,
}
123
124impl PeerContext {
125    /// Create new [`PeerContext`].
126    fn new() -> Self {
127        Self {
128            active: HashSet::new(),
129            active_inbound: HashMap::new(),
130        }
131    }
132}
133
/// Request-response protocol.
///
/// Holds the state for outbound requests (pending dials, pending substreams, in-flight
/// requests and their cancellation handles) and for inbound requests (substreams being read
/// and responses waiting for the user).
pub(crate) struct RequestResponseProtocol {
    /// Transport service.
    ///
    /// Used to dial peers and to open outbound substreams.
    service: TransportService,

    /// Protocol.
    protocol: ProtocolName,

    /// Connected peers.
    peers: HashMap<PeerId, PeerContext>,

    /// Pending outbound substreams, mapped from `SubstreamId` to the `RequestContext` of the
    /// request waiting for the substream to open.
    pending_outbound: HashMap<SubstreamId, RequestContext>,

    /// Pending outbound responses.
    ///
    /// The future listens to a `oneshot::Sender` which is given to `RequestResponseHandle`.
    /// If the request is accepted by the local node, the response is sent over the channel to
    /// the future which sends it to remote peer.
    ///
    /// If the request is rejected by the local node, the `oneshot::Sender` is dropped which
    /// notifies the future that the request should be rejected by closing the substream.
    pending_outbound_responses: FuturesUnordered<BoxFuture<'static, ()>>,

    /// Pending inbound responses.
    ///
    /// Each future sends an outbound request and resolves to a `PendingRequest` once the
    /// response is received or the request fails, times out or is canceled.
    pending_inbound: FuturesUnordered<BoxFuture<'static, PendingRequest>>,

    /// Pending outbound cancellation handles.
    ///
    /// Sending `()` over the handle cancels the in-flight request with the given ID.
    pending_outbound_cancels: HashMap<RequestId, oneshot::Sender<()>>,

    /// Pending inbound requests.
    ///
    /// Inbound substreams keyed by `(PeerId, RequestId)`, kept until `on_inbound_request()`
    /// removes them or the connection to the peer is closed.
    pending_inbound_requests: SubstreamSet<(PeerId, RequestId), Substream>,

    /// Pending dials for outbound requests.
    ///
    /// Holds the request which is sent once the connection to the peer has been established.
    pending_dials: HashMap<PeerId, RequestContext>,

    /// TX channel for sending events to the user protocol.
    event_tx: Sender<InnerRequestResponseEvent>,

    /// RX channel for receiving commands from the `RequestResponseHandle`.
    command_rx: Receiver<RequestResponseCommand>,

    /// Next request ID.
    ///
    /// The counter is provided by `Config`. Inbound requests are assigned an ephemeral ID from
    /// it in `on_inbound_substream()`.
    next_request_id: Arc<AtomicUsize>,

    /// Timeout for outbound requests.
    ///
    /// Also used as the timeout for sending a response to an inbound request.
    timeout: Duration,

    /// Maximum concurrent inbound requests, if specified.
    max_concurrent_inbound_requests: Option<usize>,
}
187
188impl RequestResponseProtocol {
189    /// Create new [`RequestResponseProtocol`].
190    pub(crate) fn new(service: TransportService, config: Config) -> Self {
191        Self {
192            service,
193            peers: HashMap::new(),
194            timeout: config.timeout,
195            next_request_id: config.next_request_id,
196            event_tx: config.event_tx,
197            command_rx: config.command_rx,
198            protocol: config.protocol_name,
199            pending_dials: HashMap::new(),
200            pending_outbound: HashMap::new(),
201            pending_inbound: FuturesUnordered::new(),
202            pending_outbound_cancels: HashMap::new(),
203            pending_inbound_requests: SubstreamSet::new(),
204            pending_outbound_responses: FuturesUnordered::new(),
205            max_concurrent_inbound_requests: config.max_concurrent_inbound_request,
206        }
207    }
208
209    /// Get next ephemeral request ID.
210    fn next_request_id(&mut self) -> RequestId {
211        RequestId::from(self.next_request_id.fetch_add(1usize, Ordering::Relaxed))
212    }
213
214    /// Connection established to remote peer.
215    async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
216        tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
217
218        let Entry::Vacant(entry) = self.peers.entry(peer) else {
219            tracing::error!(
220                target: LOG_TARGET,
221                ?peer,
222                "state mismatch: peer already exists",
223            );
224            debug_assert!(false);
225            return Err(Error::PeerAlreadyExists(peer));
226        };
227
228        match self.pending_dials.remove(&peer) {
229            None => {
230                entry.insert(PeerContext::new());
231            }
232            Some(context) => match self.service.open_substream(peer) {
233                Ok(substream_id) => {
234                    tracing::trace!(
235                        target: LOG_TARGET,
236                        ?peer,
237                        protocol = %self.protocol,
238                        request_id = ?context.request_id,
239                        ?substream_id,
240                        "dial succeeded, open substream",
241                    );
242
243                    entry.insert(PeerContext {
244                        active: HashSet::from_iter([context.request_id]),
245                        active_inbound: HashMap::new(),
246                    });
247                    self.pending_outbound.insert(
248                        substream_id,
249                        RequestContext::new(
250                            peer,
251                            context.request_id,
252                            context.request,
253                            context.fallback,
254                        ),
255                    );
256                }
257                // only reason the substream would fail to open would be that the connection
258                // would've been reported to the protocol with enough delay that the keep-alive
259                // timeout had expired and no other protocol had opened a substream to it, causing
260                // the connection to be closed
261                Err(error) => {
262                    tracing::warn!(
263                        target: LOG_TARGET,
264                        ?peer,
265                        protocol = %self.protocol,
266                        request_id = ?context.request_id,
267                        ?error,
268                        "failed to open substream",
269                    );
270
271                    return self
272                        .report_request_failure(
273                            peer,
274                            context.request_id,
275                            RequestResponseError::Rejected(error.into()),
276                        )
277                        .await;
278                }
279            },
280        }
281
282        Ok(())
283    }
284
285    /// Connection closed to remote peer.
286    async fn on_connection_closed(&mut self, peer: PeerId) {
287        tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");
288
289        let Some(context) = self.peers.remove(&peer) else {
290            tracing::error!(
291                target: LOG_TARGET,
292                ?peer,
293                "state mismatch: peer doesn't exist",
294            );
295            debug_assert!(false);
296            return;
297        };
298
299        // sent failure events for all pending outbound requests
300        for request_id in context.active {
301            let _ = self
302                .event_tx
303                .send(InnerRequestResponseEvent::RequestFailed {
304                    peer,
305                    request_id,
306                    error: RequestResponseError::Rejected(RejectReason::ConnectionClosed),
307                })
308                .await;
309        }
310
311        // remove all pending inbound requests
312        for (request_id, _) in context.active_inbound {
313            self.pending_inbound_requests.remove(&(peer, request_id));
314        }
315    }
316
317    /// Local node opened a substream to remote node.
318    async fn on_outbound_substream(
319        &mut self,
320        peer: PeerId,
321        substream_id: SubstreamId,
322        mut substream: Substream,
323        fallback_protocol: Option<ProtocolName>,
324    ) -> crate::Result<()> {
325        let Some(RequestContext {
326            request_id,
327            request,
328            fallback,
329            ..
330        }) = self.pending_outbound.remove(&substream_id)
331        else {
332            tracing::error!(
333                target: LOG_TARGET,
334                ?peer,
335                protocol = %self.protocol,
336                ?substream_id,
337                "pending outbound request does not exist",
338            );
339            debug_assert!(false);
340
341            return Err(Error::InvalidState);
342        };
343
344        tracing::trace!(
345            target: LOG_TARGET,
346            ?peer,
347            protocol = %self.protocol,
348            ?substream_id,
349            ?request_id,
350            "substream opened, send request",
351        );
352
353        let request = match (&fallback_protocol, fallback) {
354            (Some(protocol), Some((fallback_protocol, fallback_request)))
355                if protocol == &fallback_protocol =>
356                fallback_request,
357            _ => request,
358        };
359
360        let request_timeout = self.timeout;
361        let protocol = self.protocol.clone();
362        let (tx, rx) = oneshot::channel();
363        self.pending_outbound_cancels.insert(request_id, tx);
364
365        self.pending_inbound.push(Box::pin(async move {
366            match tokio::time::timeout(request_timeout, substream.send_framed(request.into())).await
367            {
368                Err(_) => (
369                    peer,
370                    request_id,
371                    fallback_protocol,
372                    Err(RequestResponseError::Timeout),
373                ),
374                Ok(Err(SubstreamError::IoError(ErrorKind::PermissionDenied))) => {
375                    tracing::warn!(
376                        target: LOG_TARGET,
377                        ?peer,
378                        %protocol,
379                        "tried to send too large request",
380                    );
381
382                    (
383                        peer,
384                        request_id,
385                        fallback_protocol,
386                        Err(RequestResponseError::TooLargePayload),
387                    )
388                }
389                Ok(Err(error)) => (
390                    peer,
391                    request_id,
392                    fallback_protocol,
393                    Err(RequestResponseError::Rejected(error.into())),
394                ),
395                Ok(Ok(_)) => {
396                    tokio::select! {
397                        _ = rx => {
398                            tracing::debug!(
399                                target: LOG_TARGET,
400                                ?peer,
401                                %protocol,
402                                ?request_id,
403                                "request canceled",
404                            );
405
406                            let _ = substream.close().await;
407                            (
408                                peer,
409                                request_id,
410                                fallback_protocol,
411                                Err(RequestResponseError::Canceled))
412                        }
413                        _ = sleep(request_timeout) => {
414                            tracing::debug!(
415                                target: LOG_TARGET,
416                                ?peer,
417                                %protocol,
418                                ?request_id,
419                                "request timed out",
420                            );
421
422                            let _ = substream.close().await;
423                            (peer, request_id, fallback_protocol, Err(RequestResponseError::Timeout))
424                        }
425                        event = substream.next() => match event {
426                            Some(Ok(response)) => {
427                                (peer, request_id, fallback_protocol, Ok(response.freeze().into()))
428                            },
429                            Some(Err(error)) => {
430                                (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(error.into())))
431                            },
432                            None => {
433                                tracing::debug!(
434                                    target: LOG_TARGET,
435                                    ?peer,
436                                    %protocol,
437                                    ?request_id,
438                                    "substream closed",
439                                );
440                                (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(RejectReason::SubstreamClosed)))
441                            }
442                        }
443                    }
444                }
445            }
446        }));
447
448        Ok(())
449    }
450
451    /// Handle pending inbound response.
452    async fn on_inbound_request(
453        &mut self,
454        peer: PeerId,
455        request_id: RequestId,
456        request: Result<BytesMut, SubstreamError>,
457    ) -> crate::Result<()> {
458        let fallback = self
459            .peers
460            .get_mut(&peer)
461            .ok_or(Error::PeerDoesntExist(peer))?
462            .active_inbound
463            .remove(&request_id)
464            .ok_or_else(|| {
465                tracing::debug!(
466                    target: LOG_TARGET,
467                    ?peer,
468                    protocol = %self.protocol,
469                    ?request_id,
470                    "no active inbound request",
471                );
472
473                Error::InvalidState
474            })?;
475        let mut substream =
476            self.pending_inbound_requests.remove(&(peer, request_id)).ok_or_else(|| {
477                tracing::debug!(
478                    target: LOG_TARGET,
479                    ?peer,
480                    protocol = %self.protocol,
481                    ?request_id,
482                    "request doesn't exist in pending requests",
483                );
484
485                Error::InvalidState
486            })?;
487        let protocol = self.protocol.clone();
488
489        tracing::trace!(
490            target: LOG_TARGET,
491            ?peer,
492            %protocol,
493            ?request_id,
494            "inbound request",
495        );
496
497        let Ok(request) = request else {
498            tracing::debug!(
499                target: LOG_TARGET,
500                ?peer,
501                %protocol,
502                ?request_id,
503                ?request,
504                "failed to read request from substream",
505            );
506            return Err(Error::InvalidData);
507        };
508
509        // once the request has been read from the substream, start a future which waits
510        // for an input from the user.
511        //
512        // the input is either a response (succes) or rejection (failure) which is communicated
513        // by sending the response over the `oneshot::Sender` or closing it, respectively.
514        let timeout = self.timeout;
515        let (response_tx, rx): (
516            oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
517            _,
518        ) = oneshot::channel();
519
520        self.pending_outbound_responses.push(Box::pin(async move {
521            match rx.await {
522                Err(_) => {
523                    tracing::debug!(
524                        target: LOG_TARGET,
525                        ?peer,
526                        %protocol,
527                        ?request_id,
528                        "request rejected",
529                    );
530                    let _ = substream.close().await;
531                }
532                Ok((response, mut feedback)) => {
533                    tracing::trace!(
534                        target: LOG_TARGET,
535                        ?peer,
536                        %protocol,
537                        ?request_id,
538                        "send response",
539                    );
540
541                    match tokio::time::timeout(timeout, substream.send_framed(response.into()))
542                        .await
543                    {
544                        Err(_) => tracing::debug!(
545                            target: LOG_TARGET,
546                            ?peer,
547                            %protocol,
548                            ?request_id,
549                            "timed out while sending response",
550                        ),
551                        Ok(Ok(_)) => feedback.take().map_or((), |feedback| {
552                            let _ = feedback.send(());
553                        }),
554                        Ok(Err(error)) => tracing::trace!(
555                        target: LOG_TARGET,
556                            ?peer,
557                            %protocol,
558                            ?request_id,
559                            ?error,
560                            "failed to send request to peer",
561                        ),
562                    }
563                }
564            }
565        }));
566
567        self.event_tx
568            .send(InnerRequestResponseEvent::RequestReceived {
569                peer,
570                fallback,
571                request_id,
572                request: request.freeze().into(),
573                response_tx,
574            })
575            .await
576            .map_err(From::from)
577    }
578
579    /// Remote opened a substream to local node.
580    async fn on_inbound_substream(
581        &mut self,
582        peer: PeerId,
583        fallback: Option<ProtocolName>,
584        substream: Substream,
585    ) -> crate::Result<()> {
586        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "handle inbound substream");
587
588        if let Some(max_requests) = self.max_concurrent_inbound_requests {
589            let num_inbound_requests =
590                self.pending_inbound_requests.len() + self.pending_outbound_responses.len();
591
592            if max_requests <= num_inbound_requests {
593                tracing::debug!(
594                    target: LOG_TARGET,
595                    ?peer,
596                    protocol = %self.protocol,
597                    ?fallback,
598                    ?max_requests,
599                    "rejecting request as already at maximum",
600                );
601
602                let _ = substream.close().await;
603                return Ok(());
604            }
605        }
606
607        // allocate ephemeral id for the inbound request and return it to the user protocol
608        //
609        // when user responds to the request, this is used to associate the response with the
610        // correct substream.
611        let request_id = self.next_request_id();
612        self.peers
613            .get_mut(&peer)
614            .ok_or(Error::PeerDoesntExist(peer))?
615            .active_inbound
616            .insert(request_id, fallback);
617        self.pending_inbound_requests.insert((peer, request_id), substream);
618
619        Ok(())
620    }
621
622    async fn on_dial_failure(&mut self, peer: PeerId) {
623        if let Some(context) = self.pending_dials.remove(&peer) {
624            tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "failed to dial peer");
625
626            let _ = self
627                .peers
628                .get_mut(&peer)
629                .map(|peer_context| peer_context.active.remove(&context.request_id));
630            let _ = self
631                .report_request_failure(
632                    peer,
633                    context.request_id,
634                    RequestResponseError::Rejected(RejectReason::DialFailed(None)),
635                )
636                .await;
637        }
638    }
639
640    /// Failed to open substream to remote peer.
641    async fn on_substream_open_failure(
642        &mut self,
643        substream: SubstreamId,
644        error: SubstreamError,
645    ) -> crate::Result<()> {
646        let Some(RequestContext {
647            request_id, peer, ..
648        }) = self.pending_outbound.remove(&substream)
649        else {
650            tracing::error!(
651                target: LOG_TARGET,
652                protocol = %self.protocol,
653                ?substream,
654                "pending outbound request does not exist",
655            );
656            debug_assert!(false);
657
658            return Err(Error::InvalidState);
659        };
660
661        tracing::debug!(
662            target: LOG_TARGET,
663            ?peer,
664            protocol = %self.protocol,
665            ?request_id,
666            ?substream,
667            ?error,
668            "failed to open substream",
669        );
670
671        let _ = self
672            .peers
673            .get_mut(&peer)
674            .map(|peer_context| peer_context.active.remove(&request_id));
675
676        self.event_tx
677            .send(InnerRequestResponseEvent::RequestFailed {
678                peer,
679                request_id,
680                error: match error {
681                    SubstreamError::NegotiationError(NegotiationError::MultistreamSelectError(
682                        MultistreamFailed,
683                    )) => RequestResponseError::UnsupportedProtocol,
684                    _ => RequestResponseError::Rejected(error.into()),
685                },
686            })
687            .await
688            .map_err(From::from)
689    }
690
691    /// Report request send failure to user.
692    async fn report_request_failure(
693        &mut self,
694        peer: PeerId,
695        request_id: RequestId,
696        error: RequestResponseError,
697    ) -> crate::Result<()> {
698        self.event_tx
699            .send(InnerRequestResponseEvent::RequestFailed {
700                peer,
701                request_id,
702                error,
703            })
704            .await
705            .map_err(From::from)
706    }
707
708    /// Send request to remote peer.
709    async fn on_send_request(
710        &mut self,
711        peer: PeerId,
712        request_id: RequestId,
713        request: Vec<u8>,
714        dial_options: DialOptions,
715        fallback: Option<(ProtocolName, Vec<u8>)>,
716    ) -> crate::Result<()> {
717        tracing::trace!(
718            target: LOG_TARGET,
719            ?peer,
720            protocol = %self.protocol,
721            ?request_id,
722            ?dial_options,
723            "send request to remote peer",
724        );
725
726        let Some(context) = self.peers.get_mut(&peer) else {
727            match dial_options {
728                DialOptions::Reject => {
729                    tracing::debug!(
730                        target: LOG_TARGET,
731                        ?peer,
732                        protocol = %self.protocol,
733                        ?request_id,
734                        ?dial_options,
735                        "peer not connected and should not dial",
736                    );
737
738                    return self
739                        .report_request_failure(
740                            peer,
741                            request_id,
742                            RequestResponseError::NotConnected,
743                        )
744                        .await;
745                }
746                DialOptions::Dial => match self.service.dial(&peer) {
747                    Ok(_) => {
748                        tracing::trace!(
749                            target: LOG_TARGET,
750                            ?peer,
751                            protocol = %self.protocol,
752                            ?request_id,
753                            "started dialing peer",
754                        );
755
756                        self.pending_dials.insert(
757                            peer,
758                            RequestContext::new(peer, request_id, request, fallback),
759                        );
760                        return Ok(());
761                    }
762                    Err(error) => {
763                        tracing::debug!(
764                            target: LOG_TARGET,
765                            ?peer,
766                            protocol = %self.protocol,
767                            ?error,
768                            "failed to dial peer"
769                        );
770
771                        return self
772                            .report_request_failure(
773                                peer,
774                                request_id,
775                                RequestResponseError::Rejected(RejectReason::DialFailed(Some(
776                                    error,
777                                ))),
778                            )
779                            .await;
780                    }
781                },
782            }
783        };
784
785        // open substream and push it pending outbound substreams
786        // once the substream is opened, send the request.
787        match self.service.open_substream(peer) {
788            Ok(substream_id) => {
789                let unique_request_id = context.active.insert(request_id);
790                debug_assert!(unique_request_id);
791
792                self.pending_outbound.insert(
793                    substream_id,
794                    RequestContext::new(peer, request_id, request, fallback),
795                );
796
797                Ok(())
798            }
799            Err(error) => {
800                tracing::debug!(
801                    target: LOG_TARGET,
802                    ?peer,
803                    protocol = %self.protocol,
804                    ?request_id,
805                    ?error,
806                    "failed to open substream",
807                );
808
809                self.report_request_failure(
810                    peer,
811                    request_id,
812                    RequestResponseError::Rejected(error.into()),
813                )
814                .await
815            }
816        }
817    }
818
819    /// Handle substream event.
820    async fn on_substream_event(
821        &mut self,
822        peer: PeerId,
823        request_id: RequestId,
824        fallback: Option<ProtocolName>,
825        message: Result<Vec<u8>, RequestResponseError>,
826    ) -> crate::Result<()> {
827        if !self
828            .peers
829            .get_mut(&peer)
830            .ok_or(Error::PeerDoesntExist(peer))?
831            .active
832            .remove(&request_id)
833        {
834            tracing::warn!(
835                target: LOG_TARGET,
836                ?peer,
837                protocol = %self.protocol,
838                ?request_id,
839                "invalid state: received substream event but no active substream",
840            );
841            return Err(Error::InvalidState);
842        }
843
844        let event = match message {
845            Ok(response) => InnerRequestResponseEvent::ResponseReceived {
846                peer,
847                request_id,
848                response,
849                fallback,
850            },
851            Err(error) => match error {
852                RequestResponseError::Canceled => {
853                    tracing::debug!(
854                        target: LOG_TARGET,
855                        ?peer,
856                        protocol = %self.protocol,
857                        ?request_id,
858                        "request canceled by local node",
859                    );
860                    return Ok(());
861                }
862                error => InnerRequestResponseEvent::RequestFailed {
863                    peer,
864                    request_id,
865                    error,
866                },
867            },
868        };
869
870        self.event_tx.send(event).await.map_err(From::from)
871    }
872
873    /// Cancel outbound request.
874    async fn on_cancel_request(&mut self, request_id: RequestId) -> crate::Result<()> {
875        tracing::trace!(target: LOG_TARGET, protocol = %self.protocol, ?request_id, "cancel outbound request");
876
877        match self.pending_outbound_cancels.remove(&request_id) {
878            Some(tx) => tx.send(()).map_err(|_| Error::SubstreamDoesntExist),
879            None => {
880                tracing::debug!(
881                    target: LOG_TARGET,
882                    protocol = %self.protocol,
883                    ?request_id,
884                    "tried to cancel request which doesn't exist",
885                );
886
887                Ok(())
888            }
889        }
890    }
891
892    /// Start [`RequestResponseProtocol`] event loop.
893    pub async fn run(mut self) {
894        tracing::debug!(target: LOG_TARGET, "starting request-response event loop");
895
896        loop {
897            tokio::select! {
898                // events coming from the network have higher priority than user commands as all user commands are
899                // responses to network behaviour so ensure that the commands operate on the most up to date information.
900                biased;
901
902                event = self.service.next() => match event {
903                    Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
904                        let _ = self.on_connection_established(peer).await;
905                    }
906                    Some(TransportEvent::ConnectionClosed { peer }) => {
907                        self.on_connection_closed(peer).await;
908                    }
909                    Some(TransportEvent::SubstreamOpened {
910                        peer,
911                        substream,
912                        direction,
913                        fallback,
914                        ..
915                    }) => match direction {
916                        Direction::Inbound => {
917                            if let Err(error) = self.on_inbound_substream(peer, fallback, substream).await {
918                                tracing::debug!(
919                                    target: LOG_TARGET,
920                                    ?peer,
921                                    protocol = %self.protocol,
922                                    ?error,
923                                    "failed to handle inbound substream",
924                                );
925                            }
926                        }
927                        Direction::Outbound(substream_id) => {
928                            let _ = self.on_outbound_substream(peer, substream_id, substream, fallback).await;
929                        }
930                    },
931                    Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
932                        if let Err(error) = self.on_substream_open_failure(substream, error).await {
933                            tracing::warn!(
934                                target: LOG_TARGET,
935                                protocol = %self.protocol,
936                                ?error,
937                                "failed to handle substream open failure",
938                            );
939                        }
940                    }
941                    Some(TransportEvent::DialFailure { peer, ..  }) => self.on_dial_failure(peer).await,
942                    None => return,
943                },
944                event = self.pending_inbound.select_next_some(), if !self.pending_inbound.is_empty() => {
945                    let (peer, request_id, fallback, event) = event;
946
947                    if let Err(error) = self.on_substream_event(peer, request_id, fallback, event).await {
948                        tracing::debug!(
949                            target: LOG_TARGET,
950                            ?peer,
951                            protocol = %self.protocol,
952                            ?request_id,
953                            ?error,
954                            "failed to handle substream event",
955                        );
956                    }
957
958                    self.pending_outbound_cancels.remove(&request_id);
959                }
960                _ = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => {}
961                event = self.pending_inbound_requests.next() => match event {
962                    Some(((peer, request_id), message)) => {
963                        if let Err(error) = self.on_inbound_request(peer, request_id, message).await {
964                            tracing::debug!(
965                                target: LOG_TARGET,
966                                ?peer,
967                                protocol = %self.protocol,
968                                ?request_id,
969                                ?error,
970                                "failed to handle inbound request",
971                            );
972                        }
973                    }
974                    None => return,
975                },
976                command = self.command_rx.recv() => match command {
977                    None => {
978                        tracing::debug!(target: LOG_TARGET, protocol = %self.protocol, "user protocol has exited, exiting");
979                        return
980                    }
981                    Some(command) => match command {
982                        RequestResponseCommand::SendRequest { peer, request_id, request, dial_options } => {
983                            if let Err(error) = self.on_send_request(peer, request_id, request, dial_options, None).await {
984                                tracing::debug!(
985                                    target: LOG_TARGET,
986                                    ?peer,
987                                    protocol = %self.protocol,
988                                    ?request_id,
989                                    ?error,
990                                    "failed to send request",
991                                );
992                            }
993                        }
994                        RequestResponseCommand::CancelRequest { request_id } => {
995                            if let Err(error) = self.on_cancel_request(request_id).await {
996                                tracing::debug!(
997                                    target: LOG_TARGET,
998                                    protocol = %self.protocol,
999                                    ?request_id,
1000                                    ?error,
1001                                    "failed to cancel reqeuest",
1002                                );
1003                            }
1004                        }
1005                        RequestResponseCommand::SendRequestWithFallback { peer, request_id, request, fallback, dial_options } => {
1006                            if let Err(error) = self.on_send_request(peer, request_id, request, dial_options, Some(fallback)).await {
1007                                tracing::debug!(
1008                                    target: LOG_TARGET,
1009                                    ?peer,
1010                                    protocol = %self.protocol,
1011                                    ?request_id,
1012                                    ?error,
1013                                    "failed to send request",
1014                                );
1015                            }
1016                        }
1017                    }
1018                },
1019            }
1020        }
1021    }
1022}