litep2p/protocol/request_response/handle.rs
1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22 error::{ImmediateDialError, SubstreamError},
23 multistream_select::ProtocolError,
24 types::{protocol::ProtocolName, RequestId},
25 Error, PeerId,
26};
27
28use futures::channel;
29use tokio::sync::{
30 mpsc::{Receiver, Sender},
31 oneshot,
32};
33
34use std::{
35 collections::HashMap,
36 io::ErrorKind,
37 pin::Pin,
38 sync::{
39 atomic::{AtomicUsize, Ordering},
40 Arc,
41 },
42 task::{Context, Poll},
43};
44
/// Target under which the `tracing` events emitted from this file are logged.
const LOG_TARGET: &str = "litep2p::request-response::handle";
47
48/// Request-response error.
49#[derive(Debug, PartialEq)]
50pub enum RequestResponseError {
51 /// Request was rejected.
52 Rejected(RejectReason),
53
54 /// Request was canceled by the local node.
55 Canceled,
56
57 /// Request timed out.
58 Timeout,
59
60 /// The peer is not connected and the dialing option was [`DialOptions::Reject`].
61 NotConnected,
62
63 /// Too large payload.
64 TooLargePayload,
65
66 /// Protocol not supported.
67 UnsupportedProtocol,
68}
69
70/// The reason why a request was rejected.
71#[derive(Debug, PartialEq)]
72pub enum RejectReason {
73 /// Substream error.
74 SubstreamOpenError(SubstreamError),
75
76 /// The peer disconnected before the request was processed.
77 ConnectionClosed,
78
79 /// The substream was closed before the request was processed.
80 SubstreamClosed,
81
82 /// The dial failed.
83 ///
84 /// If the dial failure is immediate, the error is included.
85 ///
86 /// If the dialing process is happening in parallel on multiple
87 /// addresses (potentially with multiple protocols), the dialing
88 /// process is not considered immediate and the given errors are not
89 /// propagated for simplicity.
90 DialFailed(Option<ImmediateDialError>),
91}
92
93impl From<SubstreamError> for RejectReason {
94 fn from(error: SubstreamError) -> Self {
95 // Convert `ErrorKind::NotConnected` to `RejectReason::ConnectionClosed`.
96 match error {
97 SubstreamError::IoError(error) if error == ErrorKind::NotConnected =>
98 RejectReason::ConnectionClosed,
99 SubstreamError::YamuxError(crate::yamux::ConnectionError::Io(error), _)
100 if error.kind() == ErrorKind::NotConnected =>
101 RejectReason::ConnectionClosed,
102 SubstreamError::NegotiationError(crate::error::NegotiationError::IoError(error))
103 if error == ErrorKind::NotConnected =>
104 RejectReason::ConnectionClosed,
105 SubstreamError::NegotiationError(
106 crate::error::NegotiationError::MultistreamSelectError(
107 crate::multistream_select::NegotiationError::ProtocolError(
108 ProtocolError::IoError(error),
109 ),
110 ),
111 ) if error.kind() == ErrorKind::NotConnected => RejectReason::ConnectionClosed,
112 error => RejectReason::SubstreamOpenError(error),
113 }
114 }
115}
116
117/// Request-response events.
118pub(super) enum InnerRequestResponseEvent {
119 /// Request received from remote
120 RequestReceived {
121 /// Peer Id.
122 peer: PeerId,
123
124 /// Fallback protocol, if the substream was negotiated using a fallback.
125 fallback: Option<ProtocolName>,
126
127 /// Request ID.
128 request_id: RequestId,
129
130 /// Received request.
131 request: Vec<u8>,
132
133 /// `oneshot::Sender` for response.
134 response_tx: oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>,
135 },
136
137 /// Response received.
138 ResponseReceived {
139 /// Peer Id.
140 peer: PeerId,
141
142 /// Fallback protocol, if the substream was negotiated using a fallback.
143 fallback: Option<ProtocolName>,
144
145 /// Request ID.
146 request_id: RequestId,
147
148 /// Received request.
149 response: Vec<u8>,
150 },
151
152 /// Request failed.
153 RequestFailed {
154 /// Peer Id.
155 peer: PeerId,
156
157 /// Request ID.
158 request_id: RequestId,
159
160 /// Request-response error.
161 error: RequestResponseError,
162 },
163}
164
165impl From<InnerRequestResponseEvent> for RequestResponseEvent {
166 fn from(event: InnerRequestResponseEvent) -> Self {
167 match event {
168 InnerRequestResponseEvent::ResponseReceived {
169 peer,
170 request_id,
171 response,
172 fallback,
173 } => RequestResponseEvent::ResponseReceived {
174 peer,
175 request_id,
176 response,
177 fallback,
178 },
179 InnerRequestResponseEvent::RequestFailed {
180 peer,
181 request_id,
182 error,
183 } => RequestResponseEvent::RequestFailed {
184 peer,
185 request_id,
186 error,
187 },
188 _ => panic!("unhandled event"),
189 }
190 }
191}
192
193/// Request-response events.
194#[derive(Debug, PartialEq)]
195pub enum RequestResponseEvent {
196 /// Request received from remote
197 RequestReceived {
198 /// Peer Id.
199 peer: PeerId,
200
201 /// Fallback protocol, if the substream was negotiated using a fallback.
202 fallback: Option<ProtocolName>,
203
204 /// Request ID.
205 ///
206 /// While `request_id` is guaranteed to be unique for this protocols, the request IDs are
207 /// not unique across different request-response protocols, meaning two different
208 /// request-response protocols can both assign `RequestId(123)` for any given request.
209 request_id: RequestId,
210
211 /// Received request.
212 request: Vec<u8>,
213 },
214
215 /// Response received.
216 ResponseReceived {
217 /// Peer Id.
218 peer: PeerId,
219
220 /// Request ID.
221 request_id: RequestId,
222
223 /// Fallback protocol, if the substream was negotiated using a fallback.
224 fallback: Option<ProtocolName>,
225
226 /// Received request.
227 response: Vec<u8>,
228 },
229
230 /// Request failed.
231 RequestFailed {
232 /// Peer Id.
233 peer: PeerId,
234
235 /// Request ID.
236 request_id: RequestId,
237
238 /// Request-response error.
239 error: RequestResponseError,
240 },
241}
242
/// What to do when a request is sent to a peer that is not currently connected.
#[derive(Debug)]
pub enum DialOptions {
    /// Attempt to dial the peer before sending the request.
    ///
    /// On a successful dial the request is sent once the peer has been registered to the
    /// protocol.
    ///
    /// On a failed dial [`RequestResponseError::Rejected`] is returned.
    Dial,

    /// Do not dial: reject the request immediately and return
    /// [`RequestResponseError::NotConnected`].
    Reject,
}
258
259/// Request-response commands.
260pub(crate) enum RequestResponseCommand {
261 /// Send request to remote peer.
262 SendRequest {
263 /// Peer ID.
264 peer: PeerId,
265
266 /// Request ID.
267 ///
268 /// When a response is received or the request fails, the event contains this ID that
269 /// the user protocol can associate with the correct request.
270 ///
271 /// If the user protocol only has one active request per peer, this ID can be safely
272 /// discarded.
273 request_id: RequestId,
274
275 /// Request.
276 request: Vec<u8>,
277
278 /// Dial options, see [`DialOptions`] for more details.
279 dial_options: DialOptions,
280 },
281
282 SendRequestWithFallback {
283 /// Peer ID.
284 peer: PeerId,
285
286 /// Request ID.
287 request_id: RequestId,
288
289 /// Request that is sent over the main protocol, if negotiated.
290 request: Vec<u8>,
291
292 /// Request that is sent over the fallback protocol, if negotiated.
293 fallback: (ProtocolName, Vec<u8>),
294
295 /// Dial options, see [`DialOptions`] for more details.
296 dial_options: DialOptions,
297 },
298
299 /// Cancel outbound request.
300 CancelRequest {
301 /// Request ID.
302 request_id: RequestId,
303 },
304}
305
306/// Handle given to the user protocol which allows it to interact with the request-response
307/// protocol.
308pub struct RequestResponseHandle {
309 /// TX channel for sending commands to the request-response protocol.
310 event_rx: Receiver<InnerRequestResponseEvent>,
311
312 /// RX channel for receiving events from the request-response protocol.
313 command_tx: Sender<RequestResponseCommand>,
314
315 /// Pending responses.
316 pending_responses:
317 HashMap<RequestId, oneshot::Sender<(Vec<u8>, Option<channel::oneshot::Sender<()>>)>>,
318
319 /// Next ephemeral request ID.
320 next_request_id: Arc<AtomicUsize>,
321}
322
323impl RequestResponseHandle {
324 /// Create new [`RequestResponseHandle`].
325 pub(super) fn new(
326 event_rx: Receiver<InnerRequestResponseEvent>,
327 command_tx: Sender<RequestResponseCommand>,
328 next_request_id: Arc<AtomicUsize>,
329 ) -> Self {
330 Self {
331 event_rx,
332 command_tx,
333 next_request_id,
334 pending_responses: HashMap::new(),
335 }
336 }
337
338 /// Reject an inbound request.
339 ///
340 /// Reject request received from a remote peer. The substream is dropped which signals
341 /// to the remote peer that request was rejected.
342 pub fn reject_request(&mut self, request_id: RequestId) {
343 match self.pending_responses.remove(&request_id) {
344 None => {
345 tracing::debug!(target: LOG_TARGET, ?request_id, "rejected request doesn't exist")
346 }
347 Some(sender) => {
348 tracing::debug!(target: LOG_TARGET, ?request_id, "reject request");
349 drop(sender);
350 }
351 }
352 }
353
354 /// Cancel an outbound request.
355 ///
356 /// Allows canceling an in-flight request if the local node is not interested in the answer
357 /// anymore. If the request was canceled, no event is reported to the user as the cancelation
358 /// always succeeds and it's assumed that the user does the necessary state clean up in their
359 /// end after calling [`RequestResponseHandle::cancel_request()`].
360 pub async fn cancel_request(&mut self, request_id: RequestId) {
361 tracing::trace!(target: LOG_TARGET, ?request_id, "cancel request");
362
363 let _ = self.command_tx.send(RequestResponseCommand::CancelRequest { request_id }).await;
364 }
365
366 /// Get next request ID.
367 fn next_request_id(&self) -> RequestId {
368 let request_id = self.next_request_id.fetch_add(1usize, Ordering::Relaxed);
369 RequestId::from(request_id)
370 }
371
372 /// Send request to remote peer.
373 ///
374 /// While the returned `RequestId` is guaranteed to be unique for this request-response
375 /// protocol, it's not unique across all installed request-response protocols. That is,
376 /// multiple request-response protocols can return the same `RequestId` and this must be
377 /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
378 pub async fn send_request(
379 &mut self,
380 peer: PeerId,
381 request: Vec<u8>,
382 dial_options: DialOptions,
383 ) -> crate::Result<RequestId> {
384 tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
385
386 let request_id = self.next_request_id();
387 self.command_tx
388 .send(RequestResponseCommand::SendRequest {
389 peer,
390 request_id,
391 request,
392 dial_options,
393 })
394 .await
395 .map(|_| request_id)
396 .map_err(From::from)
397 }
398
399 /// Attempt to send request to peer and if the channel is clogged, return
400 /// `Error::ChannelClogged`.
401 ///
402 /// While the returned `RequestId` is guaranteed to be unique for this request-response
403 /// protocol, it's not unique across all installed request-response protocols. That is,
404 /// multiple request-response protocols can return the same `RequestId` and this must be
405 /// handled by the calling code correctly if the `RequestId`s are stored somewhere.
406 pub fn try_send_request(
407 &mut self,
408 peer: PeerId,
409 request: Vec<u8>,
410 dial_options: DialOptions,
411 ) -> crate::Result<RequestId> {
412 tracing::trace!(target: LOG_TARGET, ?peer, "send request to peer");
413
414 let request_id = self.next_request_id();
415 self.command_tx
416 .try_send(RequestResponseCommand::SendRequest {
417 peer,
418 request_id,
419 request,
420 dial_options,
421 })
422 .map(|_| request_id)
423 .map_err(|_| Error::ChannelClogged)
424 }
425
426 /// Send request to remote peer with fallback.
427 pub async fn send_request_with_fallback(
428 &mut self,
429 peer: PeerId,
430 request: Vec<u8>,
431 fallback: (ProtocolName, Vec<u8>),
432 dial_options: DialOptions,
433 ) -> crate::Result<RequestId> {
434 tracing::trace!(
435 target: LOG_TARGET,
436 ?peer,
437 fallback = %fallback.0,
438 ?dial_options,
439 "send request with fallback to peer",
440 );
441
442 let request_id = self.next_request_id();
443 self.command_tx
444 .send(RequestResponseCommand::SendRequestWithFallback {
445 peer,
446 request_id,
447 fallback,
448 request,
449 dial_options,
450 })
451 .await
452 .map(|_| request_id)
453 .map_err(From::from)
454 }
455
456 /// Attempt to send request to peer with fallback and if the channel is clogged,
457 /// return `Error::ChannelClogged`.
458 pub fn try_send_request_with_fallback(
459 &mut self,
460 peer: PeerId,
461 request: Vec<u8>,
462 fallback: (ProtocolName, Vec<u8>),
463 dial_options: DialOptions,
464 ) -> crate::Result<RequestId> {
465 tracing::trace!(
466 target: LOG_TARGET,
467 ?peer,
468 fallback = %fallback.0,
469 ?dial_options,
470 "send request with fallback to peer",
471 );
472
473 let request_id = self.next_request_id();
474 self.command_tx
475 .try_send(RequestResponseCommand::SendRequestWithFallback {
476 peer,
477 request_id,
478 fallback,
479 request,
480 dial_options,
481 })
482 .map(|_| request_id)
483 .map_err(|_| Error::ChannelClogged)
484 }
485
486 /// Send response to remote peer.
487 pub fn send_response(&mut self, request_id: RequestId, response: Vec<u8>) {
488 match self.pending_responses.remove(&request_id) {
489 None => {
490 tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
491 }
492 Some(response_tx) => {
493 tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
494
495 if let Err(_) = response_tx.send((response, None)) {
496 tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
497 }
498 }
499 }
500 }
501
502 /// Send response to remote peer with feedback.
503 ///
504 /// The feedback system is inherited from Polkadot SDK's `sc-network` and it's used to notify
505 /// the sender of the response whether it was sent successfully or not. Once the response has
506 /// been sent over the substream successfully, `()` will be sent over the feedback channel
507 /// to the sender to notify them about it. If the substream has been closed or the substream
508 /// failed while sending the response, the feedback channel will be dropped, notifying the
509 /// sender that sending the response failed.
510 pub fn send_response_with_feedback(
511 &mut self,
512 request_id: RequestId,
513 response: Vec<u8>,
514 feedback: channel::oneshot::Sender<()>,
515 ) {
516 match self.pending_responses.remove(&request_id) {
517 None => {
518 tracing::debug!(target: LOG_TARGET, ?request_id, "pending response doens't exist");
519 }
520 Some(response_tx) => {
521 tracing::trace!(target: LOG_TARGET, ?request_id, "send response to peer");
522
523 if let Err(_) = response_tx.send((response, Some(feedback))) {
524 tracing::debug!(target: LOG_TARGET, ?request_id, "substream closed");
525 }
526 }
527 }
528 }
529}
530
531impl futures::Stream for RequestResponseHandle {
532 type Item = RequestResponseEvent;
533
534 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
535 match futures::ready!(self.event_rx.poll_recv(cx)) {
536 None => Poll::Ready(None),
537 Some(event) => match event {
538 InnerRequestResponseEvent::RequestReceived {
539 peer,
540 fallback,
541 request_id,
542 request,
543 response_tx,
544 } => {
545 self.pending_responses.insert(request_id, response_tx);
546 Poll::Ready(Some(RequestResponseEvent::RequestReceived {
547 peer,
548 fallback,
549 request_id,
550 request,
551 }))
552 }
553 event => Poll::Ready(Some(event.into())),
554 },
555 }
556 }
557}