litep2p/protocol/request_response/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Request-response protocol implementation.
22
23use crate::{
24    error::{Error, NegotiationError, SubstreamError},
25    multistream_select::NegotiationError::Failed as MultistreamFailed,
26    protocol::{
27        request_response::handle::InnerRequestResponseEvent, Direction, TransportEvent,
28        TransportService,
29    },
30    substream::Substream,
31    types::{protocol::ProtocolName, RequestId, SubstreamId},
32    utils::futures_stream::FuturesStream,
33    PeerId,
34};
35
36use bytes::BytesMut;
37use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt};
38use tokio::{
39    sync::{
40        mpsc::{Receiver, Sender},
41        oneshot,
42    },
43    time::sleep,
44};
45
46use std::{
47    collections::{hash_map::Entry, HashMap, HashSet},
48    io::ErrorKind,
49    sync::{
50        atomic::{AtomicUsize, Ordering},
51        Arc,
52    },
53    time::Duration,
54};
55
56pub use config::{Config, ConfigBuilder};
57pub use handle::{
58    DialOptions, RejectReason, RequestResponseCommand, RequestResponseError, RequestResponseEvent,
59    RequestResponseHandle,
60};
61
62mod config;
63mod handle;
64#[cfg(test)]
65mod tests;
66
/// Logging target for the file.
///
/// Passed as `target:` to every `tracing` macro invocation in this module.
const LOG_TARGET: &str = "litep2p::request-response::protocol";

/// Default request timeout (five seconds).
///
/// NOTE(review): not referenced in the visible part of this file; presumably used as the
/// default for `Config::timeout` — confirm against the `config` module.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
72
/// Pending request.
///
/// Output of the futures stored in `RequestResponseProtocol::pending_inbound`. The tuple
/// holds, in order:
///  - the remote peer the request was sent to
///  - the ID of the outbound request
///  - the fallback protocol name reported with the opened substream, if any
///  - the response bytes, or the reason the request failed
type PendingRequest = (
    PeerId,
    RequestId,
    Option<ProtocolName>,
    Result<Vec<u8>, RequestResponseError>,
);
80
81/// Request context.
82struct RequestContext {
83    /// Peer ID.
84    peer: PeerId,
85
86    /// Request ID.
87    request_id: RequestId,
88
89    /// Request.
90    request: Vec<u8>,
91
92    /// Fallback request.
93    fallback: Option<(ProtocolName, Vec<u8>)>,
94}
95
96impl RequestContext {
97    /// Create new [`RequestContext`].
98    fn new(
99        peer: PeerId,
100        request_id: RequestId,
101        request: Vec<u8>,
102        fallback: Option<(ProtocolName, Vec<u8>)>,
103    ) -> Self {
104        Self {
105            peer,
106            request_id,
107            request,
108            fallback,
109        }
110    }
111}
112
113/// Peer context.
114struct PeerContext {
115    /// Active requests.
116    active: HashSet<RequestId>,
117
118    /// Active inbound requests and their fallback names.
119    active_inbound: HashMap<RequestId, Option<ProtocolName>>,
120}
121
122impl PeerContext {
123    /// Create new [`PeerContext`].
124    fn new() -> Self {
125        Self {
126            active: HashSet::new(),
127            active_inbound: HashMap::new(),
128        }
129    }
130}
131
/// Request-response protocol.
///
/// Holds the state for both directions of the protocol:
///  - outbound: dial (optional) → open substream → send request → wait for the response
///  - inbound: read request → hand it to the user protocol → send the user's response
pub(crate) struct RequestResponseProtocol {
    /// Transport service.
    ///
    /// Used to dial peers and to open outbound substreams.
    service: TransportService,

    /// Protocol.
    protocol: ProtocolName,

    /// Connected peers.
    peers: HashMap<PeerId, PeerContext>,

    /// Pending outbound substreams, mapped from `SubstreamId` to the [`RequestContext`] of the
    /// request that is sent once the substream has been opened.
    pending_outbound: HashMap<SubstreamId, RequestContext>,

    /// Pending outbound responses.
    ///
    /// Each future waits on the `oneshot::Receiver` whose `oneshot::Sender` is handed to the
    /// user in `InnerRequestResponseEvent::RequestReceived`.
    /// If the request is accepted by the local node, the response is sent over the channel to
    /// the future which sends it to the remote peer.
    ///
    /// If the request is rejected by the local node, the `oneshot::Sender` is dropped which
    /// notifies the future that the request should be rejected by closing the substream.
    pending_outbound_responses: FuturesUnordered<BoxFuture<'static, ()>>,

    /// Pending outbound cancellation handles.
    ///
    /// An entry is inserted when the request is about to be written to its substream; sending
    /// on the handle makes the corresponding `pending_inbound` future finish with
    /// `RequestResponseError::Canceled`.
    pending_outbound_cancels: HashMap<RequestId, oneshot::Sender<()>>,

    /// Pending inbound responses.
    ///
    /// Each future sends an outbound request and then waits for the response, a cancellation
    /// or a timeout.
    pending_inbound: FuturesUnordered<BoxFuture<'static, PendingRequest>>,

    /// Pending inbound requests.
    ///
    /// Each future reads one request from an inbound substream and yields the peer, the
    /// locally allocated request ID, the read result and the substream itself.
    pending_inbound_requests: FuturesStream<
        BoxFuture<
            'static,
            (
                PeerId,
                RequestId,
                Result<BytesMut, SubstreamError>,
                Substream,
            ),
        >,
    >,

    /// Pending dials for outbound requests.
    ///
    /// Keyed by peer, so at most one request per peer can wait for a dial.
    pending_dials: HashMap<PeerId, RequestContext>,

    /// TX channel for sending events to the user protocol.
    event_tx: Sender<InnerRequestResponseEvent>,

    /// RX channel for receiving commands from the `RequestResponseHandle`.
    command_rx: Receiver<RequestResponseCommand>,

    /// Next request ID.
    ///
    /// Received through `Config`; used here to allocate IDs for inbound requests.
    next_request_id: Arc<AtomicUsize>,

    /// Timeout for outbound requests.
    ///
    /// Also applied when sending a response to an inbound request.
    timeout: Duration,

    /// Maximum concurrent inbound requests, if specified.
    ///
    /// Compared against the number of requests still being read plus the number of requests
    /// waiting for a response from the local node.
    max_concurrent_inbound_requests: Option<usize>,
}
193
194impl RequestResponseProtocol {
195    /// Create new [`RequestResponseProtocol`].
196    pub(crate) fn new(service: TransportService, config: Config) -> Self {
197        Self {
198            service,
199            peers: HashMap::new(),
200            timeout: config.timeout,
201            next_request_id: config.next_request_id,
202            event_tx: config.event_tx,
203            command_rx: config.command_rx,
204            protocol: config.protocol_name,
205            pending_dials: HashMap::new(),
206            pending_outbound: HashMap::new(),
207            pending_inbound: FuturesUnordered::new(),
208            pending_outbound_cancels: HashMap::new(),
209            pending_inbound_requests: FuturesStream::new(),
210            pending_outbound_responses: FuturesUnordered::new(),
211            max_concurrent_inbound_requests: config.max_concurrent_inbound_request,
212        }
213    }
214
215    /// Get next ephemeral request ID.
216    fn next_request_id(&mut self) -> RequestId {
217        RequestId::from(self.next_request_id.fetch_add(1usize, Ordering::Relaxed))
218    }
219
220    /// Connection established to remote peer.
221    async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
222        tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
223
224        let Entry::Vacant(entry) = self.peers.entry(peer) else {
225            tracing::error!(
226                target: LOG_TARGET,
227                ?peer,
228                "state mismatch: peer already exists",
229            );
230            debug_assert!(false);
231            return Err(Error::PeerAlreadyExists(peer));
232        };
233
234        match self.pending_dials.remove(&peer) {
235            None => {
236                tracing::debug!(
237                    target: LOG_TARGET,
238                    ?peer,
239                    protocol = %self.protocol,
240                    "peer connected without pending dial",
241                );
242                entry.insert(PeerContext::new());
243            }
244            Some(context) => match self.service.open_substream(peer) {
245                Ok(substream_id) => {
246                    tracing::trace!(
247                        target: LOG_TARGET,
248                        ?peer,
249                        protocol = %self.protocol,
250                        request_id = ?context.request_id,
251                        ?substream_id,
252                        "dial succeeded, open substream",
253                    );
254
255                    entry.insert(PeerContext {
256                        active: HashSet::from_iter([context.request_id]),
257                        active_inbound: HashMap::new(),
258                    });
259                    self.pending_outbound.insert(
260                        substream_id,
261                        RequestContext::new(
262                            peer,
263                            context.request_id,
264                            context.request,
265                            context.fallback,
266                        ),
267                    );
268                }
269                // only reason the substream would fail to open would be that the connection
270                // would've been reported to the protocol with enough delay that the keep-alive
271                // timeout had expired and no other protocol had opened a substream to it, causing
272                // the connection to be closed
273                Err(error) => {
274                    tracing::warn!(
275                        target: LOG_TARGET,
276                        ?peer,
277                        protocol = %self.protocol,
278                        request_id = ?context.request_id,
279                        ?error,
280                        "failed to open substream",
281                    );
282
283                    return self
284                        .report_request_failure(
285                            peer,
286                            context.request_id,
287                            RequestResponseError::Rejected(error.into()),
288                        )
289                        .await;
290                }
291            },
292        }
293
294        Ok(())
295    }
296
297    /// Connection closed to remote peer.
298    async fn on_connection_closed(&mut self, peer: PeerId) {
299        tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");
300
301        // Remove any pending outbound substreams for this peer.
302        self.pending_outbound.retain(|_, context| context.peer != peer);
303
304        let Some(context) = self.peers.remove(&peer) else {
305            tracing::error!(
306                target: LOG_TARGET,
307                ?peer,
308                "Peer does not exist or substream open failed during connection establishment",
309            );
310            return;
311        };
312
313        // sent failure events for all pending outbound requests
314        for request_id in context.active {
315            let _ = self
316                .event_tx
317                .send(InnerRequestResponseEvent::RequestFailed {
318                    peer,
319                    request_id,
320                    error: RequestResponseError::Rejected(RejectReason::ConnectionClosed),
321                })
322                .await;
323        }
324    }
325
326    /// Local node opened a substream to remote node.
327    async fn on_outbound_substream(
328        &mut self,
329        peer: PeerId,
330        substream_id: SubstreamId,
331        mut substream: Substream,
332        fallback_protocol: Option<ProtocolName>,
333    ) -> crate::Result<()> {
334        let Some(RequestContext {
335            request_id,
336            request,
337            fallback,
338            ..
339        }) = self.pending_outbound.remove(&substream_id)
340        else {
341            tracing::error!(
342                target: LOG_TARGET,
343                ?peer,
344                protocol = %self.protocol,
345                ?substream_id,
346                "pending outbound request does not exist",
347            );
348            debug_assert!(false);
349
350            return Err(Error::InvalidState);
351        };
352
353        tracing::trace!(
354            target: LOG_TARGET,
355            ?peer,
356            protocol = %self.protocol,
357            ?substream_id,
358            ?request_id,
359            "substream opened, send request",
360        );
361
362        let request = match (&fallback_protocol, fallback) {
363            (Some(protocol), Some((fallback_protocol, fallback_request)))
364                if protocol == &fallback_protocol =>
365                fallback_request,
366            _ => request,
367        };
368
369        let request_timeout = self.timeout;
370        let protocol = self.protocol.clone();
371        let (tx, rx) = oneshot::channel();
372        self.pending_outbound_cancels.insert(request_id, tx);
373
374        self.pending_inbound.push(Box::pin(async move {
375            match tokio::time::timeout(request_timeout, substream.send_framed(request.into())).await
376            {
377                Err(_) => (
378                    peer,
379                    request_id,
380                    fallback_protocol,
381                    Err(RequestResponseError::Timeout),
382                ),
383                Ok(Err(SubstreamError::IoError(ErrorKind::PermissionDenied))) => {
384                    tracing::warn!(
385                        target: LOG_TARGET,
386                        ?peer,
387                        %protocol,
388                        "tried to send too large request",
389                    );
390
391                    (
392                        peer,
393                        request_id,
394                        fallback_protocol,
395                        Err(RequestResponseError::TooLargePayload),
396                    )
397                }
398                Ok(Err(error)) => (
399                    peer,
400                    request_id,
401                    fallback_protocol,
402                    Err(RequestResponseError::Rejected(error.into())),
403                ),
404                Ok(Ok(_)) => {
405                    tokio::select! {
406                        _ = rx => {
407                            tracing::debug!(
408                                target: LOG_TARGET,
409                                ?peer,
410                                %protocol,
411                                ?request_id,
412                                "request canceled",
413                            );
414
415                            let _ = substream.close().await;
416                            (
417                                peer,
418                                request_id,
419                                fallback_protocol,
420                                Err(RequestResponseError::Canceled))
421                        }
422                        _ = sleep(request_timeout) => {
423                            tracing::debug!(
424                                target: LOG_TARGET,
425                                ?peer,
426                                %protocol,
427                                ?request_id,
428                                "request timed out",
429                            );
430
431                            let _ = substream.close().await;
432                            (peer, request_id, fallback_protocol, Err(RequestResponseError::Timeout))
433                        }
434                        event = substream.next() => match event {
435                            Some(Ok(response)) => {
436                                (peer, request_id, fallback_protocol, Ok(response.freeze().into()))
437                            },
438                            Some(Err(error)) => {
439                                (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(error.into())))
440                            },
441                            None => {
442                                tracing::debug!(
443                                    target: LOG_TARGET,
444                                    ?peer,
445                                    %protocol,
446                                    ?request_id,
447                                    "substream closed",
448                                );
449                                (peer, request_id, fallback_protocol, Err(RequestResponseError::Rejected(RejectReason::SubstreamClosed)))
450                            }
451                        }
452                    }
453                }
454            }
455        }));
456
457        Ok(())
458    }
459
460    /// Handle pending inbound response.
461    async fn on_inbound_request(
462        &mut self,
463        peer: PeerId,
464        request_id: RequestId,
465        request: Result<BytesMut, SubstreamError>,
466        mut substream: Substream,
467    ) -> crate::Result<()> {
468        // The peer will no longer exist if the connection was closed before processing the request.
469        let peer_context = self.peers.get_mut(&peer).ok_or(Error::PeerDoesntExist(peer))?;
470        let fallback = peer_context.active_inbound.remove(&request_id).ok_or_else(|| {
471            tracing::debug!(
472                target: LOG_TARGET,
473                ?peer,
474                protocol = %self.protocol,
475                ?request_id,
476                "no active inbound request",
477            );
478
479            Error::InvalidState
480        })?;
481
482        let protocol = self.protocol.clone();
483
484        tracing::trace!(
485            target: LOG_TARGET,
486            ?peer,
487            %protocol,
488            ?request_id,
489            "inbound request",
490        );
491
492        let Ok(request) = request else {
493            tracing::debug!(
494                target: LOG_TARGET,
495                ?peer,
496                %protocol,
497                ?request_id,
498                ?request,
499                "failed to read request from substream",
500            );
501            return Err(Error::InvalidData);
502        };
503
504        // once the request has been read from the substream, start a future which waits
505        // for an input from the user.
506        //
507        // the input is either a response (succes) or rejection (failure) which is communicated
508        // by sending the response over the `oneshot::Sender` or closing it, respectively.
509        let timeout = self.timeout;
510        let (response_tx, rx): (
511            oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
512            _,
513        ) = oneshot::channel();
514
515        self.pending_outbound_responses.push(Box::pin(async move {
516            match rx.await {
517                Err(_) => {
518                    tracing::debug!(
519                        target: LOG_TARGET,
520                        ?peer,
521                        %protocol,
522                        ?request_id,
523                        "request rejected",
524                    );
525                    let _ = substream.close().await;
526                }
527                Ok((response, mut feedback)) => {
528                    tracing::trace!(
529                        target: LOG_TARGET,
530                        ?peer,
531                        %protocol,
532                        ?request_id,
533                        "send response",
534                    );
535
536                    match tokio::time::timeout(timeout, substream.send_framed(response.into()))
537                        .await
538                    {
539                        Err(_) => tracing::debug!(
540                            target: LOG_TARGET,
541                            ?peer,
542                            %protocol,
543                            ?request_id,
544                            "timed out while sending response",
545                        ),
546                        Ok(Ok(_)) => feedback.take().map_or((), |feedback| {
547                            let _ = feedback.send(());
548                        }),
549                        Ok(Err(error)) => tracing::trace!(
550                        target: LOG_TARGET,
551                            ?peer,
552                            %protocol,
553                            ?request_id,
554                            ?error,
555                            "failed to send request to peer",
556                        ),
557                    }
558                }
559            }
560        }));
561
562        self.event_tx
563            .send(InnerRequestResponseEvent::RequestReceived {
564                peer,
565                fallback,
566                request_id,
567                request: request.freeze().into(),
568                response_tx,
569            })
570            .await
571            .map_err(From::from)
572    }
573
574    /// Remote opened a substream to local node.
575    async fn on_inbound_substream(
576        &mut self,
577        peer: PeerId,
578        fallback: Option<ProtocolName>,
579        mut substream: Substream,
580    ) -> crate::Result<()> {
581        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "handle inbound substream");
582
583        if let Some(max_requests) = self.max_concurrent_inbound_requests {
584            let num_inbound_requests =
585                self.pending_inbound_requests.len() + self.pending_outbound_responses.len();
586
587            if max_requests <= num_inbound_requests {
588                tracing::debug!(
589                    target: LOG_TARGET,
590                    ?peer,
591                    protocol = %self.protocol,
592                    ?fallback,
593                    ?max_requests,
594                    "rejecting request as already at maximum",
595                );
596
597                let _ = substream.close().await;
598                return Ok(());
599            }
600        }
601
602        // allocate ephemeral id for the inbound request and return it to the user protocol
603        //
604        // when user responds to the request, this is used to associate the response with the
605        // correct substream.
606        let request_id = self.next_request_id();
607        self.peers
608            .get_mut(&peer)
609            .ok_or(Error::PeerDoesntExist(peer))?
610            .active_inbound
611            .insert(request_id, fallback);
612
613        self.pending_inbound_requests.push(Box::pin(async move {
614            let request = match substream.next().await {
615                Some(Ok(request)) => Ok(request),
616                Some(Err(error)) => Err(error),
617                None => Err(SubstreamError::ConnectionClosed),
618            };
619
620            (peer, request_id, request, substream)
621        }));
622
623        Ok(())
624    }
625
626    async fn on_dial_failure(&mut self, peer: PeerId) {
627        if let Some(context) = self.pending_dials.remove(&peer) {
628            tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "failed to dial peer");
629
630            let _ = self
631                .peers
632                .get_mut(&peer)
633                .map(|peer_context| peer_context.active.remove(&context.request_id));
634            let _ = self
635                .report_request_failure(
636                    peer,
637                    context.request_id,
638                    RequestResponseError::Rejected(RejectReason::DialFailed(None)),
639                )
640                .await;
641        }
642    }
643
644    /// Failed to open substream to remote peer.
645    async fn on_substream_open_failure(
646        &mut self,
647        substream: SubstreamId,
648        error: SubstreamError,
649    ) -> crate::Result<()> {
650        let Some(RequestContext {
651            request_id, peer, ..
652        }) = self.pending_outbound.remove(&substream)
653        else {
654            tracing::error!(
655                target: LOG_TARGET,
656                protocol = %self.protocol,
657                ?substream,
658                "pending outbound request does not exist",
659            );
660            debug_assert!(false);
661
662            return Err(Error::InvalidState);
663        };
664
665        tracing::debug!(
666            target: LOG_TARGET,
667            ?peer,
668            protocol = %self.protocol,
669            ?request_id,
670            ?substream,
671            ?error,
672            "failed to open substream",
673        );
674
675        let _ = self
676            .peers
677            .get_mut(&peer)
678            .map(|peer_context| peer_context.active.remove(&request_id));
679
680        self.event_tx
681            .send(InnerRequestResponseEvent::RequestFailed {
682                peer,
683                request_id,
684                error: match error {
685                    SubstreamError::NegotiationError(NegotiationError::MultistreamSelectError(
686                        MultistreamFailed,
687                    )) => RequestResponseError::UnsupportedProtocol,
688                    _ => RequestResponseError::Rejected(error.into()),
689                },
690            })
691            .await
692            .map_err(From::from)
693    }
694
695    /// Report request send failure to user.
696    async fn report_request_failure(
697        &mut self,
698        peer: PeerId,
699        request_id: RequestId,
700        error: RequestResponseError,
701    ) -> crate::Result<()> {
702        self.event_tx
703            .send(InnerRequestResponseEvent::RequestFailed {
704                peer,
705                request_id,
706                error,
707            })
708            .await
709            .map_err(From::from)
710    }
711
712    /// Send request to remote peer.
713    fn on_send_request(
714        &mut self,
715        peer: PeerId,
716        request_id: RequestId,
717        request: Vec<u8>,
718        dial_options: DialOptions,
719        fallback: Option<(ProtocolName, Vec<u8>)>,
720    ) -> Result<(), RequestResponseError> {
721        tracing::trace!(
722            target: LOG_TARGET,
723            ?peer,
724            protocol = %self.protocol,
725            ?request_id,
726            ?dial_options,
727            "send request to remote peer",
728        );
729
730        let Some(context) = self.peers.get_mut(&peer) else {
731            match dial_options {
732                DialOptions::Reject => {
733                    tracing::debug!(
734                        target: LOG_TARGET,
735                        ?peer,
736                        protocol = %self.protocol,
737                        ?request_id,
738                        ?dial_options,
739                        "peer not connected and should not dial",
740                    );
741
742                    return Err(RequestResponseError::NotConnected);
743                }
744                DialOptions::Dial => match self.service.dial(&peer) {
745                    Ok(_) => {
746                        tracing::trace!(
747                            target: LOG_TARGET,
748                            ?peer,
749                            protocol = %self.protocol,
750                            ?request_id,
751                            "started dialing peer",
752                        );
753
754                        self.pending_dials.insert(
755                            peer,
756                            RequestContext::new(peer, request_id, request, fallback),
757                        );
758                        return Ok(());
759                    }
760                    Err(error) => {
761                        tracing::debug!(
762                            target: LOG_TARGET,
763                            ?peer,
764                            protocol = %self.protocol,
765                            ?error,
766                            "failed to dial peer"
767                        );
768
769                        return Err(RequestResponseError::Rejected(RejectReason::DialFailed(
770                            Some(error),
771                        )));
772                    }
773                },
774            }
775        };
776
777        // open substream and push it pending outbound substreams
778        // once the substream is opened, send the request.
779        match self.service.open_substream(peer) {
780            Ok(substream_id) => {
781                let unique_request_id = context.active.insert(request_id);
782                debug_assert!(unique_request_id);
783
784                self.pending_outbound.insert(
785                    substream_id,
786                    RequestContext::new(peer, request_id, request, fallback),
787                );
788
789                Ok(())
790            }
791            Err(error) => {
792                tracing::debug!(
793                    target: LOG_TARGET,
794                    ?peer,
795                    protocol = %self.protocol,
796                    ?request_id,
797                    ?error,
798                    "failed to open substream",
799                );
800
801                Err(RequestResponseError::Rejected(error.into()))
802            }
803        }
804    }
805
806    /// Handle substream event.
807    async fn on_substream_event(
808        &mut self,
809        peer: PeerId,
810        request_id: RequestId,
811        fallback: Option<ProtocolName>,
812        message: Result<Vec<u8>, RequestResponseError>,
813    ) -> crate::Result<()> {
814        if !self
815            .peers
816            .get_mut(&peer)
817            .ok_or(Error::PeerDoesntExist(peer))?
818            .active
819            .remove(&request_id)
820        {
821            tracing::warn!(
822                target: LOG_TARGET,
823                ?peer,
824                protocol = %self.protocol,
825                ?request_id,
826                "invalid state: received substream event but no active substream",
827            );
828            return Err(Error::InvalidState);
829        }
830
831        let event = match message {
832            Ok(response) => InnerRequestResponseEvent::ResponseReceived {
833                peer,
834                request_id,
835                response,
836                fallback,
837            },
838            Err(error) => match error {
839                RequestResponseError::Canceled => {
840                    tracing::debug!(
841                        target: LOG_TARGET,
842                        ?peer,
843                        protocol = %self.protocol,
844                        ?request_id,
845                        "request canceled by local node",
846                    );
847                    return Ok(());
848                }
849                error => InnerRequestResponseEvent::RequestFailed {
850                    peer,
851                    request_id,
852                    error,
853                },
854            },
855        };
856
857        self.event_tx.send(event).await.map_err(From::from)
858    }
859
860    /// Cancel outbound request.
861    fn on_cancel_request(&mut self, request_id: RequestId) -> crate::Result<()> {
862        tracing::trace!(target: LOG_TARGET, protocol = %self.protocol, ?request_id, "cancel outbound request");
863
864        match self.pending_outbound_cancels.remove(&request_id) {
865            Some(tx) => tx.send(()).map_err(|_| Error::SubstreamDoesntExist),
866            None => {
867                tracing::debug!(
868                    target: LOG_TARGET,
869                    protocol = %self.protocol,
870                    ?request_id,
871                    "tried to cancel request which doesn't exist",
872                );
873
874                Ok(())
875            }
876        }
877    }
878
879    /// Handles the service event.
880    async fn handle_service_event(&mut self, event: TransportEvent) {
881        match event {
882            TransportEvent::ConnectionEstablished { peer, .. } => {
883                if let Err(error) = self.on_connection_established(peer).await {
884                    tracing::debug!(
885                        target: LOG_TARGET,
886                        ?peer,
887                        protocol = %self.protocol,
888                        ?error,
889                        "failed to handle connection established",
890                    );
891                }
892            }
893
894            TransportEvent::ConnectionClosed { peer } => {
895                self.on_connection_closed(peer).await;
896            }
897
898            TransportEvent::SubstreamOpened {
899                peer,
900                substream,
901                direction,
902                fallback,
903                ..
904            } => match direction {
905                Direction::Inbound => {
906                    if let Err(error) = self.on_inbound_substream(peer, fallback, substream).await {
907                        tracing::debug!(
908                            target: LOG_TARGET,
909                            ?peer,
910                            protocol = %self.protocol,
911                            ?error,
912                            "failed to handle inbound substream",
913                        );
914                    }
915                }
916                Direction::Outbound(substream_id) => {
917                    let _ =
918                        self.on_outbound_substream(peer, substream_id, substream, fallback).await;
919                }
920            },
921
922            TransportEvent::SubstreamOpenFailure { substream, error } => {
923                if let Err(error) = self.on_substream_open_failure(substream, error).await {
924                    tracing::warn!(
925                        target: LOG_TARGET,
926                        protocol = %self.protocol,
927                        ?error,
928                        "failed to handle substream open failure",
929                    );
930                }
931            }
932
933            TransportEvent::DialFailure { peer, .. } => self.on_dial_failure(peer).await,
934        }
935    }
936
937    /// Handles the user command.
938    async fn handle_user_command(&mut self, command: RequestResponseCommand) {
939        match command {
940            RequestResponseCommand::SendRequest {
941                peer,
942                request_id,
943                request,
944                dial_options,
945            } => {
946                if let Err(error) =
947                    self.on_send_request(peer, request_id, request, dial_options, None)
948                {
949                    tracing::debug!(
950                        target: LOG_TARGET,
951                        ?peer,
952                        protocol = %self.protocol,
953                        ?request_id,
954                        ?error,
955                        "failed to send request",
956                    );
957
958                    if let Err(error) = self.report_request_failure(peer, request_id, error).await {
959                        tracing::debug!(
960                            target: LOG_TARGET,
961                            ?peer,
962                            protocol = %self.protocol,
963                            ?request_id,
964                            ?error,
965                            "failed to report request failure",
966                        );
967                    }
968                }
969            }
970            RequestResponseCommand::SendRequestWithFallback {
971                peer,
972                request_id,
973                request,
974                fallback,
975                dial_options,
976            } => {
977                if let Err(error) =
978                    self.on_send_request(peer, request_id, request, dial_options, Some(fallback))
979                {
980                    tracing::debug!(
981                        target: LOG_TARGET,
982                        ?peer,
983                        protocol = %self.protocol,
984                        ?request_id,
985                        ?error,
986                        "failed to send request",
987                    );
988
989                    if let Err(error) = self.report_request_failure(peer, request_id, error).await {
990                        tracing::debug!(
991                            target: LOG_TARGET,
992                            ?peer,
993                            protocol = %self.protocol,
994                            ?request_id,
995                            ?error,
996                            "failed to report request failure",
997                        );
998                    }
999                }
1000            }
1001            RequestResponseCommand::CancelRequest { request_id } => {
1002                if let Err(error) = self.on_cancel_request(request_id) {
1003                    tracing::debug!(
1004                        target: LOG_TARGET,
1005                        protocol = %self.protocol,
1006                        ?request_id,
1007                        ?error,
1008                        "failed to cancel reqeuest",
1009                    );
1010                }
1011            }
1012        }
1013    }
1014
1015    /// Start [`RequestResponseProtocol`] event loop.
1016    pub async fn run(mut self) {
1017        tracing::debug!(target: LOG_TARGET, "starting request-response event loop");
1018
1019        loop {
1020            tokio::select! {
1021                // events coming from the network have higher priority than user commands as all user commands are
1022                // responses to network behaviour so ensure that the commands operate on the most up to date information.
1023                biased;
1024
1025                // Connection and substream events from the transport service.
1026                event = self.service.next() => match event {
1027                    Some(event) => self.handle_service_event(event).await,
1028                    None => {
1029                        tracing::debug!(target: LOG_TARGET, protocol = %self.protocol, "service has exited, exiting");
1030                        return
1031                    }
1032                },
1033
1034                // These are outbound requests waiting for the substream to produce a response.
1035                event = self.pending_inbound.select_next_some(), if !self.pending_inbound.is_empty() => {
1036                    let (peer, request_id, fallback, event) = event;
1037
1038                    if let Err(error) = self.on_substream_event(peer, request_id, fallback, event).await {
1039                        tracing::debug!(
1040                            target: LOG_TARGET,
1041                            ?peer,
1042                            protocol = %self.protocol,
1043                            ?request_id,
1044                            ?error,
1045                            "failed to handle substream event",
1046                        );
1047                    }
1048
1049                    self.pending_outbound_cancels.remove(&request_id);
1050                }
1051
1052                // These are inbound requests waiting for the user to respond, then for the substream to send the response.
1053                _ = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => {}
1054
1055                // Inbound requests that are moved to `pending_outbound_responses`.
1056                event = self.pending_inbound_requests.next(), if !self.pending_inbound_requests.is_empty() => match event {
1057                    Some((peer, request_id, request, substream)) => {
1058                        if let Err(error) = self.on_inbound_request(peer, request_id, request, substream).await {
1059                            tracing::debug!(
1060                                target: LOG_TARGET,
1061                                ?peer,
1062                                protocol = %self.protocol,
1063                                ?request_id,
1064                                ?error,
1065                                "failed to handle inbound request",
1066                            );
1067                        }
1068                    }
1069                    None => return,
1070                },
1071
1072                // User commands.
1073                command = self.command_rx.recv() => match command {
1074                    Some(command) => self.handle_user_command(command).await,
1075                    None => {
1076                        tracing::debug!(target: LOG_TARGET, protocol = %self.protocol, "user protocol has exited, exiting");
1077                        return
1078                    }
1079                },
1080            }
1081        }
1082    }
1083}