libp2p_request_response/handler/
protocol.rs1use crate::codec::Codec;
27use crate::RequestId;
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*};
30use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
31use libp2p_swarm::Stream;
32use smallvec::SmallVec;
33use std::{fmt, io};
34
35#[derive(Debug, Clone)]
37pub enum ProtocolSupport {
38 Inbound,
40 Outbound,
42 Full,
44}
45
46impl ProtocolSupport {
47 pub fn inbound(&self) -> bool {
49 match self {
50 ProtocolSupport::Inbound | ProtocolSupport::Full => true,
51 ProtocolSupport::Outbound => false,
52 }
53 }
54
55 pub fn outbound(&self) -> bool {
57 match self {
58 ProtocolSupport::Outbound | ProtocolSupport::Full => true,
59 ProtocolSupport::Inbound => false,
60 }
61 }
62}
63
64#[derive(Debug)]
68pub struct ResponseProtocol<TCodec>
69where
70 TCodec: Codec,
71{
72 pub(crate) codec: TCodec,
73 pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
74 pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>,
75 pub(crate) response_receiver: oneshot::Receiver<TCodec::Response>,
76 pub(crate) request_id: RequestId,
77}
78
79impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
80where
81 TCodec: Codec,
82{
83 type Info = TCodec::Protocol;
84 type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
85
86 fn protocol_info(&self) -> Self::InfoIter {
87 self.protocols.clone().into_iter()
88 }
89}
90
91impl<TCodec> InboundUpgrade<Stream> for ResponseProtocol<TCodec>
92where
93 TCodec: Codec + Send + 'static,
94{
95 type Output = bool;
96 type Error = io::Error;
97 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
98
99 fn upgrade_inbound(mut self, mut io: Stream, protocol: Self::Info) -> Self::Future {
100 async move {
101 let read = self.codec.read_request(&protocol, &mut io);
102 let request = read.await?;
103 match self.request_sender.send((self.request_id, request)) {
104 Ok(()) => {},
105 Err(_) => panic!(
106 "Expect request receiver to be alive i.e. protocol handler to be alive.",
107 ),
108 }
109
110 if let Ok(response) = self.response_receiver.await {
111 let write = self.codec.write_response(&protocol, &mut io, response);
112 write.await?;
113
114 io.close().await?;
115 Ok(true)
117 } else {
118 io.close().await?;
119 Ok(false)
121 }
122 }.boxed()
123 }
124}
125
126pub struct RequestProtocol<TCodec>
130where
131 TCodec: Codec,
132{
133 pub(crate) codec: TCodec,
134 pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
135 pub(crate) request_id: RequestId,
136 pub(crate) request: TCodec::Request,
137}
138
139impl<TCodec> fmt::Debug for RequestProtocol<TCodec>
140where
141 TCodec: Codec,
142{
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 f.debug_struct("RequestProtocol")
145 .field("request_id", &self.request_id)
146 .finish()
147 }
148}
149
150impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
151where
152 TCodec: Codec,
153{
154 type Info = TCodec::Protocol;
155 type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
156
157 fn protocol_info(&self) -> Self::InfoIter {
158 self.protocols.clone().into_iter()
159 }
160}
161
162impl<TCodec> OutboundUpgrade<Stream> for RequestProtocol<TCodec>
163where
164 TCodec: Codec + Send + 'static,
165{
166 type Output = TCodec::Response;
167 type Error = io::Error;
168 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
169
170 fn upgrade_outbound(mut self, mut io: Stream, protocol: Self::Info) -> Self::Future {
171 async move {
172 let write = self.codec.write_request(&protocol, &mut io, self.request);
173 write.await?;
174 io.close().await?;
175 let read = self.codec.read_response(&protocol, &mut io);
176 let response = read.await?;
177 Ok(response)
178 }
179 .boxed()
180 }
181}