referrerpolicy=no-referrer-when-downgrade

sc_network/litep2p/shim/request_response/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Shim for litep2p's request-response implementation to make it work with `sc_network`'s
20//! request-response API.
21
22use crate::{
23	litep2p::shim::request_response::metrics::RequestResponseMetrics,
24	peer_store::PeerStoreProvider,
25	request_responses::{IncomingRequest, OutgoingResponse},
26	service::{metrics::Metrics, traits::RequestResponseConfig as RequestResponseConfigT},
27	IfDisconnected, OutboundFailure, ProtocolName, RequestFailure,
28};
29
30use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, StreamExt};
31use litep2p::{
32	error::{ImmediateDialError, NegotiationError, SubstreamError},
33	protocol::request_response::{
34		DialOptions, RejectReason, RequestResponseError, RequestResponseEvent,
35		RequestResponseHandle,
36	},
37	types::RequestId,
38};
39
40use sc_network_types::PeerId;
41use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
42
43use std::{
44	collections::HashMap,
45	sync::Arc,
46	time::{Duration, Instant},
47};
48
49mod metrics;
50
51#[cfg(test)]
52mod tests;
53
54/// Logging target for the file.
55const LOG_TARGET: &str = "sub-libp2p::request-response";
56
57/// Type containing information related to an outbound request.
58#[derive(Debug)]
59pub struct OutboundRequest {
60	/// Peer ID.
61	peer: PeerId,
62
63	/// Request.
64	request: Vec<u8>,
65
66	/// Fallback request, if provided.
67	fallback_request: Option<(Vec<u8>, ProtocolName)>,
68
69	/// `oneshot::Sender` for sending the received response, or failure.
70	sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
71
72	/// What should the node do if `peer` is disconnected.
73	dial_behavior: IfDisconnected,
74}
75
76impl OutboundRequest {
77	/// Create new [`OutboundRequest`].
78	pub fn new(
79		peer: PeerId,
80		request: Vec<u8>,
81		sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
82		fallback_request: Option<(Vec<u8>, ProtocolName)>,
83		dial_behavior: IfDisconnected,
84	) -> Self {
85		OutboundRequest { peer, request, sender, fallback_request, dial_behavior }
86	}
87}
88
89/// Pending request.
90struct PendingRequest {
91	tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
92	started: Instant,
93	fallback_request: Option<(Vec<u8>, ProtocolName)>,
94}
95
96impl PendingRequest {
97	/// Create new [`PendingRequest`].
98	fn new(
99		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
100		started: Instant,
101		fallback_request: Option<(Vec<u8>, ProtocolName)>,
102	) -> Self {
103		Self { tx, started, fallback_request }
104	}
105}
106
107/// Request-response protocol configuration.
108///
109/// See [`RequestResponseConfiguration`](crate::request_response::ProtocolConfig) for more details.
110#[derive(Debug)]
111pub struct RequestResponseConfig {
112	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
113	pub protocol_name: ProtocolName,
114
115	/// Fallback on the wire protocol names to support.
116	pub fallback_names: Vec<ProtocolName>,
117
118	/// Maximum allowed size, in bytes, of a request.
119	pub max_request_size: u64,
120
121	/// Maximum allowed size, in bytes, of a response.
122	pub max_response_size: u64,
123
124	/// Duration after which emitted requests are considered timed out.
125	pub request_timeout: Duration,
126
127	/// Channel on which the networking service will send incoming requests.
128	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
129}
130
131impl RequestResponseConfig {
132	/// Create new [`RequestResponseConfig`].
133	pub(crate) fn new(
134		protocol_name: ProtocolName,
135		fallback_names: Vec<ProtocolName>,
136		max_request_size: u64,
137		max_response_size: u64,
138		request_timeout: Duration,
139		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
140	) -> Self {
141		Self {
142			protocol_name,
143			fallback_names,
144			max_request_size,
145			max_response_size,
146			request_timeout,
147			inbound_queue,
148		}
149	}
150}
151
152impl RequestResponseConfigT for RequestResponseConfig {
153	fn protocol_name(&self) -> &ProtocolName {
154		&self.protocol_name
155	}
156}
157
158/// Request-response protocol.
159///
160/// This is slightly different from the `RequestResponsesBehaviour` in that it is protocol-specific,
161/// meaning there is an instance of `RequestResponseProtocol` for each installed request-response
162/// protocol and that instance deals only with the requests and responses of that protocol, nothing
163/// else. It also differs from the other implementation by combining both inbound and outbound
164/// requests under one instance so all request-response-related behavior of any given protocol is
165/// handled through one instance of `RequestResponseProtocol`.
166pub struct RequestResponseProtocol {
167	/// Protocol name.
168	protocol: ProtocolName,
169
170	/// Handle to request-response protocol.
171	handle: RequestResponseHandle,
172
173	/// Inbound queue for sending received requests to protocol implementation in Polkadot SDK.
174	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
175
176	/// Handle to `Peerstore`.
177	peerstore_handle: Arc<dyn PeerStoreProvider>,
178
179	/// Pending responses.
180	pending_inbound_responses: HashMap<RequestId, PendingRequest>,
181
182	/// Pending outbound responses.
183	pending_outbound_responses: FuturesUnordered<
184		BoxFuture<'static, (litep2p::PeerId, RequestId, Result<OutgoingResponse, ()>, Instant)>,
185	>,
186
187	/// RX channel for receiving info for outbound requests.
188	request_rx: TracingUnboundedReceiver<OutboundRequest>,
189
190	/// Map of supported request-response protocols which are used to support fallback requests.
191	///
192	/// If negotiation for the main protocol fails and the request was sent with a fallback,
193	/// [`RequestResponseProtocol`] queries this map and sends the request that protocol for
194	/// processing.
195	request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
196
197	/// Metrics, if enabled.
198	metrics: RequestResponseMetrics,
199}
200
201impl RequestResponseProtocol {
202	/// Create new [`RequestResponseProtocol`].
203	pub fn new(
204		protocol: ProtocolName,
205		handle: RequestResponseHandle,
206		peerstore_handle: Arc<dyn PeerStoreProvider>,
207		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
208		request_rx: TracingUnboundedReceiver<OutboundRequest>,
209		request_tx: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
210		metrics: Option<Metrics>,
211	) -> Self {
212		Self {
213			handle,
214			request_rx,
215			request_tx,
216			inbound_queue,
217			peerstore_handle,
218			protocol: protocol.clone(),
219			pending_inbound_responses: HashMap::new(),
220			pending_outbound_responses: FuturesUnordered::new(),
221			metrics: RequestResponseMetrics::new(metrics, protocol),
222		}
223	}
224
225	/// Send `request` to `peer`.
226	async fn on_send_request(
227		&mut self,
228		peer: PeerId,
229		request: Vec<u8>,
230		fallback_request: Option<(Vec<u8>, ProtocolName)>,
231		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
232		connect: IfDisconnected,
233	) {
234		let dial_options = match connect {
235			IfDisconnected::TryConnect => DialOptions::Dial,
236			IfDisconnected::ImmediateError => DialOptions::Reject,
237		};
238
239		log::trace!(
240			target: LOG_TARGET,
241			"{}: send request to {:?} (fallback {:?}) (dial options: {:?})",
242			self.protocol,
243			peer,
244			fallback_request,
245			dial_options,
246		);
247
248		match self.handle.try_send_request(peer.into(), request, dial_options) {
249			Ok(request_id) => {
250				self.pending_inbound_responses
251					.insert(request_id, PendingRequest::new(tx, Instant::now(), fallback_request));
252			},
253			Err(error) => {
254				log::warn!(
255					target: LOG_TARGET,
256					"{}: failed to send request to {peer:?}: {error:?}",
257					self.protocol,
258				);
259
260				let _ = tx.send(Err(RequestFailure::Refused));
261				self.metrics.register_inbound_request_failure(error.to_string().as_ref());
262			},
263		}
264	}
265
266	/// Handle inbound request from `peer`
267	///
268	/// If the protocol is configured outbound only, reject the request immediately.
269	fn on_inbound_request(
270		&mut self,
271		peer: litep2p::PeerId,
272		fallback: Option<litep2p::ProtocolName>,
273		request_id: RequestId,
274		request: Vec<u8>,
275	) {
276		log::trace!(
277			target: LOG_TARGET,
278			"{}: request received from {peer:?} ({fallback:?} {request_id:?}), request size {:?}",
279			self.protocol,
280			request.len(),
281		);
282
283		let Some(inbound_queue) = &self.inbound_queue else {
284			log::trace!(
285				target: LOG_TARGET,
286				"{}: rejecting inbound request from {peer:?}, protocol configured as outbound only",
287				self.protocol,
288			);
289
290			self.handle.reject_request(request_id);
291			return;
292		};
293
294		if self.peerstore_handle.is_banned(&peer.into()) {
295			log::trace!(
296				target: LOG_TARGET,
297				"{}: rejecting inbound request from banned {peer:?} ({request_id:?})",
298				self.protocol,
299			);
300
301			self.handle.reject_request(request_id);
302			self.metrics.register_inbound_request_failure("banned-peer");
303			return;
304		}
305
306		let (tx, rx) = oneshot::channel();
307
308		match inbound_queue.try_send(IncomingRequest {
309			peer: peer.into(),
310			payload: request,
311			pending_response: tx,
312		}) {
313			Ok(_) => {
314				self.pending_outbound_responses.push(Box::pin(async move {
315					(peer, request_id, rx.await.map_err(|_| ()), Instant::now())
316				}));
317			},
318			Err(error) => {
319				log::trace!(
320					target: LOG_TARGET,
321					"{:?}: dropping request from {peer:?} ({request_id:?}), inbound queue full",
322					self.protocol,
323				);
324
325				self.handle.reject_request(request_id);
326				self.metrics.register_inbound_request_failure(error.to_string().as_ref());
327			},
328		}
329	}
330
331	/// Handle received inbound response.
332	fn on_inbound_response(
333		&mut self,
334		peer: litep2p::PeerId,
335		request_id: RequestId,
336		_fallback: Option<litep2p::ProtocolName>,
337		response: Vec<u8>,
338	) {
339		match self.pending_inbound_responses.remove(&request_id) {
340			None => log::warn!(
341				target: LOG_TARGET,
342				"{:?}: response received for {peer:?} but {request_id:?} doesn't exist",
343				self.protocol,
344			),
345			Some(PendingRequest { tx, started, .. }) => {
346				log::trace!(
347					target: LOG_TARGET,
348					"{:?}: response received for {peer:?} ({request_id:?}), response size {:?}",
349					self.protocol,
350					response.len(),
351				);
352
353				let _ = tx.send(Ok((response, self.protocol.clone())));
354				self.metrics.register_outbound_request_success(started.elapsed());
355			},
356		}
357	}
358
359	/// Handle failed outbound request.
360	fn on_request_failed(
361		&mut self,
362		peer: litep2p::PeerId,
363		request_id: RequestId,
364		error: RequestResponseError,
365	) {
366		log::debug!(
367			target: LOG_TARGET,
368			"{:?}: request failed for {peer:?} ({request_id:?}): {error:?}",
369			self.protocol
370		);
371
372		let Some(PendingRequest { tx, fallback_request, .. }) =
373			self.pending_inbound_responses.remove(&request_id)
374		else {
375			log::warn!(
376				target: LOG_TARGET,
377				"{:?}: request failed for peer {peer:?} but {request_id:?} doesn't exist",
378				self.protocol,
379			);
380
381			return
382		};
383
384		let status = match error {
385			RequestResponseError::NotConnected =>
386				Some((RequestFailure::NotConnected, "not-connected")),
387			RequestResponseError::Rejected(reason) => {
388				let reason = match reason {
389					RejectReason::ConnectionClosed => "connection-closed",
390					RejectReason::SubstreamClosed => "substream-closed",
391					RejectReason::SubstreamOpenError(substream_error) => match substream_error {
392						SubstreamError::NegotiationError(NegotiationError::Timeout) =>
393							"substream-timeout",
394						_ => "substream-open-error",
395					},
396					RejectReason::DialFailed(None) => "dial-failed",
397					RejectReason::DialFailed(Some(ImmediateDialError::AlreadyConnected)) =>
398						"dial-already-connected",
399					RejectReason::DialFailed(Some(ImmediateDialError::PeerIdMissing)) =>
400						"dial-peerid-missing",
401					RejectReason::DialFailed(Some(ImmediateDialError::TriedToDialSelf)) =>
402						"dial-tried-to-dial-self",
403					RejectReason::DialFailed(Some(ImmediateDialError::NoAddressAvailable)) =>
404						"dial-no-address-available",
405					RejectReason::DialFailed(Some(ImmediateDialError::TaskClosed)) =>
406						"dial-task-closed",
407					RejectReason::DialFailed(Some(ImmediateDialError::ChannelClogged)) =>
408						"dial-channel-clogged",
409				};
410
411				Some((RequestFailure::Refused, reason))
412			},
413			RequestResponseError::Timeout =>
414				Some((RequestFailure::Network(OutboundFailure::Timeout), "timeout")),
415			RequestResponseError::Canceled => {
416				log::debug!(
417					target: LOG_TARGET,
418					"{}: request canceled by local node to {peer:?} ({request_id:?})",
419					self.protocol,
420				);
421				None
422			},
423			RequestResponseError::TooLargePayload => {
424				log::warn!(
425					target: LOG_TARGET,
426					"{}: tried to send too large request to {peer:?} ({request_id:?})",
427					self.protocol,
428				);
429				Some((RequestFailure::Refused, "payload-too-large"))
430			},
431			RequestResponseError::UnsupportedProtocol => match fallback_request {
432				Some((request, protocol)) => match self.request_tx.get(&protocol) {
433					Some(sender) => {
434						log::debug!(
435							target: LOG_TARGET,
436							"{}: failed to negotiate protocol with {:?}. Trying the fallback protocol ({})",
437							self.protocol,
438							peer,
439							protocol,
440						);
441
442						let outbound_request = OutboundRequest::new(
443							peer.into(),
444							request,
445							tx,
446							None,
447							IfDisconnected::ImmediateError,
448						);
449
450						// since remote peer doesn't support the main protocol (`self.protocol`),
451						// try to send the request over a fallback protocol by creating a new
452						// `OutboundRequest` from the original data, now with the fallback request
453						// payload, and send it over to the (fallback) request handler like it was
454						// a normal request.
455						let _ = sender.unbounded_send(outbound_request);
456
457						return;
458					},
459					None => {
460						log::warn!(
461							target: LOG_TARGET,
462							"{}: fallback request provided but protocol ({}) doesn't exist (peer {:?})",
463							self.protocol,
464							protocol,
465							peer,
466						);
467
468						Some((RequestFailure::Refused, "invalid-fallback-protocol"))
469					},
470				},
471				None => Some((RequestFailure::Refused, "unsupported-protocol")),
472			},
473		};
474
475		if let Some((error, reason)) = status {
476			self.metrics.register_outbound_request_failure(reason);
477			let _ = tx.send(Err(error));
478		}
479	}
480
481	/// Handle outbound response.
482	fn on_outbound_response(
483		&mut self,
484		peer: litep2p::PeerId,
485		request_id: RequestId,
486		response: OutgoingResponse,
487		started: Instant,
488	) {
489		let OutgoingResponse { result, reputation_changes, sent_feedback } = response;
490
491		for change in reputation_changes {
492			log::trace!(target: LOG_TARGET, "{}: report {peer:?}: {change:?}", self.protocol);
493			self.peerstore_handle.report_peer(peer.into(), change);
494		}
495
496		match result {
497			Err(()) => {
498				log::debug!(
499					target: LOG_TARGET,
500					"{}: response rejected ({request_id:?}) for {peer:?}",
501					self.protocol,
502				);
503
504				self.handle.reject_request(request_id);
505				self.metrics.register_inbound_request_failure("rejected");
506			},
507			Ok(response) => {
508				log::trace!(
509					target: LOG_TARGET,
510					"{}: send response ({request_id:?}) to {peer:?}, response size {}",
511					self.protocol,
512					response.len(),
513				);
514
515				match sent_feedback {
516					None => self.handle.send_response(request_id, response),
517					Some(feedback) =>
518						self.handle.send_response_with_feedback(request_id, response, feedback),
519				}
520
521				self.metrics.register_inbound_request_success(started.elapsed());
522			},
523		}
524	}
525
526	/// Start running event loop of the request-response protocol.
527	pub async fn run(mut self) {
528		loop {
529			tokio::select! {
530				event = self.handle.next() => match event {
531					None => return,
532					Some(RequestResponseEvent::RequestReceived {
533						peer,
534						fallback,
535						request_id,
536						request,
537					}) => self.on_inbound_request(peer, fallback, request_id, request),
538					Some(RequestResponseEvent::ResponseReceived { peer, request_id, fallback, response }) => {
539						self.on_inbound_response(peer, request_id, fallback, response);
540					},
541					Some(RequestResponseEvent::RequestFailed { peer, request_id, error }) => {
542						self.on_request_failed(peer, request_id, error);
543					},
544				},
545				event = self.pending_outbound_responses.next(), if !self.pending_outbound_responses.is_empty() => match event {
546					None => return,
547					Some((peer, request_id, Err(()), _)) => {
548						log::debug!(target: LOG_TARGET, "{}: reject request ({request_id:?}) from {peer:?}", self.protocol);
549
550						self.handle.reject_request(request_id);
551						self.metrics.register_inbound_request_failure("rejected");
552					}
553					Some((peer, request_id, Ok(response), started)) => {
554						self.on_outbound_response(peer, request_id, response, started);
555					}
556				},
557				event = self.request_rx.next() => match event {
558					None => return,
559					Some(outbound_request) => {
560						let OutboundRequest { peer, request, sender, dial_behavior, fallback_request } = outbound_request;
561
562						self.on_send_request(peer, request, fallback_request, sender, dial_behavior).await;
563					}
564				}
565			}
566		}
567	}
568}