libp2p_request_response/
handler.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21pub(crate) mod protocol;
22
23pub use protocol::ProtocolSupport;
24
25use crate::codec::Codec;
26use crate::handler::protocol::{RequestProtocol, ResponseProtocol};
27use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD};
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered};
30use instant::Instant;
31use libp2p_swarm::handler::{
32    ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
33    ListenUpgradeError,
34};
35use libp2p_swarm::{
36    handler::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError},
37    SubstreamProtocol,
38};
39use smallvec::SmallVec;
40use std::{
41    collections::VecDeque,
42    fmt,
43    sync::{
44        atomic::{AtomicU64, Ordering},
45        Arc,
46    },
47    task::{Context, Poll},
48    time::Duration,
49};
50
51/// A connection handler for a request response [`Behaviour`](super::Behaviour) protocol.
52pub struct Handler<TCodec>
53where
54    TCodec: Codec,
55{
56    /// The supported inbound protocols.
57    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
58    /// The request/response message codec.
59    codec: TCodec,
60    /// The keep-alive timeout of idle connections. A connection is considered
61    /// idle if there are no outbound substreams.
62    keep_alive_timeout: Duration,
63    /// The timeout for inbound and outbound substreams (i.e. request
64    /// and response processing).
65    substream_timeout: Duration,
66    /// The current connection keep-alive.
67    keep_alive: KeepAlive,
68    /// Queue of events to emit in `poll()`.
69    pending_events: VecDeque<Event<TCodec>>,
70    /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
71    outbound: VecDeque<RequestProtocol<TCodec>>,
72    /// Inbound upgrades waiting for the incoming request.
73    inbound: FuturesUnordered<
74        BoxFuture<
75            'static,
76            Result<
77                (
78                    (RequestId, TCodec::Request),
79                    oneshot::Sender<TCodec::Response>,
80                ),
81                oneshot::Canceled,
82            >,
83        >,
84    >,
85    inbound_request_id: Arc<AtomicU64>,
86}
87
88impl<TCodec> Handler<TCodec>
89where
90    TCodec: Codec + Send + Clone + 'static,
91{
92    pub(super) fn new(
93        inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
94        codec: TCodec,
95        keep_alive_timeout: Duration,
96        substream_timeout: Duration,
97        inbound_request_id: Arc<AtomicU64>,
98    ) -> Self {
99        Self {
100            inbound_protocols,
101            codec,
102            keep_alive: KeepAlive::Yes,
103            keep_alive_timeout,
104            substream_timeout,
105            outbound: VecDeque::new(),
106            inbound: FuturesUnordered::new(),
107            pending_events: VecDeque::new(),
108            inbound_request_id,
109        }
110    }
111
112    fn on_fully_negotiated_inbound(
113        &mut self,
114        FullyNegotiatedInbound {
115            protocol: sent,
116            info: request_id,
117        }: FullyNegotiatedInbound<
118            <Self as ConnectionHandler>::InboundProtocol,
119            <Self as ConnectionHandler>::InboundOpenInfo,
120        >,
121    ) {
122        if sent {
123            self.pending_events
124                .push_back(Event::ResponseSent(request_id))
125        } else {
126            self.pending_events
127                .push_back(Event::ResponseOmission(request_id))
128        }
129    }
130
131    fn on_dial_upgrade_error(
132        &mut self,
133        DialUpgradeError { info, error }: DialUpgradeError<
134            <Self as ConnectionHandler>::OutboundOpenInfo,
135            <Self as ConnectionHandler>::OutboundProtocol,
136        >,
137    ) {
138        match error {
139            StreamUpgradeError::Timeout => {
140                self.pending_events.push_back(Event::OutboundTimeout(info));
141            }
142            StreamUpgradeError::NegotiationFailed => {
143                // The remote merely doesn't support the protocol(s) we requested.
144                // This is no reason to close the connection, which may
145                // successfully communicate with other protocols already.
146                // An event is reported to permit user code to react to the fact that
147                // the remote peer does not support the requested protocol(s).
148                self.pending_events
149                    .push_back(Event::OutboundUnsupportedProtocols(info));
150            }
151            StreamUpgradeError::Apply(e) => {
152                log::debug!("outbound stream {info} failed: {e}");
153            }
154            StreamUpgradeError::Io(e) => {
155                log::debug!("outbound stream {info} failed: {e}");
156            }
157        }
158    }
159    fn on_listen_upgrade_error(
160        &mut self,
161        ListenUpgradeError { error, info }: ListenUpgradeError<
162            <Self as ConnectionHandler>::InboundOpenInfo,
163            <Self as ConnectionHandler>::InboundProtocol,
164        >,
165    ) {
166        log::debug!("inbound stream {info} failed: {error}");
167    }
168}
169
170/// The events emitted by the [`Handler`].
171pub enum Event<TCodec>
172where
173    TCodec: Codec,
174{
175    /// A request has been received.
176    Request {
177        request_id: RequestId,
178        request: TCodec::Request,
179        sender: oneshot::Sender<TCodec::Response>,
180    },
181    /// A response has been received.
182    Response {
183        request_id: RequestId,
184        response: TCodec::Response,
185    },
186    /// A response to an inbound request has been sent.
187    ResponseSent(RequestId),
188    /// A response to an inbound request was omitted as a result
189    /// of dropping the response `sender` of an inbound `Request`.
190    ResponseOmission(RequestId),
191    /// An outbound request timed out while sending the request
192    /// or waiting for the response.
193    OutboundTimeout(RequestId),
194    /// An outbound request failed to negotiate a mutually supported protocol.
195    OutboundUnsupportedProtocols(RequestId),
196}
197
198impl<TCodec: Codec> fmt::Debug for Event<TCodec> {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        match self {
201            Event::Request {
202                request_id,
203                request: _,
204                sender: _,
205            } => f
206                .debug_struct("Event::Request")
207                .field("request_id", request_id)
208                .finish(),
209            Event::Response {
210                request_id,
211                response: _,
212            } => f
213                .debug_struct("Event::Response")
214                .field("request_id", request_id)
215                .finish(),
216            Event::ResponseSent(request_id) => f
217                .debug_tuple("Event::ResponseSent")
218                .field(request_id)
219                .finish(),
220            Event::ResponseOmission(request_id) => f
221                .debug_tuple("Event::ResponseOmission")
222                .field(request_id)
223                .finish(),
224            Event::OutboundTimeout(request_id) => f
225                .debug_tuple("Event::OutboundTimeout")
226                .field(request_id)
227                .finish(),
228            Event::OutboundUnsupportedProtocols(request_id) => f
229                .debug_tuple("Event::OutboundUnsupportedProtocols")
230                .field(request_id)
231                .finish(),
232        }
233    }
234}
235
236impl<TCodec> ConnectionHandler for Handler<TCodec>
237where
238    TCodec: Codec + Send + Clone + 'static,
239{
240    type FromBehaviour = RequestProtocol<TCodec>;
241    type ToBehaviour = Event<TCodec>;
242    type Error = void::Void;
243    type InboundProtocol = ResponseProtocol<TCodec>;
244    type OutboundProtocol = RequestProtocol<TCodec>;
245    type OutboundOpenInfo = RequestId;
246    type InboundOpenInfo = RequestId;
247
248    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
249        // A channel for notifying the handler when the inbound
250        // upgrade received the request.
251        let (rq_send, rq_recv) = oneshot::channel();
252
253        // A channel for notifying the inbound upgrade when the
254        // response is sent.
255        let (rs_send, rs_recv) = oneshot::channel();
256
257        let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed));
258
259        // By keeping all I/O inside the `ResponseProtocol` and thus the
260        // inbound substream upgrade via above channels, we ensure that it
261        // is all subject to the configured timeout without extra bookkeeping
262        // for inbound substreams as well as their timeouts and also make the
263        // implementation of inbound and outbound upgrades symmetric in
264        // this sense.
265        let proto = ResponseProtocol {
266            protocols: self.inbound_protocols.clone(),
267            codec: self.codec.clone(),
268            request_sender: rq_send,
269            response_receiver: rs_recv,
270            request_id,
271        };
272
273        // The handler waits for the request to come in. It then emits
274        // `Event::Request` together with a
275        // `ResponseChannel`.
276        self.inbound
277            .push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed());
278
279        SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout)
280    }
281
282    fn on_behaviour_event(&mut self, request: Self::FromBehaviour) {
283        self.keep_alive = KeepAlive::Yes;
284        self.outbound.push_back(request);
285    }
286
287    fn connection_keep_alive(&self) -> KeepAlive {
288        self.keep_alive
289    }
290
291    #[allow(deprecated)]
292    fn poll(
293        &mut self,
294        cx: &mut Context<'_>,
295    ) -> Poll<
296        ConnectionHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::ToBehaviour, Self::Error>,
297    > {
298        // Drain pending events.
299        if let Some(event) = self.pending_events.pop_front() {
300            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
301        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
302            self.pending_events.shrink_to_fit();
303        }
304
305        // Check for inbound requests.
306        while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
307            match result {
308                Ok(((id, rq), rs_sender)) => {
309                    // We received an inbound request.
310                    self.keep_alive = KeepAlive::Yes;
311                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request {
312                        request_id: id,
313                        request: rq,
314                        sender: rs_sender,
315                    }));
316                }
317                Err(oneshot::Canceled) => {
318                    // The inbound upgrade has errored or timed out reading
319                    // or waiting for the request. The handler is informed
320                    // via `on_connection_event` call with `ConnectionEvent::ListenUpgradeError`.
321                }
322            }
323        }
324
325        // Emit outbound requests.
326        if let Some(request) = self.outbound.pop_front() {
327            let info = request.request_id;
328            return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
329                protocol: SubstreamProtocol::new(request, info)
330                    .with_timeout(self.substream_timeout),
331            });
332        }
333
334        debug_assert!(self.outbound.is_empty());
335
336        if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
337            self.outbound.shrink_to_fit();
338        }
339
340        #[allow(deprecated)]
341        if self.inbound.is_empty() && self.keep_alive.is_yes() {
342            // No new inbound or outbound requests. However, we may just have
343            // started the latest inbound or outbound upgrade(s), so make sure
344            // the keep-alive timeout is preceded by the substream timeout.
345            let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout;
346            self.keep_alive = KeepAlive::Until(until);
347        }
348
349        Poll::Pending
350    }
351
352    fn on_connection_event(
353        &mut self,
354        event: ConnectionEvent<
355            Self::InboundProtocol,
356            Self::OutboundProtocol,
357            Self::InboundOpenInfo,
358            Self::OutboundOpenInfo,
359        >,
360    ) {
361        match event {
362            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
363                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
364            }
365            ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
366                protocol: response,
367                info: request_id,
368            }) => {
369                self.pending_events.push_back(Event::Response {
370                    request_id,
371                    response,
372                });
373            }
374            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
375                self.on_dial_upgrade_error(dial_upgrade_error)
376            }
377            ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
378                self.on_listen_upgrade_error(listen_upgrade_error)
379            }
380            ConnectionEvent::AddressChange(_)
381            | ConnectionEvent::LocalProtocolsChange(_)
382            | ConnectionEvent::RemoteProtocolsChange(_) => {}
383        }
384    }
385}