litep2p/protocol/request_response/handle.rs
1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22 error::{ImmediateDialError, SubstreamError},
23 multistream_select::ProtocolError,
24 types::{protocol::ProtocolName, RequestId},
25 Error, PeerId,
26};
27
28use futures::channel;
29use tokio::sync::{
30 mpsc::{Receiver, Sender},
31 oneshot,
32};
33
34use std::{
35 collections::HashMap,
36 io::ErrorKind,
37 pin::Pin,
38 sync::{
39 atomic::{AtomicUsize, Ordering},
40 Arc,
41 },
42 task::{Context, Poll},
43};
44
/// Target passed to the `tracing` macros used in this file.
const LOG_TARGET: &str = "litep2p::request-response::handle";
47
48/// Request-response error.
49#[derive(Debug, PartialEq)]
50pub enum RequestResponseError {
51 /// Request was rejected.
52 Rejected(RejectReason),
53
54 /// Request was canceled by the local node.
55 Canceled,
56
57 /// Request timed out.
58 Timeout,
59
60 /// The peer is not connected and the dialing option was [`DialOptions::Reject`].
61 NotConnected,
62
63 /// Too large payload.
64 TooLargePayload,
65
66 /// Protocol not supported.
67 UnsupportedProtocol,
68}
69
70/// The reason why a request was rejected.
71#[derive(Debug, PartialEq)]
72pub enum RejectReason {
73 /// Substream error.
74 SubstreamOpenError(SubstreamError),
75
76 /// The peer disconnected before the request was processed.
77 ConnectionClosed,
78
79 /// The substream was closed before the request was processed.
80 SubstreamClosed,
81
82 /// The dial failed.
83 ///
84 /// If the dial failure is immediate, the error is included.
85 ///
86 /// If the dialing process is happening in parallel on multiple
87 /// addresses (potentially with multiple protocols), the dialing
88 /// process is not considered immediate and the given errors are not
89 /// propagated for simplicity.
90 DialFailed(Option<ImmediateDialError>),
91}
92
93impl From<SubstreamError> for RejectReason {
94 fn from(error: SubstreamError) -> Self {
95 // Convert `ErrorKind::NotConnected` to `RejectReason::ConnectionClosed`.
96 match error {
97 SubstreamError::IoError(ErrorKind::NotConnected) => RejectReason::ConnectionClosed,
98 SubstreamError::YamuxError(crate::yamux::ConnectionError::Io(error), _)
99 if error.kind() == ErrorKind::NotConnected =>
100 RejectReason::ConnectionClosed,
101 SubstreamError::NegotiationError(crate::error::NegotiationError::IoError(
102 ErrorKind::NotConnected,
103 )) => RejectReason::ConnectionClosed,
104 SubstreamError::NegotiationError(
105 crate::error::NegotiationError::MultistreamSelectError(
106 crate::multistream_select::NegotiationError::ProtocolError(
107 ProtocolError::IoError(error),
108 ),
109 ),
110 ) if error.kind() == ErrorKind::NotConnected => RejectReason::ConnectionClosed,
111 error => RejectReason::SubstreamOpenError(error),
112 }
113 }
114}
115
116/// Request-response events.
117#[derive(Debug)]
118pub(super) enum InnerRequestResponseEvent {
119 /// Request received from remote
120 RequestReceived {
121 /// Peer Id.
122 peer: PeerId,
123
124 /// Fallback protocol, if the substream was negotiated using a fallback.
125 fallback: Option<ProtocolName>,
126
127 /// Request ID.
128 request_id: RequestId,
129
130 /// Received request.
131 request: Vec<u8>,
132
133 /// `oneshot::Sender` for response.
134 response_tx: oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
135 },
136
137 /// Response received.
138 ResponseReceived {
139 /// Peer Id.
140 peer: PeerId,
141
142 /// Fallback protocol, if the substream was negotiated using a fallback.
143 fallback: Option<ProtocolName>,
144
145 /// Request ID.
146 request_id: RequestId,
147
148 /// Received request.
149 response: Vec<u8>,
150 },
151
152 /// Request failed.
153 RequestFailed {
154 /// Peer Id.
155 peer: PeerId,
156
157 /// Request ID.
158 request_id: RequestId,
159
160 /// Request-response error.
161 error: RequestResponseError,
162 },
163}
164
165impl From<InnerRequestResponseEvent> for RequestResponseEvent {
166 fn from(event: InnerRequestResponseEvent) -> Self {
167 match event {
168 InnerRequestResponseEvent::ResponseReceived {
169 peer,
170 request_id,
171 response,
172 fallback,
173 } => RequestResponseEvent::ResponseReceived {
174 peer,
175 request_id,
176 response,
177 fallback,
178 },
179 InnerRequestResponseEvent::RequestFailed {
180 peer,
181 request_id,
182 error,
183 } => RequestResponseEvent::RequestFailed {
184 peer,
185 request_id,
186 error,
187 },
188 _ => panic!("unhandled event"),
189 }
190 }
191}
192
193/// Request-response events.
194#[derive(Debug, PartialEq)]
195pub enum RequestResponseEvent {
196 /// Request received from remote
197 RequestReceived {
198 /// Peer Id.
199 peer: PeerId,
200
201 /// Fallback protocol, if the substream was negotiated using a fallback.
202 fallback: Option<ProtocolName>,
203
204 /// Request ID.
205 ///
206 /// While `request_id` is guaranteed to be unique for this protocols, the request IDs are
207 /// not unique across different request-response protocols, meaning two different
208 /// request-response protocols can both assign `RequestId(123)` for any given request.
209 request_id: RequestId,
210
211 /// Received request.
212 request: Vec<u8>,
213 },
214
215 /// Response received.
216 ResponseReceived {
217 /// Peer Id.
218 peer: PeerId,
219
220 /// Request ID.
221 request_id: RequestId,
222
223 /// Fallback protocol, if the substream was negotiated using a fallback.
224 fallback: Option<ProtocolName>,
225
226 /// Received request.
227 response: Vec<u8>,
228 },
229
230 /// Request failed.
231 RequestFailed {
232 /// Peer Id.
233 peer: PeerId,
234
235 /// Request ID.
236 request_id: RequestId,
237
238 /// Request-response error.
239 error: RequestResponseError,
240 },
241}
242
/// What to do when a request targets a peer that is not currently connected.
#[derive(Debug)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub enum DialOptions {
    /// Attempt to dial the peer before sending the request.
    ///
    /// On a successful dial, the request is sent once the peer has been registered to
    /// the protocol.
    ///
    /// On a failed dial, [`RequestResponseError::Rejected`] is returned.
    Dial,

    /// Reject the request immediately and return [`RequestResponseError::NotConnected`].
    Reject,
}
259
260/// Request-response commands.
261#[derive(Debug)]
262#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
263pub enum RequestResponseCommand {
264 /// Send request to remote peer.
265 SendRequest {
266 /// Peer ID.
267 peer: PeerId,
268
269 /// Request ID.
270 ///
271 /// When a response is received or the request fails, the event contains this ID that
272 /// the user protocol can associate with the correct request.
273 ///
274 /// If the user protocol only has one active request per peer, this ID can be safely
275 /// discarded.
276 request_id: RequestId,
277
278 /// Request.
279 request: Vec<u8>,
280
281 /// Dial options, see [`DialOptions`] for more details.
282 dial_options: DialOptions,
283 },
284
285 SendRequestWithFallback {
286 /// Peer ID.
287 peer: PeerId,
288
289 /// Request ID.
290 request_id: RequestId,
291
292 /// Request that is sent over the main protocol, if negotiated.
293 request: Vec<u8>,
294
295 /// Request that is sent over the fallback protocol, if negotiated.
296 fallback: (ProtocolName, Vec<u8>),
297
298 /// Dial options, see [`DialOptions`] for more details.
299 dial_options: DialOptions,
300 },
301
302 /// Cancel outbound request.
303 CancelRequest {
304 /// Request ID.
305 request_id: RequestId,
306 },
307}
308
309/// Handle given to the user protocol which allows it to interact with the request-response
310/// protocol.
311pub struct RequestResponseHandle {
312 /// TX channel for sending commands to the request-response protocol.
313 event_rx: Receiver<InnerRequestResponseEvent>,
314
315 /// RX channel for receiving events from the request-response protocol.
316 command_tx: Sender<RequestResponseCommand>,
317
318 /// Pending responses.
319 pending_responses:
320 HashMap<RequestId, oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>>,
321
322 /// Next ephemeral request ID.
323 next_request_id: Arc<AtomicUsize>,
324}
325
326impl RequestResponseHandle {
327 /// Create new [`RequestResponseHandle`].
328 pub(super) fn new(
329 event_rx: Receiver<InnerRequestResponseEvent>,
330 command_tx: Sender<RequestResponseCommand>,
331 next_request_id: Arc<AtomicUsize>,
332 ) -> Self {
333 Self {
334 event_rx,
335 command_tx,
336 next_request_id,
337 pending_responses: HashMap::new(),
338 }
339 }
340
341 #[cfg(feature = "fuzz")]
342 /// Expose functionality for fuzzing
343 pub async fn fuzz_send_message(
344 &mut self,
345 command: RequestResponseCommand,
346 ) -> crate::Result<RequestId> {
347 let request_id = self.next_request_id();
348 self.command_tx.send(command).await.map(|_| request_id).map_err(From::from)
349 }
350
351 /// Reject an inbound request.
352 ///
353 /// Reject request received from a remote peer. The substream is dropped which signals
354 /// to the remote peer that request was rejected.
355 pub fn reject_request(&mut self, request_id: RequestId) {
356 match self.pending_responses.remove(&request_id) {
357 None => {
358 tracing::debug!(target: LOG_TARGET, ?request_id, "rejected request doesn't exist")
359 }
360 Some(sender) => {
361 tracing::debug!(target: LOG_TARGET, ?request_id, "reject request");
362 drop(sender);
363 }
364 }
365 }
366
367 /// Cancel an outbound request.
368 ///
369 /// Allows canceling an in-flight request if the local node is not interested in the answer
370 /// anymore. If the request was canceled, no event is reported to the user as the cancelation
371 /// always succeeds and it's assumed that the user does the necessary state clean up in their
372 /// end after calling [`RequestResponseHandle::cancel_request()`].
373 pub async fn cancel_request(&mut self, request_id: RequestId) {
374 tracing::trace!(target: LOG_TARGET, ?request_id, "cancel request");
375
376 let _ = self.command_tx.send(RequestResponseCommand::CancelRequest { request_id }).await;
377 }
378
379 /// Get next request ID.
380 fn next_request_id(&self) -> RequestId {
381 let request_id = self.next_request_id.fetch_add(1usize, Ordering::Relaxed);
382 RequestId::from(request_id)
383 }
384
385 /// Send request to remote peer.
386 ///
387 /// While the returned `RequestId` is guaranteed to be unique for this request-response
388 /// protocol, it's not unique across all installed request-response protocols. That is,
389 /// multiple request-response protocols can return the same `RequestId` and this must be
390 /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
391 pub async fn send_request(
392 &mut self,
393 peer: PeerId,
394 request: Vec<u8>,
395 dial_options: DialOptions,
396 ) -> crate::Result<RequestId> {
397 tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
398
399 let request_id = self.next_request_id();
400 self.command_tx
401 .send(RequestResponseCommand::SendRequest {
402 peer,
403 request_id,
404 request,
405 dial_options,
406 })
407 .await
408 .map(|_| request_id)
409 .map_err(From::from)
410 }
411
412 /// Attempt to send request to peer and if the channel is clogged, return
413 /// `Error::ChannelClogged`.
414 ///
415 /// While the returned `RequestId` is guaranteed to be unique for this request-response
416 /// protocol, it's not unique across all installed request-response protocols. That is,
417 /// multiple request-response protocols can return the same `RequestId` and this must be
418 /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
419 pub fn try_send_request(
420 &mut self,
421 peer: PeerId,
422 request: Vec<u8>,
423 dial_options: DialOptions,
424 ) -> crate::Result<RequestId> {
425 tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
426
427 let request_id = self.next_request_id();
428 self.command_tx
429 .try_send(RequestResponseCommand::SendRequest {
430 peer,
431 request_id,
432 request,
433 dial_options,
434 })
435 .map(|_| request_id)
436 .map_err(|_| Error::ChannelClogged)
437 }
438
439 /// Send request to remote peer with fallback.
440 pub async fn send_request_with_fallback(
441 &mut self,
442 peer: PeerId,
443 request: Vec<u8>,
444 fallback: (ProtocolName, Vec<u8>),
445 dial_options: DialOptions,
446 ) -> crate::Result<RequestId> {
447 tracing::trace!(
448 target: LOG_TARGET,
449 ?peer,
450 fallback = %fallback.0,
451 ?dial_options,
452 "send request with fallback to peer",
453 );
454
455 let request_id = self.next_request_id();
456 self.command_tx
457 .send(RequestResponseCommand::SendRequestWithFallback {
458 peer,
459 request_id,
460 fallback,
461 request,
462 dial_options,
463 })
464 .await
465 .map(|_| request_id)
466 .map_err(From::from)
467 }
468
469 /// Attempt to send request to peer with fallback and if the channel is clogged,
470 /// return `Error::ChannelClogged`.
471 pub fn try_send_request_with_fallback(
472 &mut self,
473 peer: PeerId,
474 request: Vec<u8>,
475 fallback: (ProtocolName, Vec<u8>),
476 dial_options: DialOptions,
477 ) -> crate::Result<RequestId> {
478 tracing::trace!(
479 target: LOG_TARGET,
480 ?peer,
481 fallback = %fallback.0,
482 ?dial_options,
483 "send request with fallback to peer",
484 );
485
486 let request_id = self.next_request_id();
487 self.command_tx
488 .try_send(RequestResponseCommand::SendRequestWithFallback {
489 peer,
490 request_id,
491 fallback,
492 request,
493 dial_options,
494 })
495 .map(|_| request_id)
496 .map_err(|_| Error::ChannelClogged)
497 }
498
499 /// Send response to remote peer.
500 pub fn send_response(&mut self, request_id: RequestId, response: Vec<u8>) {
501 match self.pending_responses.remove(&request_id) {
502 None => {
503 tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
504 }
505 Some(response_tx) => {
506 tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
507
508 if let Err(_) = response_tx.send((response, None)) {
509 tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
510 }
511 }
512 }
513 }
514
515 /// Send response to remote peer with feedback.
516 ///
517 /// The feedback system is inherited from Polkadot SDK's `sc-network` and it's used to notify
518 /// the sender of the response whether it was sent successfully or not. Once the response has
519 /// been sent over the substream successfully, `()` will be sent over the feedback channel
520 /// to the sender to notify them about it. If the substream has been closed or the substream
521 /// failed while sending the response, the feedback channel will be dropped, notifying the
522 /// sender that sending the response failed.
523 pub fn send_response_with_feedback(
524 &mut self,
525 request_id: RequestId,
526 response: Vec<u8>,
527 feedback: channel::oneshot::Sender<()>,
528 ) {
529 match self.pending_responses.remove(&request_id) {
530 None => {
531 tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
532 }
533 Some(response_tx) => {
534 tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
535
536 if let Err(_) = response_tx.send((response, Some(feedback))) {
537 tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
538 }
539 }
540 }
541 }
542}
543
544impl futures::Stream for RequestResponseHandle {
545 type Item = RequestResponseEvent;
546
547 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
548 match futures::ready!(self.event_rx.poll_recv(cx)) {
549 None => Poll::Ready(None),
550 Some(event) => match event {
551 InnerRequestResponseEvent::RequestReceived {
552 peer,
553 fallback,
554 request_id,
555 request,
556 response_tx,
557 } => {
558 self.pending_responses.insert(request_id, response_tx);
559 Poll::Ready(Some(RequestResponseEvent::RequestReceived {
560 peer,
561 fallback,
562 request_id,
563 request,
564 }))
565 }
566 event => Poll::Ready(Some(event.into())),
567 },
568 }
569 }
570}