1pub(crate) mod protocol;
22
23pub use protocol::ProtocolSupport;
24
25use crate::codec::Codec;
26use crate::handler::protocol::{RequestProtocol, ResponseProtocol};
27use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD};
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered};
30use instant::Instant;
31use libp2p_swarm::handler::{
32 ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
33 ListenUpgradeError,
34};
35use libp2p_swarm::{
36 handler::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError},
37 SubstreamProtocol,
38};
39use smallvec::SmallVec;
40use std::{
41 collections::VecDeque,
42 fmt,
43 sync::{
44 atomic::{AtomicU64, Ordering},
45 Arc,
46 },
47 task::{Context, Poll},
48 time::Duration,
49};
50
51pub struct Handler<TCodec>
53where
54 TCodec: Codec,
55{
56 inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
58 codec: TCodec,
60 keep_alive_timeout: Duration,
63 substream_timeout: Duration,
66 keep_alive: KeepAlive,
68 pending_events: VecDeque<Event<TCodec>>,
70 outbound: VecDeque<RequestProtocol<TCodec>>,
72 inbound: FuturesUnordered<
74 BoxFuture<
75 'static,
76 Result<
77 (
78 (RequestId, TCodec::Request),
79 oneshot::Sender<TCodec::Response>,
80 ),
81 oneshot::Canceled,
82 >,
83 >,
84 >,
85 inbound_request_id: Arc<AtomicU64>,
86}
87
88impl<TCodec> Handler<TCodec>
89where
90 TCodec: Codec + Send + Clone + 'static,
91{
92 pub(super) fn new(
93 inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
94 codec: TCodec,
95 keep_alive_timeout: Duration,
96 substream_timeout: Duration,
97 inbound_request_id: Arc<AtomicU64>,
98 ) -> Self {
99 Self {
100 inbound_protocols,
101 codec,
102 keep_alive: KeepAlive::Yes,
103 keep_alive_timeout,
104 substream_timeout,
105 outbound: VecDeque::new(),
106 inbound: FuturesUnordered::new(),
107 pending_events: VecDeque::new(),
108 inbound_request_id,
109 }
110 }
111
112 fn on_fully_negotiated_inbound(
113 &mut self,
114 FullyNegotiatedInbound {
115 protocol: sent,
116 info: request_id,
117 }: FullyNegotiatedInbound<
118 <Self as ConnectionHandler>::InboundProtocol,
119 <Self as ConnectionHandler>::InboundOpenInfo,
120 >,
121 ) {
122 if sent {
123 self.pending_events
124 .push_back(Event::ResponseSent(request_id))
125 } else {
126 self.pending_events
127 .push_back(Event::ResponseOmission(request_id))
128 }
129 }
130
131 fn on_dial_upgrade_error(
132 &mut self,
133 DialUpgradeError { info, error }: DialUpgradeError<
134 <Self as ConnectionHandler>::OutboundOpenInfo,
135 <Self as ConnectionHandler>::OutboundProtocol,
136 >,
137 ) {
138 match error {
139 StreamUpgradeError::Timeout => {
140 self.pending_events.push_back(Event::OutboundTimeout(info));
141 }
142 StreamUpgradeError::NegotiationFailed => {
143 self.pending_events
149 .push_back(Event::OutboundUnsupportedProtocols(info));
150 }
151 StreamUpgradeError::Apply(e) => {
152 log::debug!("outbound stream {info} failed: {e}");
153 }
154 StreamUpgradeError::Io(e) => {
155 log::debug!("outbound stream {info} failed: {e}");
156 }
157 }
158 }
159 fn on_listen_upgrade_error(
160 &mut self,
161 ListenUpgradeError { error, info }: ListenUpgradeError<
162 <Self as ConnectionHandler>::InboundOpenInfo,
163 <Self as ConnectionHandler>::InboundProtocol,
164 >,
165 ) {
166 log::debug!("inbound stream {info} failed: {error}");
167 }
168}
169
170pub enum Event<TCodec>
172where
173 TCodec: Codec,
174{
175 Request {
177 request_id: RequestId,
178 request: TCodec::Request,
179 sender: oneshot::Sender<TCodec::Response>,
180 },
181 Response {
183 request_id: RequestId,
184 response: TCodec::Response,
185 },
186 ResponseSent(RequestId),
188 ResponseOmission(RequestId),
191 OutboundTimeout(RequestId),
194 OutboundUnsupportedProtocols(RequestId),
196}
197
198impl<TCodec: Codec> fmt::Debug for Event<TCodec> {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 match self {
201 Event::Request {
202 request_id,
203 request: _,
204 sender: _,
205 } => f
206 .debug_struct("Event::Request")
207 .field("request_id", request_id)
208 .finish(),
209 Event::Response {
210 request_id,
211 response: _,
212 } => f
213 .debug_struct("Event::Response")
214 .field("request_id", request_id)
215 .finish(),
216 Event::ResponseSent(request_id) => f
217 .debug_tuple("Event::ResponseSent")
218 .field(request_id)
219 .finish(),
220 Event::ResponseOmission(request_id) => f
221 .debug_tuple("Event::ResponseOmission")
222 .field(request_id)
223 .finish(),
224 Event::OutboundTimeout(request_id) => f
225 .debug_tuple("Event::OutboundTimeout")
226 .field(request_id)
227 .finish(),
228 Event::OutboundUnsupportedProtocols(request_id) => f
229 .debug_tuple("Event::OutboundUnsupportedProtocols")
230 .field(request_id)
231 .finish(),
232 }
233 }
234}
235
236impl<TCodec> ConnectionHandler for Handler<TCodec>
237where
238 TCodec: Codec + Send + Clone + 'static,
239{
240 type FromBehaviour = RequestProtocol<TCodec>;
241 type ToBehaviour = Event<TCodec>;
242 type Error = void::Void;
243 type InboundProtocol = ResponseProtocol<TCodec>;
244 type OutboundProtocol = RequestProtocol<TCodec>;
245 type OutboundOpenInfo = RequestId;
246 type InboundOpenInfo = RequestId;
247
248 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
249 let (rq_send, rq_recv) = oneshot::channel();
252
253 let (rs_send, rs_recv) = oneshot::channel();
256
257 let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed));
258
259 let proto = ResponseProtocol {
266 protocols: self.inbound_protocols.clone(),
267 codec: self.codec.clone(),
268 request_sender: rq_send,
269 response_receiver: rs_recv,
270 request_id,
271 };
272
273 self.inbound
277 .push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed());
278
279 SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout)
280 }
281
282 fn on_behaviour_event(&mut self, request: Self::FromBehaviour) {
283 self.keep_alive = KeepAlive::Yes;
284 self.outbound.push_back(request);
285 }
286
287 fn connection_keep_alive(&self) -> KeepAlive {
288 self.keep_alive
289 }
290
291 #[allow(deprecated)]
292 fn poll(
293 &mut self,
294 cx: &mut Context<'_>,
295 ) -> Poll<
296 ConnectionHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::ToBehaviour, Self::Error>,
297 > {
298 if let Some(event) = self.pending_events.pop_front() {
300 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
301 } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
302 self.pending_events.shrink_to_fit();
303 }
304
305 while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
307 match result {
308 Ok(((id, rq), rs_sender)) => {
309 self.keep_alive = KeepAlive::Yes;
311 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request {
312 request_id: id,
313 request: rq,
314 sender: rs_sender,
315 }));
316 }
317 Err(oneshot::Canceled) => {
318 }
322 }
323 }
324
325 if let Some(request) = self.outbound.pop_front() {
327 let info = request.request_id;
328 return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
329 protocol: SubstreamProtocol::new(request, info)
330 .with_timeout(self.substream_timeout),
331 });
332 }
333
334 debug_assert!(self.outbound.is_empty());
335
336 if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
337 self.outbound.shrink_to_fit();
338 }
339
340 #[allow(deprecated)]
341 if self.inbound.is_empty() && self.keep_alive.is_yes() {
342 let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout;
346 self.keep_alive = KeepAlive::Until(until);
347 }
348
349 Poll::Pending
350 }
351
352 fn on_connection_event(
353 &mut self,
354 event: ConnectionEvent<
355 Self::InboundProtocol,
356 Self::OutboundProtocol,
357 Self::InboundOpenInfo,
358 Self::OutboundOpenInfo,
359 >,
360 ) {
361 match event {
362 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
363 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
364 }
365 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
366 protocol: response,
367 info: request_id,
368 }) => {
369 self.pending_events.push_back(Event::Response {
370 request_id,
371 response,
372 });
373 }
374 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
375 self.on_dial_upgrade_error(dial_upgrade_error)
376 }
377 ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
378 self.on_listen_upgrade_error(listen_upgrade_error)
379 }
380 ConnectionEvent::AddressChange(_)
381 | ConnectionEvent::LocalProtocolsChange(_)
382 | ConnectionEvent::RemoteProtocolsChange(_) => {}
383 }
384 }
385}