litep2p/protocol/request_response/
handle.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    error::{ImmediateDialError, SubstreamError},
23    multistream_select::ProtocolError,
24    types::{protocol::ProtocolName, RequestId},
25    Error, PeerId,
26};
27
28use futures::channel;
29use tokio::sync::{
30    mpsc::{Receiver, Sender},
31    oneshot,
32};
33
34use std::{
35    collections::HashMap,
36    io::ErrorKind,
37    pin::Pin,
38    sync::{
39        atomic::{AtomicUsize, Ordering},
40        Arc,
41    },
42    task::{Context, Poll},
43};
44
/// Target attached to the `tracing` events emitted from this file.
const LOG_TARGET: &str = "litep2p::request-response::handle";
47
48/// Request-response error.
49#[derive(Debug, PartialEq)]
50pub enum RequestResponseError {
51    /// Request was rejected.
52    Rejected(RejectReason),
53
54    /// Request was canceled by the local node.
55    Canceled,
56
57    /// Request timed out.
58    Timeout,
59
60    /// The peer is not connected and the dialing option was [`DialOptions::Reject`].
61    NotConnected,
62
63    /// Too large payload.
64    TooLargePayload,
65
66    /// Protocol not supported.
67    UnsupportedProtocol,
68}
69
70/// The reason why a request was rejected.
71#[derive(Debug, PartialEq)]
72pub enum RejectReason {
73    /// Substream error.
74    SubstreamOpenError(SubstreamError),
75
76    /// The peer disconnected before the request was processed.
77    ConnectionClosed,
78
79    /// The substream was closed before the request was processed.
80    SubstreamClosed,
81
82    /// The dial failed.
83    ///
84    /// If the dial failure is immediate, the error is included.
85    ///
86    /// If the dialing process is happening in parallel on multiple
87    /// addresses (potentially with multiple protocols), the dialing
88    /// process is not considered immediate and the given errors are not
89    /// propagated for simplicity.
90    DialFailed(Option<ImmediateDialError>),
91}
92
93impl From<SubstreamError> for RejectReason {
94    fn from(error: SubstreamError) -> Self {
95        // Convert `ErrorKind::NotConnected` to `RejectReason::ConnectionClosed`.
96        match error {
97            SubstreamError::IoError(error) if error == ErrorKind::NotConnected =>
98                RejectReason::ConnectionClosed,
99            SubstreamError::YamuxError(crate::yamux::ConnectionError::Io(error), _)
100                if error.kind() == ErrorKind::NotConnected =>
101                RejectReason::ConnectionClosed,
102            SubstreamError::NegotiationError(crate::error::NegotiationError::IoError(error))
103                if error == ErrorKind::NotConnected =>
104                RejectReason::ConnectionClosed,
105            SubstreamError::NegotiationError(
106                crate::error::NegotiationError::MultistreamSelectError(
107                    crate::multistream_select::NegotiationError::ProtocolError(
108                        ProtocolError::IoError(error),
109                    ),
110                ),
111            ) if error.kind() == ErrorKind::NotConnected => RejectReason::ConnectionClosed,
112            error => RejectReason::SubstreamOpenError(error),
113        }
114    }
115}
116
117/// Request-response events.
118pub(super) enum InnerRequestResponseEvent {
119    /// Request received from remote
120    RequestReceived {
121        /// Peer Id.
122        peer: PeerId,
123
124        /// Fallback protocol, if the substream was negotiated using a fallback.
125        fallback: Option<ProtocolName>,
126
127        /// Request ID.
128        request_id: RequestId,
129
130        /// Received request.
131        request: Vec<u8>,
132
133        /// `oneshot::Sender` for response.
134        response_tx: oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
135    },
136
137    /// Response received.
138    ResponseReceived {
139        /// Peer Id.
140        peer: PeerId,
141
142        /// Fallback protocol, if the substream was negotiated using a fallback.
143        fallback: Option<ProtocolName>,
144
145        /// Request ID.
146        request_id: RequestId,
147
148        /// Received request.
149        response: Vec<u8>,
150    },
151
152    /// Request failed.
153    RequestFailed {
154        /// Peer Id.
155        peer: PeerId,
156
157        /// Request ID.
158        request_id: RequestId,
159
160        /// Request-response error.
161        error: RequestResponseError,
162    },
163}
164
165impl From<InnerRequestResponseEvent> for RequestResponseEvent {
166    fn from(event: InnerRequestResponseEvent) -> Self {
167        match event {
168            InnerRequestResponseEvent::ResponseReceived {
169                peer,
170                request_id,
171                response,
172                fallback,
173            } => RequestResponseEvent::ResponseReceived {
174                peer,
175                request_id,
176                response,
177                fallback,
178            },
179            InnerRequestResponseEvent::RequestFailed {
180                peer,
181                request_id,
182                error,
183            } => RequestResponseEvent::RequestFailed {
184                peer,
185                request_id,
186                error,
187            },
188            _ => panic!("unhandled event"),
189        }
190    }
191}
192
193/// Request-response events.
194#[derive(Debug, PartialEq)]
195pub enum RequestResponseEvent {
196    /// Request received from remote
197    RequestReceived {
198        /// Peer Id.
199        peer: PeerId,
200
201        /// Fallback protocol, if the substream was negotiated using a fallback.
202        fallback: Option<ProtocolName>,
203
204        /// Request ID.
205        ///
206        /// While `request_id` is guaranteed to be unique for this protocols, the request IDs are
207        /// not unique across different request-response protocols, meaning two different
208        /// request-response protocols can both assign `RequestId(123)` for any given request.
209        request_id: RequestId,
210
211        /// Received request.
212        request: Vec<u8>,
213    },
214
215    /// Response received.
216    ResponseReceived {
217        /// Peer Id.
218        peer: PeerId,
219
220        /// Request ID.
221        request_id: RequestId,
222
223        /// Fallback protocol, if the substream was negotiated using a fallback.
224        fallback: Option<ProtocolName>,
225
226        /// Received request.
227        response: Vec<u8>,
228    },
229
230    /// Request failed.
231    RequestFailed {
232        /// Peer Id.
233        peer: PeerId,
234
235        /// Request ID.
236        request_id: RequestId,
237
238        /// Request-response error.
239        error: RequestResponseError,
240    },
241}
242
/// Dial behavior when sending requests.
///
/// The type is a plain, fieldless selector, so it is `Copy` and comparable: callers can keep
/// the chosen option around (e.g. to reuse it for another request) without cloning
/// boilerplate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DialOptions {
    /// If the peer is not currently connected, attempt to dial them before sending a request.
    ///
    /// If the dial succeeds, the request is sent to the peer once the peer has been registered
    /// to the protocol.
    ///
    /// If the dial fails, [`RequestResponseError::Rejected`] is returned.
    Dial,

    /// If the peer is not connected, immediately reject the request and return
    /// [`RequestResponseError::NotConnected`].
    Reject,
}
258
259/// Request-response commands.
260pub(crate) enum RequestResponseCommand {
261    /// Send request to remote peer.
262    SendRequest {
263        /// Peer ID.
264        peer: PeerId,
265
266        /// Request ID.
267        ///
268        /// When a response is received or the request fails, the event contains this ID that
269        /// the user protocol can associate with the correct request.
270        ///
271        /// If the user protocol only has one active request per peer, this ID can be safely
272        /// discarded.
273        request_id: RequestId,
274
275        /// Request.
276        request: Vec<u8>,
277
278        /// Dial options, see [`DialOptions`] for more details.
279        dial_options: DialOptions,
280    },
281
282    SendRequestWithFallback {
283        /// Peer ID.
284        peer: PeerId,
285
286        /// Request ID.
287        request_id: RequestId,
288
289        /// Request that is sent over the main protocol, if negotiated.
290        request: Vec<u8>,
291
292        /// Request that is sent over the fallback protocol, if negotiated.
293        fallback: (ProtocolName, Vec<u8>),
294
295        /// Dial options, see [`DialOptions`] for more details.
296        dial_options: DialOptions,
297    },
298
299    /// Cancel outbound request.
300    CancelRequest {
301        /// Request ID.
302        request_id: RequestId,
303    },
304}
305
306/// Handle given to the user protocol which allows it to interact with the request-response
307/// protocol.
308pub struct RequestResponseHandle {
309    /// TX channel for sending commands to the request-response protocol.
310    event_rx: Receiver<InnerRequestResponseEvent>,
311
312    /// RX channel for receiving events from the request-response protocol.
313    command_tx: Sender<RequestResponseCommand>,
314
315    /// Pending responses.
316    pending_responses:
317        HashMap<RequestId, oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>>,
318
319    /// Next ephemeral request ID.
320    next_request_id: Arc<AtomicUsize>,
321}
322
323impl RequestResponseHandle {
324    /// Create new [`RequestResponseHandle`].
325    pub(super) fn new(
326        event_rx: Receiver<InnerRequestResponseEvent>,
327        command_tx: Sender<RequestResponseCommand>,
328        next_request_id: Arc<AtomicUsize>,
329    ) -> Self {
330        Self {
331            event_rx,
332            command_tx,
333            next_request_id,
334            pending_responses: HashMap::new(),
335        }
336    }
337
338    /// Reject an inbound request.
339    ///
340    /// Reject request received from a remote peer. The substream is dropped which signals
341    /// to the remote peer that request was rejected.
342    pub fn reject_request(&mut self, request_id: RequestId) {
343        match self.pending_responses.remove(&request_id) {
344            None => {
345                tracing::debug!(target: LOG_TARGET, ?request_id, "rejected request doesn't exist")
346            }
347            Some(sender) => {
348                tracing::debug!(target: LOG_TARGET, ?request_id, "reject request");
349                drop(sender);
350            }
351        }
352    }
353
354    /// Cancel an outbound request.
355    ///
356    /// Allows canceling an in-flight request if the local node is not interested in the answer
357    /// anymore. If the request was canceled, no event is reported to the user as the cancelation
358    /// always succeeds and it's assumed that the user does the necessary state clean up in their
359    /// end after calling [`RequestResponseHandle::cancel_request()`].
360    pub async fn cancel_request(&mut self, request_id: RequestId) {
361        tracing::trace!(target: LOG_TARGET, ?request_id, "cancel request");
362
363        let _ = self.command_tx.send(RequestResponseCommand::CancelRequest { request_id }).await;
364    }
365
366    /// Get next request ID.
367    fn next_request_id(&self) -> RequestId {
368        let request_id = self.next_request_id.fetch_add(1usize, Ordering::Relaxed);
369        RequestId::from(request_id)
370    }
371
372    /// Send request to remote peer.
373    ///
374    /// While the returned `RequestId` is guaranteed to be unique for this request-response
375    /// protocol, it's not unique across all installed request-response protocols. That is,
376    /// multiple request-response protocols can return the same `RequestId` and this must be
377    /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
378    pub async fn send_request(
379        &mut self,
380        peer: PeerId,
381        request: Vec<u8>,
382        dial_options: DialOptions,
383    ) -> crate::Result<RequestId> {
384        tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
385
386        let request_id = self.next_request_id();
387        self.command_tx
388            .send(RequestResponseCommand::SendRequest {
389                peer,
390                request_id,
391                request,
392                dial_options,
393            })
394            .await
395            .map(|_| request_id)
396            .map_err(From::from)
397    }
398
399    /// Attempt to send request to peer and if the channel is clogged, return
400    /// `Error::ChannelClogged`.
401    ///
402    /// While the returned `RequestId` is guaranteed to be unique for this request-response
403    /// protocol, it's not unique across all installed request-response protocols. That is,
404    /// multiple request-response protocols can return the same `RequestId` and this must be
405    /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
406    pub fn try_send_request(
407        &mut self,
408        peer: PeerId,
409        request: Vec<u8>,
410        dial_options: DialOptions,
411    ) -> crate::Result<RequestId> {
412        tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
413
414        let request_id = self.next_request_id();
415        self.command_tx
416            .try_send(RequestResponseCommand::SendRequest {
417                peer,
418                request_id,
419                request,
420                dial_options,
421            })
422            .map(|_| request_id)
423            .map_err(|_| Error::ChannelClogged)
424    }
425
426    /// Send request to remote peer with fallback.
427    pub async fn send_request_with_fallback(
428        &mut self,
429        peer: PeerId,
430        request: Vec<u8>,
431        fallback: (ProtocolName, Vec<u8>),
432        dial_options: DialOptions,
433    ) -> crate::Result<RequestId> {
434        tracing::trace!(
435            target: LOG_TARGET,
436            ?peer,
437            fallback = %fallback.0,
438            ?dial_options,
439            "send request with fallback to peer",
440        );
441
442        let request_id = self.next_request_id();
443        self.command_tx
444            .send(RequestResponseCommand::SendRequestWithFallback {
445                peer,
446                request_id,
447                fallback,
448                request,
449                dial_options,
450            })
451            .await
452            .map(|_| request_id)
453            .map_err(From::from)
454    }
455
456    /// Attempt to send request to peer with fallback and if the channel is clogged,
457    /// return `Error::ChannelClogged`.
458    pub fn try_send_request_with_fallback(
459        &mut self,
460        peer: PeerId,
461        request: Vec<u8>,
462        fallback: (ProtocolName, Vec<u8>),
463        dial_options: DialOptions,
464    ) -> crate::Result<RequestId> {
465        tracing::trace!(
466            target: LOG_TARGET,
467            ?peer,
468            fallback = %fallback.0,
469            ?dial_options,
470            "send request with fallback to peer",
471        );
472
473        let request_id = self.next_request_id();
474        self.command_tx
475            .try_send(RequestResponseCommand::SendRequestWithFallback {
476                peer,
477                request_id,
478                fallback,
479                request,
480                dial_options,
481            })
482            .map(|_| request_id)
483            .map_err(|_| Error::ChannelClogged)
484    }
485
486    /// Send response to remote peer.
487    pub fn send_response(&mut self, request_id: RequestId, response: Vec<u8>) {
488        match self.pending_responses.remove(&request_id) {
489            None => {
490                tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
491            }
492            Some(response_tx) => {
493                tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
494
495                if let Err(_) = response_tx.send((response, None)) {
496                    tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
497                }
498            }
499        }
500    }
501
502    /// Send response to remote peer with feedback.
503    ///
504    /// The feedback system is inherited from Polkadot SDK's `sc-network` and it's used to notify
505    /// the sender of the response whether it was sent successfully or not. Once the response has
506    /// been sent over the substream successfully, `()` will be sent over the feedback channel
507    /// to the sender to notify them about it. If the substream has been closed or the substream
508    /// failed while sending the response, the feedback channel will be dropped, notifying the
509    /// sender that sending the response failed.
510    pub fn send_response_with_feedback(
511        &mut self,
512        request_id: RequestId,
513        response: Vec<u8>,
514        feedback: channel::oneshot::Sender<()>,
515    ) {
516        match self.pending_responses.remove(&request_id) {
517            None => {
518                tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
519            }
520            Some(response_tx) => {
521                tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
522
523                if let Err(_) = response_tx.send((response, Some(feedback))) {
524                    tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
525                }
526            }
527        }
528    }
529}
530
531impl futures::Stream for RequestResponseHandle {
532    type Item = RequestResponseEvent;
533
534    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
535        match futures::ready!(self.event_rx.poll_recv(cx)) {
536            None => Poll::Ready(None),
537            Some(event) => match event {
538                InnerRequestResponseEvent::RequestReceived {
539                    peer,
540                    fallback,
541                    request_id,
542                    request,
543                    response_tx,
544                } => {
545                    self.pending_responses.insert(request_id, response_tx);
546                    Poll::Ready(Some(RequestResponseEvent::RequestReceived {
547                        peer,
548                        fallback,
549                        request_id,
550                        request,
551                    }))
552                }
553                event => Poll::Ready(Some(event.into())),
554            },
555        }
556    }
557}