use crate::{
litep2p::shim::request_response::metrics::RequestResponseMetrics,
peer_store::PeerStoreProvider,
request_responses::{IncomingRequest, OutgoingResponse},
service::{metrics::Metrics, traits::RequestResponseConfig as RequestResponseConfigT},
IfDisconnected, OutboundFailure, ProtocolName, RequestFailure,
};
use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use litep2p::{
error::{ImmediateDialError, NegotiationError, SubstreamError},
protocol::request_response::{
DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
RequestResponseHandle,
},
types::RequestId,
};
use sc_network_types::PeerId;
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
mod metrics;
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "sub-libp2p::request-response";
#[derive(Debug)]
pub struct OutboundRequest {
peer: PeerId,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
dial_behavior: IfDisconnected,
}
impl OutboundRequest {
pub fn new(
peer: PeerId,
request: Vec<u8>,
sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
dial_behavior: IfDisconnected,
) -> Self {
OutboundRequest { peer, request, sender, fallback_request, dial_behavior }
}
}
struct PendingRequest {
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
started: Instant,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
impl PendingRequest {
fn new(
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
started: Instant,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
) -> Self {
Self { tx, started, fallback_request }
}
}
#[derive(Debug)]
pub struct RequestResponseConfig {
pub protocol_name: ProtocolName,
pub fallback_names: Vec<ProtocolName>,
pub max_request_size: u64,
pub max_response_size: u64,
pub request_timeout: Duration,
pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
impl RequestResponseConfig {
pub(crate) fn new(
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
max_request_size: u64,
max_response_size: u64,
request_timeout: Duration,
inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
) -> Self {
Self {
protocol_name,
fallback_names,
max_request_size,
max_response_size,
request_timeout,
inbound_queue,
}
}
}
impl RequestResponseConfigT for RequestResponseConfig {
fn protocol_name(&self) -> &ProtocolName {
&self.protocol_name
}
}
pub struct RequestResponseProtocol {
protocol: ProtocolName,
handle: RequestResponseHandle,
inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
peerstore_handle: Arc<dyn PeerStoreProvider>,
pending_inbound_responses: HashMap<RequestId, PendingRequest>,
pending_outbound_responses: FuturesUnordered<
BoxFuture<'static, (litep2p::PeerId, RequestId, Result<OutgoingResponse, ()>, Instant)>,
>,
request_rx: TracingUnboundedReceiver<OutboundRequest>,
request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
metrics: RequestResponseMetrics,
}
impl RequestResponseProtocol {
pub fn new(
protocol: ProtocolName,
handle: RequestResponseHandle,
peerstore_handle: Arc<dyn PeerStoreProvider>,
inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
request_rx: TracingUnboundedReceiver<OutboundRequest>,
request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
metrics: Option<Metrics>,
) -> Self {
Self {
handle,
request_rx,
request_tx,
inbound_queue,
peerstore_handle,
protocol: protocol.clone(),
pending_inbound_responses: HashMap::new(),
pending_outbound_responses: FuturesUnordered::new(),
metrics: RequestResponseMetrics::new(metrics, protocol),
}
}
async fn on_send_request(
&mut self,
peer: PeerId,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
) {
let dial_options = match connect {
IfDisconnected::TryConnect => DialOptions::Dial,
IfDisconnected::ImmediateError => DialOptions::Reject,
};
log::trace!(
target: LOG_TARGET,
"{}: send request to {:?} (fallback {:?}) (dial options: {:?})",
self.protocol,
peer,
fallback_request,
dial_options,
);
match self.handle.try_send_request(peer.into(), request, dial_options) {
Ok(request_id) => {
self.pending_inbound_responses
.insert(request_id, PendingRequest::new(tx, Instant::now(), fallback_request));
},
Err(error) => {
log::warn!(
target: LOG_TARGET,
"{}: failed to send request to {peer:?}: {error:?}",
self.protocol,
);
let _ = tx.send(Err(RequestFailure::Refused));
self.metrics.register_inbound_request_failure(error.to_string().as_ref());
},
}
}
fn on_inbound_request(
&mut self,
peer: litep2p::PeerId,
fallback: Option<litep2p::ProtocolName>,
request_id: RequestId,
request: Vec<u8>,
) {
let Some(inbound_queue) = &self.inbound_queue else {
log::trace!(
target: LOG_TARGET,
"{}: rejecting inbound request from {peer:?}, protocol configured as outbound only",
self.protocol,
);
self.handle.reject_request(request_id);
return;
};
log::trace!(
target: LOG_TARGET,
"{}: request received from {peer:?} ({fallback:?} {request_id:?}), request size {:?}",
self.protocol,
request.len(),
);
let (tx, rx) = oneshot::channel();
match inbound_queue.try_send(IncomingRequest {
peer: peer.into(),
payload: request,
pending_response: tx,
}) {
Ok(_) => {
self.pending_outbound_responses.push(Box::pin(async move {
(peer, request_id, rx.await.map_err(|_| ()), Instant::now())
}));
},
Err(error) => {
log::trace!(
target: LOG_TARGET,
"{:?}: dropping request from {peer:?} ({request_id:?}), inbound queue full",
self.protocol,
);
self.handle.reject_request(request_id);
self.metrics.register_inbound_request_failure(error.to_string().as_ref());
},
}
}
fn on_inbound_response(
&mut self,
peer: litep2p::PeerId,
request_id: RequestId,
_fallback: Option<litep2p::ProtocolName>,
response: Vec<u8>,
) {
match self.pending_inbound_responses.remove(&request_id) {
None => log::warn!(
target: LOG_TARGET,
"{:?}: response received for {peer:?} but {request_id:?} doesn't exist",
self.protocol,
),
Some(PendingRequest { tx, started, .. }) => {
log::trace!(
target: LOG_TARGET,
"{:?}: response received for {peer:?} ({request_id:?}), response size {:?}",
self.protocol,
response.len(),
);
let _ = tx.send(Ok((response, self.protocol.clone())));
self.metrics.register_outbound_request_success(started.elapsed());
},
}
}
fn on_request_failed(
&mut self,
peer: litep2p::PeerId,
request_id: RequestId,
error: RequestResponseError,
) {
log::debug!(
target: LOG_TARGET,
"{:?}: request failed for {peer:?} ({request_id:?}): {error:?}",
self.protocol
);
let Some(PendingRequest { tx, fallback_request, .. }) =
self.pending_inbound_responses.remove(&request_id)
else {
log::warn!(
target: LOG_TARGET,
"{:?}: request failed for peer {peer:?} but {request_id:?} doesn't exist",
self.protocol,
);
return
};
let status = match error {
RequestResponseError::NotConnected =>
Some((RequestFailure::NotConnected, "not-connected")),
RequestResponseError::Rejected(reason) => {
let reason = match reason {
RejectReason::ConnectionClosed => "connection-closed",
RejectReason::SubstreamClosed => "substream-closed",
RejectReason::SubstreamOpenError(substream_error) => match substream_error {
SubstreamError::NegotiationError(NegotiationError::Timeout) =>
"substream-timeout",
_ => "substream-open-error",
},
RejectReason::DialFailed(None) => "dial-failed",
RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
"dial-already-connected",
RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
"dial-peerid-missing",
RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
"dial-tried-to-dial-self",
RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
"dial-no-address-available",
RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
"dial-task-closed",
RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
"dial-channel-clogged",
};
Some((RequestFailure::Refused, reason))
},
RequestResponseError::Timeout =>
Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
RequestResponseError::Canceled => {
log::debug!(
target: LOG_TARGET,
"{}: request canceled by local node to {peer:?} ({request_id:?})",
self.protocol,
);
None
},
RequestResponseError::TooLargePayload => {
log::warn!(
target: LOG_TARGET,
"{}: tried to send too large request to {peer:?} ({request_id:?})",
self.protocol,
);
Some((RequestFailure::Refused, "payload-too-large"))
},
RequestResponseError::UnsupportedProtocol => match fallback_request {
Some((request, protocol)) => match self.request_tx.get(&protocol) {
Some(sender) => {
log::debug!(
target: LOG_TARGET,
"{}: failed to negotiate protocol with {:?}, try fallback request: ({})",
self.protocol,
peer,
protocol,
);
let outbound_request = OutboundRequest::new(
peer.into(),
request,
tx,
None,
IfDisconnected::ImmediateError,
);
let _ = sender.unbounded_send(outbound_request);
return;
},
None => {
log::warn!(
target: LOG_TARGET,
"{}: fallback request provided but protocol ({}) doesn't exist (peer {:?})",
self.protocol,
protocol,
peer,
);
Some((RequestFailure::Refused, "invalid-fallback-protocol"))
},
},
None => Some((RequestFailure::Refused, "unsupported-protocol")),
},
};
if let Some((error, reason)) = status {
self.metrics.register_outbound_request_failure(reason);
let _ = tx.send(Err(error));
}
}
fn on_outbound_response(
&mut self,
peer: litep2p::PeerId,
request_id: RequestId,
response: OutgoingResponse,
started: Instant,
) {
let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
for change in reputation_changes {
log::trace!(target: LOG_TARGET, "{}: report {peer:?}: {change:?}", self.protocol);
self.peerstore_handle.report_peer(peer.into(), change);
}
match result {
Err(()) => {
log::debug!(
target: LOG_TARGET,
"{}: response rejected ({request_id:?}) for {peer:?}",
self.protocol,
);
self.handle.reject_request(request_id);
self.metrics.register_inbound_request_failure("rejected");
},
Ok(response) => {
log::trace!(
target: LOG_TARGET,
"{}: send response ({request_id:?}) to {peer:?}, response size {}",
self.protocol,
response.len(),
);
match sent_feedback {
None => self.handle.send_response(request_id, response),
Some(feedback) =>
self.handle.send_response_with_feedback(request_id, response, feedback),
}
self.metrics.register_inbound_request_success(started.elapsed());
},
}
}
pub async fn run(mut self) {
loop {
tokio::select! {
event = self.handle.next() => match event {
None => return,
Some(RequestResponseEvent::RequestReceived {
peer,
fallback,
request_id,
request,
}) => self.on_inbound_request(peer, fallback, request_id, request),
Some(RequestResponseEvent::ResponseReceived { peer, request_id, fallback, response }) => {
self.on_inbound_response(peer, request_id, fallback, response);
},
Some(RequestResponseEvent::RequestFailed { peer, request_id, error }) => {
self.on_request_failed(peer, request_id, error);
},
},
event = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => match event {
None => return,
Some((peer, request_id, Err(()), _)) => {
log::debug!(target: LOG_TARGET, "{}: reject request ({request_id:?}) from {peer:?}", self.protocol);
self.handle.reject_request(request_id);
self.metrics.register_inbound_request_failure("rejected");
}
Some((peer, request_id, Ok(response), started)) => {
self.on_outbound_response(peer, request_id, response, started);
}
},
event = self.request_rx.next() => match event {
None => return,
Some(outbound_request) => {
let OutboundRequest { peer, request, sender, dial_behavior, fallback_request } = outbound_request;
self.on_send_request(peer, request, fallback_request, sender, dial_behavior).await;
}
}
}
}
}
}