1use crate::{
23 litep2p::shim::request_response::metrics::RequestResponseMetrics,
24 peer_store::PeerStoreProvider,
25 request_responses::{IncomingRequest, OutgoingResponse},
26 service::{metrics::Metrics, traits::RequestResponseConfig as RequestResponseConfigT},
27 IfDisconnected, OutboundFailure, ProtocolName, RequestFailure,
28};
29
30use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
31use litep2p::{
32 error::{ImmediateDialError, NegotiationError, SubstreamError},
33 protocol::request_response::{
34 DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
35 RequestResponseHandle,
36 },
37 types::RequestId,
38};
39
40use sc_network_types::PeerId;
41use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
42
43use std::{
44 collections::HashMap,
45 sync::Arc,
46 time::{Duration, Instant},
47};
48
49mod metrics;
50
51#[cfg(test)]
52mod tests;
53
54const LOG_TARGET: &str = "sub-libp2p::request-response";
56
57#[derive(Debug)]
59pub struct OutboundRequest {
60 peer: PeerId,
62
63 request: Vec<u8>,
65
66 fallback_request: Option<(Vec<u8>, ProtocolName)>,
68
69 sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
71
72 dial_behavior: IfDisconnected,
74}
75
76impl OutboundRequest {
77 pub fn new(
79 peer: PeerId,
80 request: Vec<u8>,
81 sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
82 fallback_request: Option<(Vec<u8>, ProtocolName)>,
83 dial_behavior: IfDisconnected,
84 ) -> Self {
85 OutboundRequest { peer, request, sender, fallback_request, dial_behavior }
86 }
87}
88
89struct PendingRequest {
91 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
92 started: Instant,
93 fallback_request: Option<(Vec<u8>, ProtocolName)>,
94}
95
96impl PendingRequest {
97 fn new(
99 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
100 started: Instant,
101 fallback_request: Option<(Vec<u8>, ProtocolName)>,
102 ) -> Self {
103 Self { tx, started, fallback_request }
104 }
105}
106
107#[derive(Debug)]
111pub struct RequestResponseConfig {
112 pub protocol_name: ProtocolName,
114
115 pub fallback_names: Vec<ProtocolName>,
117
118 pub max_request_size: u64,
120
121 pub max_response_size: u64,
123
124 pub request_timeout: Duration,
126
127 pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
129}
130
131impl RequestResponseConfig {
132 pub(crate) fn new(
134 protocol_name: ProtocolName,
135 fallback_names: Vec<ProtocolName>,
136 max_request_size: u64,
137 max_response_size: u64,
138 request_timeout: Duration,
139 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
140 ) -> Self {
141 Self {
142 protocol_name,
143 fallback_names,
144 max_request_size,
145 max_response_size,
146 request_timeout,
147 inbound_queue,
148 }
149 }
150}
151
152impl RequestResponseConfigT for RequestResponseConfig {
153 fn protocol_name(&self) -> &ProtocolName {
154 &self.protocol_name
155 }
156}
157
158pub struct RequestResponseProtocol {
167 protocol: ProtocolName,
169
170 handle: RequestResponseHandle,
172
173 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
175
176 peerstore_handle: Arc<dyn PeerStoreProvider>,
178
179 pending_inbound_responses: HashMap<RequestId, PendingRequest>,
181
182 pending_outbound_responses: FuturesUnordered<
184 BoxFuture<'static, (litep2p::PeerId, RequestId, Result<OutgoingResponse, ()>, Instant)>,
185 >,
186
187 request_rx: TracingUnboundedReceiver<OutboundRequest>,
189
190 request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
196
197 metrics: RequestResponseMetrics,
199}
200
201impl RequestResponseProtocol {
202 pub fn new(
204 protocol: ProtocolName,
205 handle: RequestResponseHandle,
206 peerstore_handle: Arc<dyn PeerStoreProvider>,
207 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
208 request_rx: TracingUnboundedReceiver<OutboundRequest>,
209 request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
210 metrics: Option<Metrics>,
211 ) -> Self {
212 Self {
213 handle,
214 request_rx,
215 request_tx,
216 inbound_queue,
217 peerstore_handle,
218 protocol: protocol.clone(),
219 pending_inbound_responses: HashMap::new(),
220 pending_outbound_responses: FuturesUnordered::new(),
221 metrics: RequestResponseMetrics::new(metrics, protocol),
222 }
223 }
224
225 async fn on_send_request(
227 &mut self,
228 peer: PeerId,
229 request: Vec<u8>,
230 fallback_request: Option<(Vec<u8>, ProtocolName)>,
231 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
232 connect: IfDisconnected,
233 ) {
234 let dial_options = match connect {
235 IfDisconnected::TryConnect => DialOptions::Dial,
236 IfDisconnected::ImmediateError => DialOptions::Reject,
237 };
238
239 log::trace!(
240 target: LOG_TARGET,
241 "{}: send request to {:?} (fallback {:?}) (dial options: {:?})",
242 self.protocol,
243 peer,
244 fallback_request,
245 dial_options,
246 );
247
248 match self.handle.try_send_request(peer.into(), request, dial_options) {
249 Ok(request_id) => {
250 self.pending_inbound_responses
251 .insert(request_id, PendingRequest::new(tx, Instant::now(), fallback_request));
252 },
253 Err(error) => {
254 log::warn!(
255 target: LOG_TARGET,
256 "{}: failed to send request to {peer:?}: {error:?}",
257 self.protocol,
258 );
259
260 let _ = tx.send(Err(RequestFailure::Refused));
261 self.metrics.register_inbound_request_failure(error.to_string().as_ref());
262 },
263 }
264 }
265
266 fn on_inbound_request(
270 &mut self,
271 peer: litep2p::PeerId,
272 fallback: Option<litep2p::ProtocolName>,
273 request_id: RequestId,
274 request: Vec<u8>,
275 ) {
276 let Some(inbound_queue) = &self.inbound_queue else {
277 log::trace!(
278 target: LOG_TARGET,
279 "{}: rejecting inbound request from {peer:?}, protocol configured as outbound only",
280 self.protocol,
281 );
282
283 self.handle.reject_request(request_id);
284 return;
285 };
286
287 log::trace!(
288 target: LOG_TARGET,
289 "{}: request received from {peer:?} ({fallback:?} {request_id:?}), request size {:?}",
290 self.protocol,
291 request.len(),
292 );
293 let (tx, rx) = oneshot::channel();
294
295 match inbound_queue.try_send(IncomingRequest {
296 peer: peer.into(),
297 payload: request,
298 pending_response: tx,
299 }) {
300 Ok(_) => {
301 self.pending_outbound_responses.push(Box::pin(async move {
302 (peer, request_id, rx.await.map_err(|_| ()), Instant::now())
303 }));
304 },
305 Err(error) => {
306 log::trace!(
307 target: LOG_TARGET,
308 "{:?}: dropping request from {peer:?} ({request_id:?}), inbound queue full",
309 self.protocol,
310 );
311
312 self.handle.reject_request(request_id);
313 self.metrics.register_inbound_request_failure(error.to_string().as_ref());
314 },
315 }
316 }
317
318 fn on_inbound_response(
320 &mut self,
321 peer: litep2p::PeerId,
322 request_id: RequestId,
323 fallback: Option<litep2p::ProtocolName>,
324 response: Vec<u8>,
325 ) {
326 match self.pending_inbound_responses.remove(&request_id) {
327 None => log::warn!(
328 target: LOG_TARGET,
329 "{:?}: response received for {peer:?} but {request_id:?} doesn't exist",
330 self.protocol,
331 ),
332 Some(PendingRequest { tx, started, .. }) => {
333 log::trace!(
334 target: LOG_TARGET,
335 "{:?}: response received for {peer:?} ({request_id:?}), response size {:?}",
336 self.protocol,
337 response.len(),
338 );
339
340 let _ = tx.send(Ok((
341 response,
342 fallback.map_or_else(|| self.protocol.clone(), Into::into),
343 )));
344 self.metrics.register_outbound_request_success(started.elapsed());
345 },
346 }
347 }
348
349 fn on_request_failed(
351 &mut self,
352 peer: litep2p::PeerId,
353 request_id: RequestId,
354 error: RequestResponseError,
355 ) {
356 log::debug!(
357 target: LOG_TARGET,
358 "{:?}: request failed for {peer:?} ({request_id:?}): {error:?}",
359 self.protocol
360 );
361
362 let Some(PendingRequest { tx, fallback_request, .. }) =
363 self.pending_inbound_responses.remove(&request_id)
364 else {
365 log::warn!(
366 target: LOG_TARGET,
367 "{:?}: request failed for peer {peer:?} but {request_id:?} doesn't exist",
368 self.protocol,
369 );
370
371 return
372 };
373
374 let status = match error {
375 RequestResponseError::NotConnected =>
376 Some((RequestFailure::NotConnected, "not-connected")),
377 RequestResponseError::Rejected(reason) => {
378 let reason = match reason {
379 RejectReason::ConnectionClosed => "connection-closed",
380 RejectReason::SubstreamClosed => "substream-closed",
381 RejectReason::SubstreamOpenError(substream_error) => match substream_error {
382 SubstreamError::NegotiationError(NegotiationError::Timeout) =>
383 "substream-timeout",
384 _ => "substream-open-error",
385 },
386 RejectReason::DialFailed(None) => "dial-failed",
387 RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
388 "dial-already-connected",
389 RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
390 "dial-peerid-missing",
391 RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
392 "dial-tried-to-dial-self",
393 RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
394 "dial-no-address-available",
395 RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
396 "dial-task-closed",
397 RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
398 "dial-channel-clogged",
399 };
400
401 Some((RequestFailure::Refused, reason))
402 },
403 RequestResponseError::Timeout =>
404 Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
405 RequestResponseError::Canceled => {
406 log::debug!(
407 target: LOG_TARGET,
408 "{}: request canceled by local node to {peer:?} ({request_id:?})",
409 self.protocol,
410 );
411 None
412 },
413 RequestResponseError::TooLargePayload => {
414 log::warn!(
415 target: LOG_TARGET,
416 "{}: tried to send too large request to {peer:?} ({request_id:?})",
417 self.protocol,
418 );
419 Some((RequestFailure::Refused, "payload-too-large"))
420 },
421 RequestResponseError::UnsupportedProtocol => match fallback_request {
422 Some((request, protocol)) => match self.request_tx.get(&protocol) {
423 Some(sender) => {
424 log::debug!(
425 target: LOG_TARGET,
426 "{}: failed to negotiate protocol with {:?}, try fallback request: ({})",
427 self.protocol,
428 peer,
429 protocol,
430 );
431
432 let outbound_request = OutboundRequest::new(
433 peer.into(),
434 request,
435 tx,
436 None,
437 IfDisconnected::ImmediateError,
438 );
439
440 let _ = sender.unbounded_send(outbound_request);
446
447 return;
448 },
449 None => {
450 log::warn!(
451 target: LOG_TARGET,
452 "{}: fallback request provided but protocol ({}) doesn't exist (peer {:?})",
453 self.protocol,
454 protocol,
455 peer,
456 );
457
458 Some((RequestFailure::Refused, "invalid-fallback-protocol"))
459 },
460 },
461 None => Some((RequestFailure::Refused, "unsupported-protocol")),
462 },
463 };
464
465 if let Some((error, reason)) = status {
466 self.metrics.register_outbound_request_failure(reason);
467 let _ = tx.send(Err(error));
468 }
469 }
470
471 fn on_outbound_response(
473 &mut self,
474 peer: litep2p::PeerId,
475 request_id: RequestId,
476 response: OutgoingResponse,
477 started: Instant,
478 ) {
479 let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
480
481 for change in reputation_changes {
482 log::trace!(target: LOG_TARGET, "{}: report {peer:?}: {change:?}", self.protocol);
483 self.peerstore_handle.report_peer(peer.into(), change);
484 }
485
486 match result {
487 Err(()) => {
488 log::debug!(
489 target: LOG_TARGET,
490 "{}: response rejected ({request_id:?}) for {peer:?}",
491 self.protocol,
492 );
493
494 self.handle.reject_request(request_id);
495 self.metrics.register_inbound_request_failure("rejected");
496 },
497 Ok(response) => {
498 log::trace!(
499 target: LOG_TARGET,
500 "{}: send response ({request_id:?}) to {peer:?}, response size {}",
501 self.protocol,
502 response.len(),
503 );
504
505 match sent_feedback {
506 None => self.handle.send_response(request_id, response),
507 Some(feedback) =>
508 self.handle.send_response_with_feedback(request_id, response, feedback),
509 }
510
511 self.metrics.register_inbound_request_success(started.elapsed());
512 },
513 }
514 }
515
516 pub async fn run(mut self) {
518 loop {
519 tokio::select! {
520 event = self.handle.next() => match event {
521 None => return,
522 Some(RequestResponseEvent::RequestReceived {
523 peer,
524 fallback,
525 request_id,
526 request,
527 }) => self.on_inbound_request(peer, fallback, request_id, request),
528 Some(RequestResponseEvent::ResponseReceived { peer, request_id, fallback, response }) => {
529 self.on_inbound_response(peer, request_id, fallback, response);
530 },
531 Some(RequestResponseEvent::RequestFailed { peer, request_id, error }) => {
532 self.on_request_failed(peer, request_id, error);
533 },
534 },
535 event = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => match event {
536 None => return,
537 Some((peer, request_id, Err(()), _)) => {
538 log::debug!(target: LOG_TARGET, "{}: reject request ({request_id:?}) from {peer:?}", self.protocol);
539
540 self.handle.reject_request(request_id);
541 self.metrics.register_inbound_request_failure("rejected");
542 }
543 Some((peer, request_id, Ok(response), started)) => {
544 self.on_outbound_response(peer, request_id, response, started);
545 }
546 },
547 event = self.request_rx.next() => match event {
548 None => return,
549 Some(outbound_request) => {
550 let OutboundRequest { peer, request, sender, dial_behavior, fallback_request } = outbound_request;
551
552 self.on_send_request(peer, request, fallback_request, sender, dial_behavior).await;
553 }
554 }
555 }
556 }
557 }
558}