litep2p/protocol/request_response/
handle.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    error::{ImmediateDialError, SubstreamError},
23    multistream_select::ProtocolError,
24    types::{protocol::ProtocolName, RequestId},
25    Error, PeerId,
26};
27
28use futures::channel;
29use tokio::sync::{
30    mpsc::{Receiver, Sender},
31    oneshot,
32};
33
34use std::{
35    collections::HashMap,
36    io::ErrorKind,
37    pin::Pin,
38    sync::{
39        atomic::{AtomicUsize, Ordering},
40        Arc,
41    },
42    task::{Context, Poll},
43};
44
/// Target string attached to every `tracing` event emitted from this file.
const LOG_TARGET: &str = "litep2p::request-response::handle";
47
48/// Request-response error.
49#[derive(Debug, PartialEq)]
50pub enum RequestResponseError {
51    /// Request was rejected.
52    Rejected(RejectReason),
53
54    /// Request was canceled by the local node.
55    Canceled,
56
57    /// Request timed out.
58    Timeout,
59
60    /// The peer is not connected and the dialing option was [`DialOptions::Reject`].
61    NotConnected,
62
63    /// Too large payload.
64    TooLargePayload,
65
66    /// Protocol not supported.
67    UnsupportedProtocol,
68}
69
70/// The reason why a request was rejected.
71#[derive(Debug, PartialEq)]
72pub enum RejectReason {
73    /// Substream error.
74    SubstreamOpenError(SubstreamError),
75
76    /// The peer disconnected before the request was processed.
77    ConnectionClosed,
78
79    /// The substream was closed before the request was processed.
80    SubstreamClosed,
81
82    /// The dial failed.
83    ///
84    /// If the dial failure is immediate, the error is included.
85    ///
86    /// If the dialing process is happening in parallel on multiple
87    /// addresses (potentially with multiple protocols), the dialing
88    /// process is not considered immediate and the given errors are not
89    /// propagated for simplicity.
90    DialFailed(Option<ImmediateDialError>),
91}
92
93impl From<SubstreamError> for RejectReason {
94    fn from(error: SubstreamError) -> Self {
95        // Convert `ErrorKind::NotConnected` to `RejectReason::ConnectionClosed`.
96        match error {
97            SubstreamError::IoError(ErrorKind::NotConnected) => RejectReason::ConnectionClosed,
98            SubstreamError::YamuxError(crate::yamux::ConnectionError::Io(error), _)
99                if error.kind() == ErrorKind::NotConnected =>
100                RejectReason::ConnectionClosed,
101            SubstreamError::NegotiationError(crate::error::NegotiationError::IoError(
102                ErrorKind::NotConnected,
103            )) => RejectReason::ConnectionClosed,
104            SubstreamError::NegotiationError(
105                crate::error::NegotiationError::MultistreamSelectError(
106                    crate::multistream_select::NegotiationError::ProtocolError(
107                        ProtocolError::IoError(error),
108                    ),
109                ),
110            ) if error.kind() == ErrorKind::NotConnected => RejectReason::ConnectionClosed,
111            error => RejectReason::SubstreamOpenError(error),
112        }
113    }
114}
115
116/// Request-response events.
117#[derive(Debug)]
118pub(super) enum InnerRequestResponseEvent {
119    /// Request received from remote
120    RequestReceived {
121        /// Peer Id.
122        peer: PeerId,
123
124        /// Fallback protocol, if the substream was negotiated using a fallback.
125        fallback: Option<ProtocolName>,
126
127        /// Request ID.
128        request_id: RequestId,
129
130        /// Received request.
131        request: Vec<u8>,
132
133        /// `oneshot::Sender` for response.
134        response_tx: oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
135    },
136
137    /// Response received.
138    ResponseReceived {
139        /// Peer Id.
140        peer: PeerId,
141
142        /// Fallback protocol, if the substream was negotiated using a fallback.
143        fallback: Option<ProtocolName>,
144
145        /// Request ID.
146        request_id: RequestId,
147
148        /// Received request.
149        response: Vec<u8>,
150    },
151
152    /// Request failed.
153    RequestFailed {
154        /// Peer Id.
155        peer: PeerId,
156
157        /// Request ID.
158        request_id: RequestId,
159
160        /// Request-response error.
161        error: RequestResponseError,
162    },
163}
164
165impl From<InnerRequestResponseEvent> for RequestResponseEvent {
166    fn from(event: InnerRequestResponseEvent) -> Self {
167        match event {
168            InnerRequestResponseEvent::ResponseReceived {
169                peer,
170                request_id,
171                response,
172                fallback,
173            } => RequestResponseEvent::ResponseReceived {
174                peer,
175                request_id,
176                response,
177                fallback,
178            },
179            InnerRequestResponseEvent::RequestFailed {
180                peer,
181                request_id,
182                error,
183            } => RequestResponseEvent::RequestFailed {
184                peer,
185                request_id,
186                error,
187            },
188            _ => panic!("unhandled event"),
189        }
190    }
191}
192
193/// Request-response events.
194#[derive(Debug, PartialEq)]
195pub enum RequestResponseEvent {
196    /// Request received from remote
197    RequestReceived {
198        /// Peer Id.
199        peer: PeerId,
200
201        /// Fallback protocol, if the substream was negotiated using a fallback.
202        fallback: Option<ProtocolName>,
203
204        /// Request ID.
205        ///
206        /// While `request_id` is guaranteed to be unique for this protocols, the request IDs are
207        /// not unique across different request-response protocols, meaning two different
208        /// request-response protocols can both assign `RequestId(123)` for any given request.
209        request_id: RequestId,
210
211        /// Received request.
212        request: Vec<u8>,
213    },
214
215    /// Response received.
216    ResponseReceived {
217        /// Peer Id.
218        peer: PeerId,
219
220        /// Request ID.
221        request_id: RequestId,
222
223        /// Fallback protocol, if the substream was negotiated using a fallback.
224        fallback: Option<ProtocolName>,
225
226        /// Received request.
227        response: Vec<u8>,
228    },
229
230    /// Request failed.
231    RequestFailed {
232        /// Peer Id.
233        peer: PeerId,
234
235        /// Request ID.
236        request_id: RequestId,
237
238        /// Request-response error.
239        error: RequestResponseError,
240    },
241}
242
/// What to do when a request is sent to a peer that is not currently connected.
#[derive(Debug)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub enum DialOptions {
    /// Attempt to dial the peer before sending the request.
    ///
    /// If the dial succeeds, the request is sent once the peer has been registered to the
    /// protocol.
    ///
    /// If the dial fails, [`RequestResponseError::Rejected`] is returned.
    Dial,

    /// Reject the request immediately and return [`RequestResponseError::NotConnected`].
    Reject,
}
259
260/// Request-response commands.
261#[derive(Debug)]
262#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
263pub enum RequestResponseCommand {
264    /// Send request to remote peer.
265    SendRequest {
266        /// Peer ID.
267        peer: PeerId,
268
269        /// Request ID.
270        ///
271        /// When a response is received or the request fails, the event contains this ID that
272        /// the user protocol can associate with the correct request.
273        ///
274        /// If the user protocol only has one active request per peer, this ID can be safely
275        /// discarded.
276        request_id: RequestId,
277
278        /// Request.
279        request: Vec<u8>,
280
281        /// Dial options, see [`DialOptions`] for more details.
282        dial_options: DialOptions,
283    },
284
285    SendRequestWithFallback {
286        /// Peer ID.
287        peer: PeerId,
288
289        /// Request ID.
290        request_id: RequestId,
291
292        /// Request that is sent over the main protocol, if negotiated.
293        request: Vec<u8>,
294
295        /// Request that is sent over the fallback protocol, if negotiated.
296        fallback: (ProtocolName, Vec<u8>),
297
298        /// Dial options, see [`DialOptions`] for more details.
299        dial_options: DialOptions,
300    },
301
302    /// Cancel outbound request.
303    CancelRequest {
304        /// Request ID.
305        request_id: RequestId,
306    },
307}
308
309/// Handle given to the user protocol which allows it to interact with the request-response
310/// protocol.
311pub struct RequestResponseHandle {
312    /// TX channel for sending commands to the request-response protocol.
313    event_rx: Receiver<InnerRequestResponseEvent>,
314
315    /// RX channel for receiving events from the request-response protocol.
316    command_tx: Sender<RequestResponseCommand>,
317
318    /// Pending responses.
319    pending_responses:
320        HashMap<RequestId, oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>>,
321
322    /// Next ephemeral request ID.
323    next_request_id: Arc<AtomicUsize>,
324}
325
326impl RequestResponseHandle {
327    /// Create new [`RequestResponseHandle`].
328    pub(super) fn new(
329        event_rx: Receiver<InnerRequestResponseEvent>,
330        command_tx: Sender<RequestResponseCommand>,
331        next_request_id: Arc<AtomicUsize>,
332    ) -> Self {
333        Self {
334            event_rx,
335            command_tx,
336            next_request_id,
337            pending_responses: HashMap::new(),
338        }
339    }
340
341    #[cfg(feature = "fuzz")]
342    /// Expose functionality for fuzzing
343    pub async fn fuzz_send_message(
344        &mut self,
345        command: RequestResponseCommand,
346    ) -> crate::Result<RequestId> {
347        let request_id = self.next_request_id();
348        self.command_tx.send(command).await.map(|_| request_id).map_err(From::from)
349    }
350
351    /// Reject an inbound request.
352    ///
353    /// Reject request received from a remote peer. The substream is dropped which signals
354    /// to the remote peer that request was rejected.
355    pub fn reject_request(&mut self, request_id: RequestId) {
356        match self.pending_responses.remove(&request_id) {
357            None => {
358                tracing::debug!(target: LOG_TARGET, ?request_id, "rejected request doesn't exist")
359            }
360            Some(sender) => {
361                tracing::debug!(target: LOG_TARGET, ?request_id, "reject request");
362                drop(sender);
363            }
364        }
365    }
366
367    /// Cancel an outbound request.
368    ///
369    /// Allows canceling an in-flight request if the local node is not interested in the answer
370    /// anymore. If the request was canceled, no event is reported to the user as the cancelation
371    /// always succeeds and it's assumed that the user does the necessary state clean up in their
372    /// end after calling [`RequestResponseHandle::cancel_request()`].
373    pub async fn cancel_request(&mut self, request_id: RequestId) {
374        tracing::trace!(target: LOG_TARGET, ?request_id, "cancel request");
375
376        let _ = self.command_tx.send(RequestResponseCommand::CancelRequest { request_id }).await;
377    }
378
379    /// Get next request ID.
380    fn next_request_id(&self) -> RequestId {
381        let request_id = self.next_request_id.fetch_add(1usize, Ordering::Relaxed);
382        RequestId::from(request_id)
383    }
384
385    /// Send request to remote peer.
386    ///
387    /// While the returned `RequestId` is guaranteed to be unique for this request-response
388    /// protocol, it's not unique across all installed request-response protocols. That is,
389    /// multiple request-response protocols can return the same `RequestId` and this must be
390    /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
391    pub async fn send_request(
392        &mut self,
393        peer: PeerId,
394        request: Vec<u8>,
395        dial_options: DialOptions,
396    ) -> crate::Result<RequestId> {
397        tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
398
399        let request_id = self.next_request_id();
400        self.command_tx
401            .send(RequestResponseCommand::SendRequest {
402                peer,
403                request_id,
404                request,
405                dial_options,
406            })
407            .await
408            .map(|_| request_id)
409            .map_err(From::from)
410    }
411
412    /// Attempt to send request to peer and if the channel is clogged, return
413    /// `Error::ChannelClogged`.
414    ///
415    /// While the returned `RequestId` is guaranteed to be unique for this request-response
416    /// protocol, it's not unique across all installed request-response protocols. That is,
417    /// multiple request-response protocols can return the same `RequestId` and this must be
418    /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
419    pub fn try_send_request(
420        &mut self,
421        peer: PeerId,
422        request: Vec<u8>,
423        dial_options: DialOptions,
424    ) -> crate::Result<RequestId> {
425        tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
426
427        let request_id = self.next_request_id();
428        self.command_tx
429            .try_send(RequestResponseCommand::SendRequest {
430                peer,
431                request_id,
432                request,
433                dial_options,
434            })
435            .map(|_| request_id)
436            .map_err(|_| Error::ChannelClogged)
437    }
438
439    /// Send request to remote peer with fallback.
440    pub async fn send_request_with_fallback(
441        &mut self,
442        peer: PeerId,
443        request: Vec<u8>,
444        fallback: (ProtocolName, Vec<u8>),
445        dial_options: DialOptions,
446    ) -> crate::Result<RequestId> {
447        tracing::trace!(
448            target: LOG_TARGET,
449            ?peer,
450            fallback = %fallback.0,
451            ?dial_options,
452            "send request with fallback to peer",
453        );
454
455        let request_id = self.next_request_id();
456        self.command_tx
457            .send(RequestResponseCommand::SendRequestWithFallback {
458                peer,
459                request_id,
460                fallback,
461                request,
462                dial_options,
463            })
464            .await
465            .map(|_| request_id)
466            .map_err(From::from)
467    }
468
469    /// Attempt to send request to peer with fallback and if the channel is clogged,
470    /// return `Error::ChannelClogged`.
471    pub fn try_send_request_with_fallback(
472        &mut self,
473        peer: PeerId,
474        request: Vec<u8>,
475        fallback: (ProtocolName, Vec<u8>),
476        dial_options: DialOptions,
477    ) -> crate::Result<RequestId> {
478        tracing::trace!(
479            target: LOG_TARGET,
480            ?peer,
481            fallback = %fallback.0,
482            ?dial_options,
483            "send request with fallback to peer",
484        );
485
486        let request_id = self.next_request_id();
487        self.command_tx
488            .try_send(RequestResponseCommand::SendRequestWithFallback {
489                peer,
490                request_id,
491                fallback,
492                request,
493                dial_options,
494            })
495            .map(|_| request_id)
496            .map_err(|_| Error::ChannelClogged)
497    }
498
499    /// Send response to remote peer.
500    pub fn send_response(&mut self, request_id: RequestId, response: Vec<u8>) {
501        match self.pending_responses.remove(&request_id) {
502            None => {
503                tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
504            }
505            Some(response_tx) => {
506                tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
507
508                if let Err(_) = response_tx.send((response, None)) {
509                    tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
510                }
511            }
512        }
513    }
514
515    /// Send response to remote peer with feedback.
516    ///
517    /// The feedback system is inherited from Polkadot SDK's `sc-network` and it's used to notify
518    /// the sender of the response whether it was sent successfully or not. Once the response has
519    /// been sent over the substream successfully, `()` will be sent over the feedback channel
520    /// to the sender to notify them about it. If the substream has been closed or the substream
521    /// failed while sending the response, the feedback channel will be dropped, notifying the
522    /// sender that sending the response failed.
523    pub fn send_response_with_feedback(
524        &mut self,
525        request_id: RequestId,
526        response: Vec<u8>,
527        feedback: channel::oneshot::Sender<()>,
528    ) {
529        match self.pending_responses.remove(&request_id) {
530            None => {
531                tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
532            }
533            Some(response_tx) => {
534                tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
535
536                if let Err(_) = response_tx.send((response, Some(feedback))) {
537                    tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
538                }
539            }
540        }
541    }
542}
543
544impl futures::Stream for RequestResponseHandle {
545    type Item = RequestResponseEvent;
546
547    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
548        match futures::ready!(self.event_rx.poll_recv(cx)) {
549            None => Poll::Ready(None),
550            Some(event) => match event {
551                InnerRequestResponseEvent::RequestReceived {
552                    peer,
553                    fallback,
554                    request_id,
555                    request,
556                    response_tx,
557                } => {
558                    self.pending_responses.insert(request_id, response_tx);
559                    Poll::Ready(Some(RequestResponseEvent::RequestReceived {
560                        peer,
561                        fallback,
562                        request_id,
563                        request,
564                    }))
565                }
566                event => Poll::Ready(Some(event.into())),
567            },
568        }
569    }
570}