libp2p_request_response/handler/
protocol.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! The definition of a request/response protocol via inbound
//! and outbound substream upgrades. The inbound upgrade
//! receives a request and sends a response, whereas the
//! outbound upgrade sends a request and receives a response.
25
26use crate::codec::Codec;
27use crate::RequestId;
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*};
30use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
31use libp2p_swarm::Stream;
32use smallvec::SmallVec;
33use std::{fmt, io};
34
/// How far a particular protocol is supported: for inbound requests,
/// for outbound requests, or for both.
#[derive(Debug, Clone)]
pub enum ProtocolSupport {
    /// Only inbound requests are supported for the protocol.
    Inbound,
    /// Only outbound requests are supported for the protocol.
    Outbound,
    /// Both inbound and outbound requests are supported for the protocol.
    Full,
}

impl ProtocolSupport {
    /// Returns `true` if inbound requests are supported, i.e. for
    /// [`ProtocolSupport::Inbound`] and [`ProtocolSupport::Full`].
    pub fn inbound(&self) -> bool {
        matches!(self, Self::Inbound | Self::Full)
    }

    /// Returns `true` if outbound requests are supported, i.e. for
    /// [`ProtocolSupport::Outbound`] and [`ProtocolSupport::Full`].
    pub fn outbound(&self) -> bool {
        matches!(self, Self::Outbound | Self::Full)
    }
}
63
64/// Response substream upgrade protocol.
65///
66/// Receives a request and sends a response.
67#[derive(Debug)]
68pub struct ResponseProtocol<TCodec>
69where
70    TCodec: Codec,
71{
72    pub(crate) codec: TCodec,
73    pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
74    pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>,
75    pub(crate) response_receiver: oneshot::Receiver<TCodec::Response>,
76    pub(crate) request_id: RequestId,
77}
78
79impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
80where
81    TCodec: Codec,
82{
83    type Info = TCodec::Protocol;
84    type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
85
86    fn protocol_info(&self) -> Self::InfoIter {
87        self.protocols.clone().into_iter()
88    }
89}
90
91impl<TCodec> InboundUpgrade<Stream> for ResponseProtocol<TCodec>
92where
93    TCodec: Codec + Send + 'static,
94{
95    type Output = bool;
96    type Error = io::Error;
97    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
98
99    fn upgrade_inbound(mut self, mut io: Stream, protocol: Self::Info) -> Self::Future {
100        async move {
101            let read = self.codec.read_request(&protocol, &mut io);
102            let request = read.await?;
103            match self.request_sender.send((self.request_id, request)) {
104                Ok(()) => {},
105                Err(_) => panic!(
106                    "Expect request receiver to be alive i.e. protocol handler to be alive.",
107                ),
108            }
109
110            if let Ok(response) = self.response_receiver.await {
111                let write = self.codec.write_response(&protocol, &mut io, response);
112                write.await?;
113
114                io.close().await?;
115                // Response was sent. Indicate to handler to emit a `ResponseSent` event.
116                Ok(true)
117            } else {
118                io.close().await?;
119                // No response was sent. Indicate to handler to emit a `ResponseOmission` event.
120                Ok(false)
121            }
122        }.boxed()
123    }
124}
125
126/// Request substream upgrade protocol.
127///
128/// Sends a request and receives a response.
129pub struct RequestProtocol<TCodec>
130where
131    TCodec: Codec,
132{
133    pub(crate) codec: TCodec,
134    pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
135    pub(crate) request_id: RequestId,
136    pub(crate) request: TCodec::Request,
137}
138
139impl<TCodec> fmt::Debug for RequestProtocol<TCodec>
140where
141    TCodec: Codec,
142{
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        f.debug_struct("RequestProtocol")
145            .field("request_id", &self.request_id)
146            .finish()
147    }
148}
149
150impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
151where
152    TCodec: Codec,
153{
154    type Info = TCodec::Protocol;
155    type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
156
157    fn protocol_info(&self) -> Self::InfoIter {
158        self.protocols.clone().into_iter()
159    }
160}
161
162impl<TCodec> OutboundUpgrade<Stream> for RequestProtocol<TCodec>
163where
164    TCodec: Codec + Send + 'static,
165{
166    type Output = TCodec::Response;
167    type Error = io::Error;
168    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
169
170    fn upgrade_outbound(mut self, mut io: Stream, protocol: Self::Info) -> Self::Future {
171        async move {
172            let write = self.codec.write_request(&protocol, &mut io, self.request);
173            write.await?;
174            io.close().await?;
175            let read = self.codec.read_response(&protocol, &mut io);
176            let response = read.await?;
177            Ok(response)
178        }
179        .boxed()
180    }
181}