1use crate::{
23 litep2p::shim::request_response::metrics::RequestResponseMetrics,
24 peer_store::PeerStoreProvider,
25 request_responses::{IncomingRequest, OutgoingResponse},
26 service::{metrics::Metrics, traits::RequestResponseConfig as RequestResponseConfigT},
27 IfDisconnected, OutboundFailure, ProtocolName, RequestFailure,
28};
29
30use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
31use litep2p::{
32 error::{ImmediateDialError, NegotiationError, SubstreamError},
33 protocol::request_response::{
34 DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
35 RequestResponseHandle,
36 },
37 types::RequestId,
38};
39
40use sc_network_types::PeerId;
41use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
42
43use std::{
44 collections::HashMap,
45 sync::Arc,
46 time::{Duration, Instant},
47};
48
49mod metrics;
50
51#[cfg(test)]
52mod tests;
53
/// Log target attached to every log record emitted from this file.
const LOG_TARGET: &str = "sub-libp2p::request-response";
56
57#[derive(Debug)]
59pub struct OutboundRequest {
60 peer: PeerId,
62
63 request: Vec<u8>,
65
66 fallback_request: Option<(Vec<u8>, ProtocolName)>,
68
69 sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
71
72 dial_behavior: IfDisconnected,
74}
75
76impl OutboundRequest {
77 pub fn new(
79 peer: PeerId,
80 request: Vec<u8>,
81 sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
82 fallback_request: Option<(Vec<u8>, ProtocolName)>,
83 dial_behavior: IfDisconnected,
84 ) -> Self {
85 OutboundRequest { peer, request, sender, fallback_request, dial_behavior }
86 }
87}
88
89struct PendingRequest {
91 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
92 started: Instant,
93 fallback_request: Option<(Vec<u8>, ProtocolName)>,
94}
95
96impl PendingRequest {
97 fn new(
99 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
100 started: Instant,
101 fallback_request: Option<(Vec<u8>, ProtocolName)>,
102 ) -> Self {
103 Self { tx, started, fallback_request }
104 }
105}
106
107#[derive(Debug)]
111pub struct RequestResponseConfig {
112 pub protocol_name: ProtocolName,
114
115 pub fallback_names: Vec<ProtocolName>,
117
118 pub max_request_size: u64,
120
121 pub max_response_size: u64,
123
124 pub request_timeout: Duration,
126
127 pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
129}
130
131impl RequestResponseConfig {
132 pub(crate) fn new(
134 protocol_name: ProtocolName,
135 fallback_names: Vec<ProtocolName>,
136 max_request_size: u64,
137 max_response_size: u64,
138 request_timeout: Duration,
139 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
140 ) -> Self {
141 Self {
142 protocol_name,
143 fallback_names,
144 max_request_size,
145 max_response_size,
146 request_timeout,
147 inbound_queue,
148 }
149 }
150}
151
152impl RequestResponseConfigT for RequestResponseConfig {
153 fn protocol_name(&self) -> &ProtocolName {
154 &self.protocol_name
155 }
156}
157
158pub struct RequestResponseProtocol {
167 protocol: ProtocolName,
169
170 handle: RequestResponseHandle,
172
173 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
175
176 peerstore_handle: Arc<dyn PeerStoreProvider>,
178
179 pending_inbound_responses: HashMap<RequestId, PendingRequest>,
181
182 pending_outbound_responses: FuturesUnordered<
184 BoxFuture<'static, (litep2p::PeerId, RequestId, Result<OutgoingResponse, ()>, Instant)>,
185 >,
186
187 request_rx: TracingUnboundedReceiver<OutboundRequest>,
189
190 request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
196
197 metrics: RequestResponseMetrics,
199}
200
201impl RequestResponseProtocol {
202 pub fn new(
204 protocol: ProtocolName,
205 handle: RequestResponseHandle,
206 peerstore_handle: Arc<dyn PeerStoreProvider>,
207 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
208 request_rx: TracingUnboundedReceiver<OutboundRequest>,
209 request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
210 metrics: Option<Metrics>,
211 ) -> Self {
212 Self {
213 handle,
214 request_rx,
215 request_tx,
216 inbound_queue,
217 peerstore_handle,
218 protocol: protocol.clone(),
219 pending_inbound_responses: HashMap::new(),
220 pending_outbound_responses: FuturesUnordered::new(),
221 metrics: RequestResponseMetrics::new(metrics, protocol),
222 }
223 }
224
225 async fn on_send_request(
227 &mut self,
228 peer: PeerId,
229 request: Vec<u8>,
230 fallback_request: Option<(Vec<u8>, ProtocolName)>,
231 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
232 connect: IfDisconnected,
233 ) {
234 let dial_options = match connect {
235 IfDisconnected::TryConnect => DialOptions::Dial,
236 IfDisconnected::ImmediateError => DialOptions::Reject,
237 };
238
239 log::trace!(
240 target: LOG_TARGET,
241 "{}: send request to {:?} (fallback {:?}) (dial options: {:?})",
242 self.protocol,
243 peer,
244 fallback_request,
245 dial_options,
246 );
247
248 match self.handle.try_send_request(peer.into(), request, dial_options) {
249 Ok(request_id) => {
250 self.pending_inbound_responses
251 .insert(request_id, PendingRequest::new(tx, Instant::now(), fallback_request));
252 },
253 Err(error) => {
254 log::warn!(
255 target: LOG_TARGET,
256 "{}: failed to send request to {peer:?}: {error:?}",
257 self.protocol,
258 );
259
260 let _ = tx.send(Err(RequestFailure::Refused));
261 self.metrics.register_inbound_request_failure(error.to_string().as_ref());
262 },
263 }
264 }
265
266 fn on_inbound_request(
270 &mut self,
271 peer: litep2p::PeerId,
272 fallback: Option<litep2p::ProtocolName>,
273 request_id: RequestId,
274 request: Vec<u8>,
275 ) {
276 log::trace!(
277 target: LOG_TARGET,
278 "{}: request received from {peer:?} ({fallback:?} {request_id:?}), request size {:?}",
279 self.protocol,
280 request.len(),
281 );
282
283 let Some(inbound_queue) = &self.inbound_queue else {
284 log::trace!(
285 target: LOG_TARGET,
286 "{}: rejecting inbound request from {peer:?}, protocol configured as outbound only",
287 self.protocol,
288 );
289
290 self.handle.reject_request(request_id);
291 return;
292 };
293
294 if self.peerstore_handle.is_banned(&peer.into()) {
295 log::trace!(
296 target: LOG_TARGET,
297 "{}: rejecting inbound request from banned {peer:?} ({request_id:?})",
298 self.protocol,
299 );
300
301 self.handle.reject_request(request_id);
302 self.metrics.register_inbound_request_failure("banned-peer");
303 return;
304 }
305
306 let (tx, rx) = oneshot::channel();
307
308 match inbound_queue.try_send(IncomingRequest {
309 peer: peer.into(),
310 payload: request,
311 pending_response: tx,
312 }) {
313 Ok(_) => {
314 self.pending_outbound_responses.push(Box::pin(async move {
315 (peer, request_id, rx.await.map_err(|_| ()), Instant::now())
316 }));
317 },
318 Err(error) => {
319 log::trace!(
320 target: LOG_TARGET,
321 "{:?}: dropping request from {peer:?} ({request_id:?}), inbound queue full",
322 self.protocol,
323 );
324
325 self.handle.reject_request(request_id);
326 self.metrics.register_inbound_request_failure(error.to_string().as_ref());
327 },
328 }
329 }
330
331 fn on_inbound_response(
333 &mut self,
334 peer: litep2p::PeerId,
335 request_id: RequestId,
336 _fallback: Option<litep2p::ProtocolName>,
337 response: Vec<u8>,
338 ) {
339 match self.pending_inbound_responses.remove(&request_id) {
340 None => log::warn!(
341 target: LOG_TARGET,
342 "{:?}: response received for {peer:?} but {request_id:?} doesn't exist",
343 self.protocol,
344 ),
345 Some(PendingRequest { tx, started, .. }) => {
346 log::trace!(
347 target: LOG_TARGET,
348 "{:?}: response received for {peer:?} ({request_id:?}), response size {:?}",
349 self.protocol,
350 response.len(),
351 );
352
353 let _ = tx.send(Ok((response, self.protocol.clone())));
354 self.metrics.register_outbound_request_success(started.elapsed());
355 },
356 }
357 }
358
359 fn on_request_failed(
361 &mut self,
362 peer: litep2p::PeerId,
363 request_id: RequestId,
364 error: RequestResponseError,
365 ) {
366 log::debug!(
367 target: LOG_TARGET,
368 "{:?}: request failed for {peer:?} ({request_id:?}): {error:?}",
369 self.protocol
370 );
371
372 let Some(PendingRequest { tx, fallback_request, .. }) =
373 self.pending_inbound_responses.remove(&request_id)
374 else {
375 log::warn!(
376 target: LOG_TARGET,
377 "{:?}: request failed for peer {peer:?} but {request_id:?} doesn't exist",
378 self.protocol,
379 );
380
381 return
382 };
383
384 let status = match error {
385 RequestResponseError::NotConnected =>
386 Some((RequestFailure::NotConnected, "not-connected")),
387 RequestResponseError::Rejected(reason) => {
388 let reason = match reason {
389 RejectReason::ConnectionClosed => "connection-closed",
390 RejectReason::SubstreamClosed => "substream-closed",
391 RejectReason::SubstreamOpenError(substream_error) => match substream_error {
392 SubstreamError::NegotiationError(NegotiationError::Timeout) =>
393 "substream-timeout",
394 _ => "substream-open-error",
395 },
396 RejectReason::DialFailed(None) => "dial-failed",
397 RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
398 "dial-already-connected",
399 RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
400 "dial-peerid-missing",
401 RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
402 "dial-tried-to-dial-self",
403 RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
404 "dial-no-address-available",
405 RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
406 "dial-task-closed",
407 RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
408 "dial-channel-clogged",
409 };
410
411 Some((RequestFailure::Refused, reason))
412 },
413 RequestResponseError::Timeout =>
414 Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
415 RequestResponseError::Canceled => {
416 log::debug!(
417 target: LOG_TARGET,
418 "{}: request canceled by local node to {peer:?} ({request_id:?})",
419 self.protocol,
420 );
421 None
422 },
423 RequestResponseError::TooLargePayload => {
424 log::warn!(
425 target: LOG_TARGET,
426 "{}: tried to send too large request to {peer:?} ({request_id:?})",
427 self.protocol,
428 );
429 Some((RequestFailure::Refused, "payload-too-large"))
430 },
431 RequestResponseError::UnsupportedProtocol => match fallback_request {
432 Some((request, protocol)) => match self.request_tx.get(&protocol) {
433 Some(sender) => {
434 log::debug!(
435 target: LOG_TARGET,
436 "{}: failed to negotiate protocol with {:?}. Trying the fallback protocol ({})",
437 self.protocol,
438 peer,
439 protocol,
440 );
441
442 let outbound_request = OutboundRequest::new(
443 peer.into(),
444 request,
445 tx,
446 None,
447 IfDisconnected::ImmediateError,
448 );
449
450 let _ = sender.unbounded_send(outbound_request);
456
457 return;
458 },
459 None => {
460 log::warn!(
461 target: LOG_TARGET,
462 "{}: fallback request provided but protocol ({}) doesn't exist (peer {:?})",
463 self.protocol,
464 protocol,
465 peer,
466 );
467
468 Some((RequestFailure::Refused, "invalid-fallback-protocol"))
469 },
470 },
471 None => Some((RequestFailure::Refused, "unsupported-protocol")),
472 },
473 };
474
475 if let Some((error, reason)) = status {
476 self.metrics.register_outbound_request_failure(reason);
477 let _ = tx.send(Err(error));
478 }
479 }
480
481 fn on_outbound_response(
483 &mut self,
484 peer: litep2p::PeerId,
485 request_id: RequestId,
486 response: OutgoingResponse,
487 started: Instant,
488 ) {
489 let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
490
491 for change in reputation_changes {
492 log::trace!(target: LOG_TARGET, "{}: report {peer:?}: {change:?}", self.protocol);
493 self.peerstore_handle.report_peer(peer.into(), change);
494 }
495
496 match result {
497 Err(()) => {
498 log::debug!(
499 target: LOG_TARGET,
500 "{}: response rejected ({request_id:?}) for {peer:?}",
501 self.protocol,
502 );
503
504 self.handle.reject_request(request_id);
505 self.metrics.register_inbound_request_failure("rejected");
506 },
507 Ok(response) => {
508 log::trace!(
509 target: LOG_TARGET,
510 "{}: send response ({request_id:?}) to {peer:?}, response size {}",
511 self.protocol,
512 response.len(),
513 );
514
515 match sent_feedback {
516 None => self.handle.send_response(request_id, response),
517 Some(feedback) =>
518 self.handle.send_response_with_feedback(request_id, response, feedback),
519 }
520
521 self.metrics.register_inbound_request_success(started.elapsed());
522 },
523 }
524 }
525
526 pub async fn run(mut self) {
528 loop {
529 tokio::select! {
530 event = self.handle.next() => match event {
531 None => return,
532 Some(RequestResponseEvent::RequestReceived {
533 peer,
534 fallback,
535 request_id,
536 request,
537 }) => self.on_inbound_request(peer, fallback, request_id, request),
538 Some(RequestResponseEvent::ResponseReceived { peer, request_id, fallback, response }) => {
539 self.on_inbound_response(peer, request_id, fallback, response);
540 },
541 Some(RequestResponseEvent::RequestFailed { peer, request_id, error }) => {
542 self.on_request_failed(peer, request_id, error);
543 },
544 },
545 event = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => match event {
546 None => return,
547 Some((peer, request_id, Err(()), _)) => {
548 log::debug!(target: LOG_TARGET, "{}: reject request ({request_id:?}) from {peer:?}", self.protocol);
549
550 self.handle.reject_request(request_id);
551 self.metrics.register_inbound_request_failure("rejected");
552 }
553 Some((peer, request_id, Ok(response), started)) => {
554 self.on_outbound_response(peer, request_id, response, started);
555 }
556 },
557 event = self.request_rx.next() => match event {
558 None => return,
559 Some(outbound_request) => {
560 let OutboundRequest { peer, request, sender, dial_behavior, fallback_request } = outbound_request;
561
562 self.on_send_request(peer, request, fallback_request, sender, dial_behavior).await;
563 }
564 }
565 }
566 }
567 }
568}