referrerpolicy=no-referrer-when-downgrade

sc_network/litep2p/shim/notification/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Shim for `litep2p::NotificationHandle` to combine `Peerset`-like behavior
20//! with `NotificationService`.
21
22use crate::{
23	error::Error,
24	litep2p::shim::notification::peerset::{OpenResult, Peerset, PeersetNotificationCommand},
25	service::{
26		metrics::NotificationMetrics,
27		traits::{NotificationEvent as SubstrateNotificationEvent, ValidationResult},
28	},
29	MessageSink, NotificationService, ProtocolName,
30};
31
32use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
33use litep2p::protocol::notification::{
34	NotificationEvent, NotificationHandle, NotificationSink,
35	ValidationResult as Litep2pValidationResult,
36};
37use tokio::sync::oneshot;
38
39use sc_network_types::PeerId;
40
41use std::{collections::HashSet, fmt};
42
43pub mod config;
44pub mod peerset;
45
46#[cfg(test)]
47mod tests;
48
/// Logging target passed as `target:` to the `log` macro invocations in this file.
const LOG_TARGET: &str = "sub-libp2p::notification";
51
/// Wrapper over `litep2p`'s notification sink.
///
/// Implements [`MessageSink`] by forwarding notifications to the wrapped
/// [`NotificationSink`], registering successfully sent notifications in the
/// metrics and logging failures at trace level.
pub struct Litep2pMessageSink {
	/// Protocol name, used in log messages and when registering sent notifications.
	protocol: ProtocolName,

	/// Remote peer ID, used in log messages.
	peer: PeerId,

	/// `litep2p` notification sink the notifications are forwarded to.
	sink: NotificationSink,

	/// Notification metrics, updated after each successfully sent notification.
	metrics: NotificationMetrics,
}
66
67impl Litep2pMessageSink {
68	/// Create new [`Litep2pMessageSink`].
69	fn new(
70		peer: PeerId,
71		protocol: ProtocolName,
72		sink: NotificationSink,
73		metrics: NotificationMetrics,
74	) -> Self {
75		Self { protocol, peer, sink, metrics }
76	}
77}
78
79#[async_trait::async_trait]
80impl MessageSink for Litep2pMessageSink {
81	/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
82	fn send_sync_notification(&self, notification: Vec<u8>) {
83		let size = notification.len();
84
85		match self.sink.send_sync_notification(notification) {
86			Ok(_) => self.metrics.register_notification_sent(&self.protocol, size),
87			Err(error) => log::trace!(
88				target: LOG_TARGET,
89				"{}: failed to send sync notification to {:?}: {error:?}",
90				self.protocol,
91				self.peer,
92			),
93		}
94	}
95
96	/// Send an asynchronous `notification` to to the peer associated with this [`MessageSink`],
97	/// allowing sender to exercise backpressure.
98	///
99	/// Returns an error if the peer does not exist.
100	async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), Error> {
101		let size = notification.len();
102
103		match self.sink.send_async_notification(notification).await {
104			Ok(_) => {
105				self.metrics.register_notification_sent(&self.protocol, size);
106				Ok(())
107			},
108			Err(error) => {
109				log::trace!(
110					target: LOG_TARGET,
111					"{}: failed to send async notification to {:?}: {error:?}",
112					self.protocol,
113					self.peer,
114				);
115
116				Err(Error::Litep2p(error))
117			},
118		}
119	}
120}
121
/// Notification protocol implementation.
///
/// Combines a `litep2p` [`NotificationHandle`] with a [`Peerset`] and exposes the result
/// through the [`NotificationService`] trait.
pub struct NotificationProtocol {
	/// Protocol name.
	protocol: ProtocolName,

	/// `litep2p` notification handle.
	handle: NotificationHandle,

	/// Peerset for the notification protocol.
	///
	/// Listens to peering-related events and either opens or closes substreams to remote peers.
	peerset: Peerset,

	/// Pending validations for inbound substreams.
	///
	/// Each future resolves to the peer and the value received from the `oneshot` channel
	/// whose sender was handed out in `ValidateInboundSubstream`. A receive error
	/// (sender dropped) is treated like a rejection.
	pending_validations: FuturesUnordered<
		BoxFuture<'static, (PeerId, Result<ValidationResult, oneshot::error::RecvError>)>,
	>,

	/// Pending cancels.
	///
	/// Peers whose substream was opened but rejected by the peerset. A close has been
	/// requested for them; until the close event arrives their notifications are not
	/// forwarded, and the close event itself is not reported.
	pending_cancels: HashSet<litep2p::PeerId>,

	/// Notification metrics.
	metrics: NotificationMetrics,
}
146
147impl fmt::Debug for NotificationProtocol {
148	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149		f.debug_struct("NotificationProtocol")
150			.field("protocol", &self.protocol)
151			.field("handle", &self.handle)
152			.finish()
153	}
154}
155
156impl NotificationProtocol {
157	/// Create new [`NotificationProtocol`].
158	pub fn new(
159		protocol: ProtocolName,
160		handle: NotificationHandle,
161		peerset: Peerset,
162		metrics: NotificationMetrics,
163	) -> Self {
164		Self {
165			protocol,
166			handle,
167			peerset,
168			metrics,
169			pending_cancels: HashSet::new(),
170			pending_validations: FuturesUnordered::new(),
171		}
172	}
173
174	/// Handle `Peerset` command.
175	async fn on_peerset_command(&mut self, command: PeersetNotificationCommand) {
176		match command {
177			PeersetNotificationCommand::OpenSubstream { peers } => {
178				log::debug!(target: LOG_TARGET, "{}: open substreams to {peers:?}", self.protocol);
179
180				let _ = self.handle.open_substream_batch(peers.into_iter().map(From::from)).await;
181			},
182			PeersetNotificationCommand::CloseSubstream { peers } => {
183				log::debug!(target: LOG_TARGET, "{}: close substreams to {peers:?}", self.protocol);
184
185				self.handle.close_substream_batch(peers.into_iter().map(From::from)).await;
186			},
187		}
188	}
189}
190
191#[async_trait::async_trait]
192impl NotificationService for NotificationProtocol {
193	async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
194		unimplemented!();
195	}
196
197	async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
198		unimplemented!();
199	}
200
201	fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
202		let size = notification.len();
203
204		if let Ok(_) = self.handle.send_sync_notification(peer.into(), notification) {
205			self.metrics.register_notification_sent(&self.protocol, size);
206		}
207	}
208
209	async fn send_async_notification(
210		&mut self,
211		peer: &PeerId,
212		notification: Vec<u8>,
213	) -> Result<(), Error> {
214		let size = notification.len();
215
216		match self.handle.send_async_notification(peer.into(), notification).await {
217			Ok(_) => {
218				self.metrics.register_notification_sent(&self.protocol, size);
219				Ok(())
220			},
221			Err(_) => Err(Error::ChannelClosed),
222		}
223	}
224
225	/// Set handshake for the notification protocol replacing the old handshake.
226	async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
227		self.handle.set_handshake(handshake);
228
229		Ok(())
230	}
231
232	/// Set handshake for the notification protocol replacing the old handshake.
233	///
234	/// For `litep2p` this is identical to `NotificationService::set_handshake()` since `litep2p`
235	/// allows updating the handshake synchronously.
236	fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
237		self.handle.set_handshake(handshake);
238
239		Ok(())
240	}
241
242	/// Make a copy of the object so it can be shared between protocol components
243	/// who wish to have access to the same underlying notification protocol.
244	fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
245		unimplemented!("clonable `NotificationService` not supported by `litep2p`");
246	}
247
248	/// Get protocol name of the `NotificationService`.
249	fn protocol(&self) -> &ProtocolName {
250		&self.protocol
251	}
252
253	/// Get message sink of the peer.
254	fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>> {
255		self.handle.notification_sink(peer.into()).map(|sink| {
256			let sink: Box<dyn MessageSink> = Box::new(Litep2pMessageSink::new(
257				*peer,
258				self.protocol.clone(),
259				sink,
260				self.metrics.clone(),
261			));
262			sink
263		})
264	}
265
266	/// Get next event from the `Notifications` event stream.
267	async fn next_event(&mut self) -> Option<SubstrateNotificationEvent> {
268		loop {
269			tokio::select! {
270				biased;
271
272				event = self.handle.next() => match event? {
273					NotificationEvent::ValidateSubstream { peer, handshake, .. } => {
274						if let ValidationResult::Reject = self.peerset.report_inbound_substream(peer.into()) {
275							self.handle.send_validation_result(peer, Litep2pValidationResult::Reject);
276							continue;
277						}
278
279						let (tx, rx) = oneshot::channel();
280						self.pending_validations.push(Box::pin(async move { (peer.into(), rx.await) }));
281
282						log::trace!(target: LOG_TARGET, "{}: validate substream for {peer:?}", self.protocol);
283
284						return Some(SubstrateNotificationEvent::ValidateInboundSubstream {
285							peer: peer.into(),
286							handshake,
287							result_tx: tx,
288						});
289					}
290					NotificationEvent::NotificationStreamOpened {
291						peer,
292						fallback,
293						handshake,
294						direction,
295						..
296					} => {
297						self.metrics.register_substream_opened(&self.protocol);
298
299						match self.peerset.report_substream_opened(peer.into(), direction.into()) {
300							OpenResult::Reject => {
301								let _ = self.handle.close_substream_batch(vec![peer].into_iter().map(From::from)).await;
302								self.pending_cancels.insert(peer);
303
304								continue
305							}
306							OpenResult::Accept { direction } => {
307								log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
308
309								return Some(SubstrateNotificationEvent::NotificationStreamOpened {
310									peer: peer.into(),
311									handshake,
312									direction,
313									negotiated_fallback: fallback.map(From::from),
314								});
315							}
316						}
317					}
318					NotificationEvent::NotificationStreamClosed {
319						peer,
320					} => {
321						log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
322
323						self.metrics.register_substream_closed(&self.protocol);
324						self.peerset.report_substream_closed(peer.into());
325
326						if self.pending_cancels.remove(&peer) {
327							log::debug!(
328								target: LOG_TARGET,
329								"{}: substream closed to canceled peer ({peer:?})",
330								self.protocol
331							);
332							continue
333						}
334
335						return Some(SubstrateNotificationEvent::NotificationStreamClosed { peer: peer.into() })
336					}
337					NotificationEvent::NotificationStreamOpenFailure {
338						peer,
339						error,
340					} => {
341						log::trace!(target: LOG_TARGET, "{}: open failure for {peer:?}", self.protocol);
342						self.peerset.report_substream_open_failure(peer.into(), error);
343					}
344					NotificationEvent::NotificationReceived {
345						peer,
346						notification,
347					} => {
348						self.metrics.register_notification_received(&self.protocol, notification.len());
349
350						if !self.pending_cancels.contains(&peer) {
351							return Some(SubstrateNotificationEvent::NotificationReceived {
352								peer: peer.into(),
353								notification: notification.to_vec(),
354							});
355						}
356					}
357				},
358				result = self.pending_validations.next(), if !self.pending_validations.is_empty() => {
359					let (peer, result) = result?;
360					let validation_result = match result {
361						Ok(ValidationResult::Accept) => Litep2pValidationResult::Accept,
362						_ => {
363							self.peerset.report_substream_rejected(peer);
364							Litep2pValidationResult::Reject
365						}
366					};
367
368					self.handle.send_validation_result(peer.into(), validation_result);
369				}
370				command = self.peerset.next() => self.on_peerset_command(command?).await,
371			}
372		}
373	}
374}