libp2p_request_response/
lib.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Generic request/response protocols.
22//!
23//! ## General Usage
24//!
25//! The [`Behaviour`] struct is a [`NetworkBehaviour`] that implements a generic
26//! request/response protocol or protocol family, whereby each request is
27//! sent over a new substream on a connection. `Behaviour` is generic
28//! over the actual messages being sent, which are defined in terms of a
29//! [`Codec`]. Creating a request/response protocol thus amounts
30//! to providing an implementation of this trait which can then be
31//! given to [`Behaviour::with_codec`]. Further configuration options are
32//! available via the [`Config`].
33//!
34//! Requests are sent using [`Behaviour::send_request`] and the
35//! responses received as [`Message::Response`] via
36//! [`Event::Message`].
37//!
38//! Responses are sent using [`Behaviour::send_response`] upon
39//! receiving a [`Message::Request`] via
40//! [`Event::Message`].
41//!
42//! ## Predefined codecs
43//!
44//! In case your message types implement [`serde::Serialize`] and [`serde::Deserialize`],
45//! you can use two predefined behaviours:
46//!
47//! - [`cbor::Behaviour`] for CBOR-encoded messages
48//! - [`json::Behaviour`] for JSON-encoded messages
49//!
50//! ## Protocol Families
51//!
52//! A single [`Behaviour`] instance can be used with an entire
53//! protocol family that share the same request and response types.
54//! For that purpose, [`Codec::Protocol`] is typically
55//! instantiated with a sum type.
56//!
57//! ## Limited Protocol Support
58//!
59//! It is possible to only support inbound or outbound requests for
60//! a particular protocol. This is achieved by instantiating `Behaviour`
61//! with protocols using [`ProtocolSupport::Inbound`] or
62//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
//! family can be configured in this way. Such protocols will not be
//! advertised during inbound or outbound protocol negotiation, respectively,
//! on the substreams.
66
67#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
68
69#[cfg(feature = "cbor")]
70pub mod cbor;
71mod codec;
72mod handler;
73#[cfg(feature = "json")]
74pub mod json;
75
76pub use codec::Codec;
77pub use handler::ProtocolSupport;
78
79use crate::handler::protocol::RequestProtocol;
80use futures::channel::oneshot;
81use handler::Handler;
82use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
83use libp2p_identity::PeerId;
84use libp2p_swarm::{
85    behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
86    dial_opts::DialOpts,
87    ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler,
88    PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
89};
90use smallvec::SmallVec;
91use std::{
92    collections::{HashMap, HashSet, VecDeque},
93    fmt,
94    sync::{atomic::AtomicU64, Arc},
95    task::{Context, Poll},
96    time::Duration,
97};
98
/// An inbound request or response.
///
/// `TChannelResponse` is the type accepted by the [`ResponseChannel`] that
/// accompanies a request; it defaults to `TResponse`.
#[derive(Debug)]
pub enum Message<TRequest, TResponse, TChannelResponse = TResponse> {
    /// A request message.
    Request {
        /// The ID of this request.
        request_id: RequestId,
        /// The request message.
        request: TRequest,
        /// The channel waiting for the response.
        ///
        /// If this channel is dropped instead of being used to send a response
        /// via [`Behaviour::send_response`], an [`Event::InboundFailure`]
        /// with [`InboundFailure::ResponseOmission`] is emitted.
        channel: ResponseChannel<TChannelResponse>,
    },
    /// A response message.
    Response {
        /// The ID of the request that produced this response.
        ///
        /// This is the ID that was returned by [`Behaviour::send_request`].
        request_id: RequestId,
        /// The response message.
        response: TResponse,
    },
}
125
/// The events emitted by a request-response [`Behaviour`].
///
/// The type parameters are forwarded unchanged to the contained [`Message`].
#[derive(Debug)]
pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
    /// An incoming message (request or response).
    Message {
        /// The peer who sent the message.
        peer: PeerId,
        /// The incoming message.
        message: Message<TRequest, TResponse, TChannelResponse>,
    },
    /// An outbound request failed.
    OutboundFailure {
        /// The peer to whom the request was sent.
        peer: PeerId,
        /// The (local) ID of the failed request.
        request_id: RequestId,
        /// The error that occurred.
        error: OutboundFailure,
    },
    /// An inbound request failed.
    InboundFailure {
        /// The peer from whom the request was received.
        peer: PeerId,
        /// The ID of the failed inbound request.
        request_id: RequestId,
        /// The error that occurred.
        error: InboundFailure,
    },
    /// A response to an inbound request has been sent.
    ///
    /// When this event is received, the response has been flushed on
    /// the underlying transport connection.
    ResponseSent {
        /// The peer to whom the response was sent.
        peer: PeerId,
        /// The ID of the inbound request whose response was sent.
        request_id: RequestId,
    },
}
165
/// Ways in which sending an outbound request, or receiving its response,
/// can fail.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutboundFailure {
    /// A dialing attempt failed, so the request could not be sent.
    DialFailure,
    /// No response arrived before the request timed out.
    ///
    /// Whether the remote peer received (and processed) the request
    /// is unknown.
    Timeout,
    /// The connection was closed before a response arrived.
    ///
    /// Whether the remote peer received (and processed) the request
    /// is unknown.
    ConnectionClosed,
    /// None of the requested protocols is supported by the remote.
    UnsupportedProtocols,
}

impl fmt::Display for OutboundFailure {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::DialFailure => "Failed to dial the requested peer",
            Self::Timeout => "Timeout while waiting for a response",
            Self::ConnectionClosed => "Connection was closed before a response was received",
            Self::UnsupportedProtocols => "The remote supports none of the requested protocols",
        };
        f.write_str(description)
    }
}

impl std::error::Error for OutboundFailure {}
202
/// Ways in which receiving an inbound request, or sending its response,
/// can fail.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InboundFailure {
    /// The inbound request timed out, either while the incoming request
    /// was being read or before a response was sent, e.g. because
    /// [`Behaviour::send_response`] was not called in a timely manner.
    Timeout,
    /// The connection closed before a response could be sent.
    ConnectionClosed,
    /// None of the protocols requested by the remote is supported by
    /// the local peer.
    UnsupportedProtocols,
    /// The local peer did not respond to an inbound request because the
    /// [`ResponseChannel`] was dropped instead of being passed to
    /// [`Behaviour::send_response`].
    ResponseOmission,
}

impl fmt::Display for InboundFailure {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Timeout => "Timeout while receiving request or sending response",
            Self::ConnectionClosed => "Connection was closed before a response could be sent",
            Self::UnsupportedProtocols => {
                "The local peer supports none of the protocols requested by the remote"
            }
            Self::ResponseOmission => {
                "The response channel was dropped without sending a response to the remote"
            }
        };
        f.write_str(description)
    }
}

impl std::error::Error for InboundFailure {}
245
246/// A channel for sending a response to an inbound request.
247///
248/// See [`Behaviour::send_response`].
249#[derive(Debug)]
250pub struct ResponseChannel<TResponse> {
251    sender: oneshot::Sender<TResponse>,
252}
253
254impl<TResponse> ResponseChannel<TResponse> {
255    /// Checks whether the response channel is still open, i.e.
256    /// the `Behaviour` is still waiting for a
257    /// a response to be sent via [`Behaviour::send_response`]
258    /// and this response channel.
259    ///
260    /// If the response channel is no longer open then the inbound
261    /// request timed out waiting for the response.
262    pub fn is_open(&self) -> bool {
263        !self.sender.is_canceled()
264    }
265}
266
/// Identifier of an inbound or outbound request.
///
/// Note: a [`RequestId`] is only guaranteed to be unique among inbound
/// requests, and likewise among outbound requests. No uniqueness is
/// guaranteed across a mix of inbound and outbound [`RequestId`]s, nor
/// across requests originating from different [`Behaviour`]s.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(u64);

impl fmt::Display for RequestId {
    /// Prints the numeric value of the ID.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        write!(f, "{}", raw)
    }
}
282
/// Configuration of a `Behaviour` protocol.
#[derive(Debug, Clone)]
pub struct Config {
    /// Timeout applied to inbound and outbound requests.
    request_timeout: Duration,
    /// Keep-alive timeout of idle connections.
    connection_keep_alive: Duration,
}

impl Default for Config {
    /// Uses 10 seconds for both the request timeout and the connection
    /// keep-alive.
    fn default() -> Self {
        let request_timeout = Duration::from_secs(10);
        let connection_keep_alive = Duration::from_secs(10);
        Self {
            request_timeout,
            connection_keep_alive,
        }
    }
}

impl Config {
    /// Overrides the keep-alive timeout of idle connections and returns
    /// `self` for chaining.
    #[deprecated(
        note = "Set a global idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead."
    )]
    pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
        self.connection_keep_alive = v;
        self
    }

    /// Overrides the timeout for inbound and outbound requests and
    /// returns `self` for chaining.
    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
        self.request_timeout = v;
        self
    }
}
315
/// A request/response protocol for some message codec.
///
/// The codec determines the protocol, request and response types through
/// `TCodec::Protocol`, `TCodec::Request` and `TCodec::Response`.
pub struct Behaviour<TCodec>
where
    TCodec: Codec + Clone + Send + 'static,
{
    /// The supported inbound protocols.
    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
    /// The supported outbound protocols.
    outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
    /// The next (local) request ID, handed out by `send_request`.
    next_request_id: RequestId,
    /// The next (inbound) request ID. A clone of this shared counter is
    /// given to every connection handler that is created.
    next_inbound_id: Arc<AtomicU64>,
    /// The protocol configuration.
    config: Config,
    /// The protocol codec for reading and writing requests and responses.
    codec: TCodec,
    /// Pending events to return from `poll`.
    pending_events:
        VecDeque<ToSwarm<Event<TCodec::Request, TCodec::Response>, RequestProtocol<TCodec>>>,
    /// The currently connected peers, their pending outbound and inbound responses and their known,
    /// reachable addresses, if any.
    connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
    /// Externally managed addresses via `add_address` and `remove_address`.
    addresses: HashMap<PeerId, HashSet<Multiaddr>>,
    /// Requests that have not yet been sent and are waiting for a connection
    /// to be established.
    pending_outbound_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
}
345
346impl<TCodec> Behaviour<TCodec>
347where
348    TCodec: Codec + Default + Clone + Send + 'static,
349{
350    /// Creates a new `Behaviour` for the given protocols and configuration, using [`Default`] to construct the codec.
351    pub fn new<I>(protocols: I, cfg: Config) -> Self
352    where
353        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
354    {
355        Self::with_codec(TCodec::default(), protocols, cfg)
356    }
357}
358
359impl<TCodec> Behaviour<TCodec>
360where
361    TCodec: Codec + Clone + Send + 'static,
362{
363    /// Creates a new `Behaviour` for the given
364    /// protocols, codec and configuration.
365    pub fn with_codec<I>(codec: TCodec, protocols: I, cfg: Config) -> Self
366    where
367        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
368    {
369        let mut inbound_protocols = SmallVec::new();
370        let mut outbound_protocols = SmallVec::new();
371        for (p, s) in protocols {
372            if s.inbound() {
373                inbound_protocols.push(p.clone());
374            }
375            if s.outbound() {
376                outbound_protocols.push(p.clone());
377            }
378        }
379        Behaviour {
380            inbound_protocols,
381            outbound_protocols,
382            next_request_id: RequestId(1),
383            next_inbound_id: Arc::new(AtomicU64::new(1)),
384            config: cfg,
385            codec,
386            pending_events: VecDeque::new(),
387            connected: HashMap::new(),
388            pending_outbound_requests: HashMap::new(),
389            addresses: HashMap::new(),
390        }
391    }
392
393    /// Initiates sending a request.
394    ///
395    /// If the targeted peer is currently not connected, a dialing
396    /// attempt is initiated and the request is sent as soon as a
397    /// connection is established.
398    ///
399    /// > **Note**: In order for such a dialing attempt to succeed,
400    /// > the `RequestResonse` protocol must either be embedded
401    /// > in another `NetworkBehaviour` that provides peer and
402    /// > address discovery, or known addresses of peers must be
403    /// > managed via [`Behaviour::add_address`] and
404    /// > [`Behaviour::remove_address`].
405    pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId {
406        let request_id = self.next_request_id();
407        let request = RequestProtocol {
408            request_id,
409            codec: self.codec.clone(),
410            protocols: self.outbound_protocols.clone(),
411            request,
412        };
413
414        if let Some(request) = self.try_send_request(peer, request) {
415            self.pending_events.push_back(ToSwarm::Dial {
416                opts: DialOpts::peer_id(*peer).build(),
417            });
418            self.pending_outbound_requests
419                .entry(*peer)
420                .or_default()
421                .push(request);
422        }
423
424        request_id
425    }
426
427    /// Initiates sending a response to an inbound request.
428    ///
429    /// If the [`ResponseChannel`] is already closed due to a timeout or the
430    /// connection being closed, the response is returned as an `Err` for
431    /// further handling. Once the response has been successfully sent on the
432    /// corresponding connection, [`Event::ResponseSent`] is
433    /// emitted. In all other cases [`Event::InboundFailure`]
434    /// will be or has been emitted.
435    ///
436    /// The provided `ResponseChannel` is obtained from an inbound
437    /// [`Message::Request`].
438    pub fn send_response(
439        &mut self,
440        ch: ResponseChannel<TCodec::Response>,
441        rs: TCodec::Response,
442    ) -> Result<(), TCodec::Response> {
443        ch.sender.send(rs)
444    }
445
446    /// Adds a known address for a peer that can be used for
447    /// dialing attempts by the `Swarm`, i.e. is returned
448    /// by [`NetworkBehaviour::handle_pending_outbound_connection`].
449    ///
450    /// Addresses added in this way are only removed by `remove_address`.
451    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
452        self.addresses.entry(*peer).or_default().insert(address);
453    }
454
455    /// Removes an address of a peer previously added via `add_address`.
456    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
457        let mut last = false;
458        if let Some(addresses) = self.addresses.get_mut(peer) {
459            addresses.retain(|a| a != address);
460            last = addresses.is_empty();
461        }
462        if last {
463            self.addresses.remove(peer);
464        }
465    }
466
467    /// Checks whether a peer is currently connected.
468    pub fn is_connected(&self, peer: &PeerId) -> bool {
469        if let Some(connections) = self.connected.get(peer) {
470            !connections.is_empty()
471        } else {
472            false
473        }
474    }
475
476    /// Checks whether an outbound request to the peer with the provided
477    /// [`PeerId`] initiated by [`Behaviour::send_request`] is still
478    /// pending, i.e. waiting for a response.
479    pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
480        // Check if request is already sent on established connection.
481        let est_conn = self
482            .connected
483            .get(peer)
484            .map(|cs| {
485                cs.iter()
486                    .any(|c| c.pending_inbound_responses.contains(request_id))
487            })
488            .unwrap_or(false);
489        // Check if request is still pending to be sent.
490        let pen_conn = self
491            .pending_outbound_requests
492            .get(peer)
493            .map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
494            .unwrap_or(false);
495
496        est_conn || pen_conn
497    }
498
499    /// Checks whether an inbound request from the peer with the provided
500    /// [`PeerId`] is still pending, i.e. waiting for a response by the local
501    /// node through [`Behaviour::send_response`].
502    pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
503        self.connected
504            .get(peer)
505            .map(|cs| {
506                cs.iter()
507                    .any(|c| c.pending_outbound_responses.contains(request_id))
508            })
509            .unwrap_or(false)
510    }
511
512    /// Returns the next request ID.
513    fn next_request_id(&mut self) -> RequestId {
514        let request_id = self.next_request_id;
515        self.next_request_id.0 += 1;
516        request_id
517    }
518
519    /// Tries to send a request by queueing an appropriate event to be
520    /// emitted to the `Swarm`. If the peer is not currently connected,
521    /// the given request is return unchanged.
522    fn try_send_request(
523        &mut self,
524        peer: &PeerId,
525        request: RequestProtocol<TCodec>,
526    ) -> Option<RequestProtocol<TCodec>> {
527        if let Some(connections) = self.connected.get_mut(peer) {
528            if connections.is_empty() {
529                return Some(request);
530            }
531            let ix = (request.request_id.0 as usize) % connections.len();
532            let conn = &mut connections[ix];
533            conn.pending_inbound_responses.insert(request.request_id);
534            self.pending_events.push_back(ToSwarm::NotifyHandler {
535                peer_id: *peer,
536                handler: NotifyHandler::One(conn.id),
537                event: request,
538            });
539            None
540        } else {
541            Some(request)
542        }
543    }
544
545    /// Remove pending outbound response for the given peer and connection.
546    ///
547    /// Returns `true` if the provided connection to the given peer is still
548    /// alive and the [`RequestId`] was previously present and is now removed.
549    /// Returns `false` otherwise.
550    fn remove_pending_outbound_response(
551        &mut self,
552        peer: &PeerId,
553        connection: ConnectionId,
554        request: RequestId,
555    ) -> bool {
556        self.get_connection_mut(peer, connection)
557            .map(|c| c.pending_outbound_responses.remove(&request))
558            .unwrap_or(false)
559    }
560
561    /// Remove pending inbound response for the given peer and connection.
562    ///
563    /// Returns `true` if the provided connection to the given peer is still
564    /// alive and the [`RequestId`] was previously present and is now removed.
565    /// Returns `false` otherwise.
566    fn remove_pending_inbound_response(
567        &mut self,
568        peer: &PeerId,
569        connection: ConnectionId,
570        request: &RequestId,
571    ) -> bool {
572        self.get_connection_mut(peer, connection)
573            .map(|c| c.pending_inbound_responses.remove(request))
574            .unwrap_or(false)
575    }
576
577    /// Returns a mutable reference to the connection in `self.connected`
578    /// corresponding to the given [`PeerId`] and [`ConnectionId`].
579    fn get_connection_mut(
580        &mut self,
581        peer: &PeerId,
582        connection: ConnectionId,
583    ) -> Option<&mut Connection> {
584        self.connected
585            .get_mut(peer)
586            .and_then(|connections| connections.iter_mut().find(|c| c.id == connection))
587    }
588
589    fn on_address_change(
590        &mut self,
591        AddressChange {
592            peer_id,
593            connection_id,
594            new,
595            ..
596        }: AddressChange,
597    ) {
598        let new_address = match new {
599            ConnectedPoint::Dialer { address, .. } => Some(address.clone()),
600            ConnectedPoint::Listener { .. } => None,
601        };
602        let connections = self
603            .connected
604            .get_mut(&peer_id)
605            .expect("Address change can only happen on an established connection.");
606
607        let connection = connections
608            .iter_mut()
609            .find(|c| c.id == connection_id)
610            .expect("Address change can only happen on an established connection.");
611        connection.remote_address = new_address;
612    }
613
614    fn on_connection_closed(
615        &mut self,
616        ConnectionClosed {
617            peer_id,
618            connection_id,
619            remaining_established,
620            ..
621        }: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
622    ) {
623        let connections = self
624            .connected
625            .get_mut(&peer_id)
626            .expect("Expected some established connection to peer before closing.");
627
628        let connection = connections
629            .iter()
630            .position(|c| c.id == connection_id)
631            .map(|p: usize| connections.remove(p))
632            .expect("Expected connection to be established before closing.");
633
634        debug_assert_eq!(connections.is_empty(), remaining_established == 0);
635        if connections.is_empty() {
636            self.connected.remove(&peer_id);
637        }
638
639        for request_id in connection.pending_outbound_responses {
640            self.pending_events
641                .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
642                    peer: peer_id,
643                    request_id,
644                    error: InboundFailure::ConnectionClosed,
645                }));
646        }
647
648        for request_id in connection.pending_inbound_responses {
649            self.pending_events
650                .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
651                    peer: peer_id,
652                    request_id,
653                    error: OutboundFailure::ConnectionClosed,
654                }));
655        }
656    }
657
658    fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) {
659        if let Some(peer) = peer_id {
660            // If there are pending outgoing requests when a dial failure occurs,
661            // it is implied that we are not connected to the peer, since pending
662            // outgoing requests are drained when a connection is established and
663            // only created when a peer is not connected when a request is made.
664            // Thus these requests must be considered failed, even if there is
665            // another, concurrent dialing attempt ongoing.
666            if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
667                for request in pending {
668                    self.pending_events
669                        .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
670                            peer,
671                            request_id: request.request_id,
672                            error: OutboundFailure::DialFailure,
673                        }));
674                }
675            }
676        }
677    }
678
679    /// Preloads a new [`Handler`] with requests that are waiting to be sent to the newly connected peer.
680    fn preload_new_handler(
681        &mut self,
682        handler: &mut Handler<TCodec>,
683        peer: PeerId,
684        connection_id: ConnectionId,
685        remote_address: Option<Multiaddr>,
686    ) {
687        let mut connection = Connection::new(connection_id, remote_address);
688
689        if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
690            for request in pending_requests {
691                connection
692                    .pending_inbound_responses
693                    .insert(request.request_id);
694                handler.on_behaviour_event(request);
695            }
696        }
697
698        self.connected.entry(peer).or_default().push(connection);
699    }
700}
701
702impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
703where
704    TCodec: Codec + Send + Clone + 'static,
705{
706    type ConnectionHandler = Handler<TCodec>;
707    type ToSwarm = Event<TCodec::Request, TCodec::Response>;
708
709    fn handle_established_inbound_connection(
710        &mut self,
711        connection_id: ConnectionId,
712        peer: PeerId,
713        _: &Multiaddr,
714        _: &Multiaddr,
715    ) -> Result<THandler<Self>, ConnectionDenied> {
716        let mut handler = Handler::new(
717            self.inbound_protocols.clone(),
718            self.codec.clone(),
719            self.config.request_timeout,
720            self.config.connection_keep_alive,
721            self.next_inbound_id.clone(),
722        );
723
724        self.preload_new_handler(&mut handler, peer, connection_id, None);
725
726        Ok(handler)
727    }
728
729    fn handle_pending_outbound_connection(
730        &mut self,
731        _connection_id: ConnectionId,
732        maybe_peer: Option<PeerId>,
733        _addresses: &[Multiaddr],
734        _effective_role: Endpoint,
735    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
736        let peer = match maybe_peer {
737            None => return Ok(vec![]),
738            Some(peer) => peer,
739        };
740
741        let mut addresses = Vec::new();
742        if let Some(connections) = self.connected.get(&peer) {
743            addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
744        }
745        if let Some(more) = self.addresses.get(&peer) {
746            addresses.extend(more.iter().cloned());
747        }
748
749        Ok(addresses)
750    }
751
752    fn handle_established_outbound_connection(
753        &mut self,
754        connection_id: ConnectionId,
755        peer: PeerId,
756        remote_address: &Multiaddr,
757        _: Endpoint,
758    ) -> Result<THandler<Self>, ConnectionDenied> {
759        let mut handler = Handler::new(
760            self.inbound_protocols.clone(),
761            self.codec.clone(),
762            self.config.request_timeout,
763            self.config.connection_keep_alive,
764            self.next_inbound_id.clone(),
765        );
766
767        self.preload_new_handler(
768            &mut handler,
769            peer,
770            connection_id,
771            Some(remote_address.clone()),
772        );
773
774        Ok(handler)
775    }
776
777    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
778        match event {
779            FromSwarm::ConnectionEstablished(_) => {}
780            FromSwarm::ConnectionClosed(connection_closed) => {
781                self.on_connection_closed(connection_closed)
782            }
783            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
784            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
785            FromSwarm::ListenFailure(_) => {}
786            FromSwarm::NewListener(_) => {}
787            FromSwarm::NewListenAddr(_) => {}
788            FromSwarm::ExpiredListenAddr(_) => {}
789            FromSwarm::ListenerError(_) => {}
790            FromSwarm::ListenerClosed(_) => {}
791            FromSwarm::NewExternalAddrCandidate(_) => {}
792            FromSwarm::ExternalAddrExpired(_) => {}
793            FromSwarm::ExternalAddrConfirmed(_) => {}
794        }
795    }
796
797    fn on_connection_handler_event(
798        &mut self,
799        peer: PeerId,
800        connection: ConnectionId,
801        event: THandlerOutEvent<Self>,
802    ) {
803        match event {
804            handler::Event::Response {
805                request_id,
806                response,
807            } => {
808                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
809                debug_assert!(
810                    removed,
811                    "Expect request_id to be pending before receiving response.",
812                );
813
814                let message = Message::Response {
815                    request_id,
816                    response,
817                };
818                self.pending_events
819                    .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));
820            }
821            handler::Event::Request {
822                request_id,
823                request,
824                sender,
825            } => {
826                let channel = ResponseChannel { sender };
827                let message = Message::Request {
828                    request_id,
829                    request,
830                    channel,
831                };
832                self.pending_events
833                    .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));
834
835                match self.get_connection_mut(&peer, connection) {
836                    Some(connection) => {
837                        let inserted = connection.pending_outbound_responses.insert(request_id);
838                        debug_assert!(inserted, "Expect id of new request to be unknown.");
839                    }
840                    // Connection closed after `Event::Request` has been emitted.
841                    None => {
842                        self.pending_events.push_back(ToSwarm::GenerateEvent(
843                            Event::InboundFailure {
844                                peer,
845                                request_id,
846                                error: InboundFailure::ConnectionClosed,
847                            },
848                        ));
849                    }
850                }
851            }
852            handler::Event::ResponseSent(request_id) => {
853                let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
854                debug_assert!(
855                    removed,
856                    "Expect request_id to be pending before response is sent."
857                );
858
859                self.pending_events
860                    .push_back(ToSwarm::GenerateEvent(Event::ResponseSent {
861                        peer,
862                        request_id,
863                    }));
864            }
865            handler::Event::ResponseOmission(request_id) => {
866                let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
867                debug_assert!(
868                    removed,
869                    "Expect request_id to be pending before response is omitted.",
870                );
871
872                self.pending_events
873                    .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
874                        peer,
875                        request_id,
876                        error: InboundFailure::ResponseOmission,
877                    }));
878            }
879            handler::Event::OutboundTimeout(request_id) => {
880                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
881                debug_assert!(
882                    removed,
883                    "Expect request_id to be pending before request times out."
884                );
885
886                self.pending_events
887                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
888                        peer,
889                        request_id,
890                        error: OutboundFailure::Timeout,
891                    }));
892            }
893            handler::Event::OutboundUnsupportedProtocols(request_id) => {
894                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
895                debug_assert!(
896                    removed,
897                    "Expect request_id to be pending before failing to connect.",
898                );
899
900                self.pending_events
901                    .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
902                        peer,
903                        request_id,
904                        error: OutboundFailure::UnsupportedProtocols,
905                    }));
906            }
907        }
908    }
909
910    fn poll(
911        &mut self,
912        _: &mut Context<'_>,
913        _: &mut impl PollParameters,
914    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
915        if let Some(ev) = self.pending_events.pop_front() {
916            return Poll::Ready(ev);
917        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
918            self.pending_events.shrink_to_fit();
919        }
920
921        Poll::Pending
922    }
923}
924
/// Internal capacity threshold above which an empty queue is shrunk.
///
/// `poll` compares the capacity of `pending_events` against this value
/// once the queue has been drained; if the capacity exceeds it, the
/// associated memory is released via `shrink_to_fit`.
const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
930
/// Internal information tracked for an established connection.
struct Connection {
    /// Identifier of the connection this entry describes.
    id: ConnectionId,
    /// Address associated with the remote, if any.
    /// NOTE(review): presumably the remote's address on this connection;
    /// confirm against the code that constructs and updates `Connection`.
    remote_address: Option<Multiaddr>,
    /// Pending outbound responses where corresponding inbound requests have
    /// been received on this connection and emitted via `poll` but have not yet
    /// been answered.
    pending_outbound_responses: HashSet<RequestId>,
    /// Pending inbound responses for previously sent requests on this
    /// connection.
    pending_inbound_responses: HashSet<RequestId>,
}
943
944impl Connection {
945    fn new(id: ConnectionId, remote_address: Option<Multiaddr>) -> Self {
946        Self {
947            id,
948            remote_address,
949            pending_outbound_responses: Default::default(),
950            pending_inbound_responses: Default::default(),
951        }
952    }
953}