sc_network/litep2p/shim/notification/
mod.rs1use crate::{
23 error::Error,
24 litep2p::shim::notification::peerset::{OpenResult, Peerset, PeersetNotificationCommand},
25 service::{
26 metrics::NotificationMetrics,
27 traits::{NotificationEvent as SubstrateNotificationEvent, ValidationResult},
28 },
29 MessageSink, NotificationService, ProtocolName,
30};
31
32use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
33use litep2p::protocol::notification::{
34 NotificationEvent, NotificationHandle, NotificationSink,
35 ValidationResult as Litep2pValidationResult,
36};
37use tokio::sync::oneshot;
38
39use sc_network_types::PeerId;
40
41use std::{collections::HashSet, fmt};
42
43pub mod config;
44pub mod peerset;
45
46#[cfg(test)]
47mod tests;
48
/// Logging target passed as `target:` to every `log` macro invocation in this file.
const LOG_TARGET: &str = "sub-libp2p::notification";
51
52pub struct Litep2pMessageSink {
54 protocol: ProtocolName,
56
57 peer: PeerId,
59
60 sink: NotificationSink,
62
63 metrics: NotificationMetrics,
65}
66
67impl Litep2pMessageSink {
68 fn new(
70 peer: PeerId,
71 protocol: ProtocolName,
72 sink: NotificationSink,
73 metrics: NotificationMetrics,
74 ) -> Self {
75 Self { protocol, peer, sink, metrics }
76 }
77}
78
79#[async_trait::async_trait]
80impl MessageSink for Litep2pMessageSink {
81 fn send_sync_notification(&self, notification: Vec<u8>) {
83 let size = notification.len();
84
85 match self.sink.send_sync_notification(notification) {
86 Ok(_) => self.metrics.register_notification_sent(&self.protocol, size),
87 Err(error) => log::trace!(
88 target: LOG_TARGET,
89 "{}: failed to send sync notification to {:?}: {error:?}",
90 self.protocol,
91 self.peer,
92 ),
93 }
94 }
95
96 async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), Error> {
101 let size = notification.len();
102
103 match self.sink.send_async_notification(notification).await {
104 Ok(_) => {
105 self.metrics.register_notification_sent(&self.protocol, size);
106 Ok(())
107 },
108 Err(error) => {
109 log::trace!(
110 target: LOG_TARGET,
111 "{}: failed to send async notification to {:?}: {error:?}",
112 self.protocol,
113 self.peer,
114 );
115
116 Err(Error::Litep2p(error))
117 },
118 }
119 }
120}
121
122pub struct NotificationProtocol {
124 protocol: ProtocolName,
126
127 handle: NotificationHandle,
129
130 peerset: Peerset,
134
135 pending_validations: FuturesUnordered<
137 BoxFuture<'static, (PeerId, Result<ValidationResult, oneshot::error::RecvError>)>,
138 >,
139
140 pending_cancels: HashSet<litep2p::PeerId>,
142
143 metrics: NotificationMetrics,
145}
146
147impl fmt::Debug for NotificationProtocol {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_struct("NotificationProtocol")
150 .field("protocol", &self.protocol)
151 .field("handle", &self.handle)
152 .finish()
153 }
154}
155
156impl NotificationProtocol {
157 pub fn new(
159 protocol: ProtocolName,
160 handle: NotificationHandle,
161 peerset: Peerset,
162 metrics: NotificationMetrics,
163 ) -> Self {
164 Self {
165 protocol,
166 handle,
167 peerset,
168 metrics,
169 pending_cancels: HashSet::new(),
170 pending_validations: FuturesUnordered::new(),
171 }
172 }
173
174 async fn on_peerset_command(&mut self, command: PeersetNotificationCommand) {
176 match command {
177 PeersetNotificationCommand::OpenSubstream { peers } => {
178 log::debug!(target: LOG_TARGET, "{}: open substreams to {peers:?}", self.protocol);
179
180 let _ = self.handle.open_substream_batch(peers.into_iter().map(From::from)).await;
181 },
182 PeersetNotificationCommand::CloseSubstream { peers } => {
183 log::debug!(target: LOG_TARGET, "{}: close substreams to {peers:?}", self.protocol);
184
185 self.handle.close_substream_batch(peers.into_iter().map(From::from)).await;
186 },
187 }
188 }
189}
190
191#[async_trait::async_trait]
192impl NotificationService for NotificationProtocol {
193 async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
194 unimplemented!();
195 }
196
197 async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
198 unimplemented!();
199 }
200
201 fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
202 let size = notification.len();
203
204 if let Ok(_) = self.handle.send_sync_notification(peer.into(), notification) {
205 self.metrics.register_notification_sent(&self.protocol, size);
206 }
207 }
208
209 async fn send_async_notification(
210 &mut self,
211 peer: &PeerId,
212 notification: Vec<u8>,
213 ) -> Result<(), Error> {
214 let size = notification.len();
215
216 match self.handle.send_async_notification(peer.into(), notification).await {
217 Ok(_) => {
218 self.metrics.register_notification_sent(&self.protocol, size);
219 Ok(())
220 },
221 Err(_) => Err(Error::ChannelClosed),
222 }
223 }
224
225 async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
227 self.handle.set_handshake(handshake);
228
229 Ok(())
230 }
231
232 fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
237 self.handle.set_handshake(handshake);
238
239 Ok(())
240 }
241
242 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
245 unimplemented!("clonable `NotificationService` not supported by `litep2p`");
246 }
247
248 fn protocol(&self) -> &ProtocolName {
250 &self.protocol
251 }
252
253 fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>> {
255 self.handle.notification_sink(peer.into()).map(|sink| {
256 let sink: Box<dyn MessageSink> = Box::new(Litep2pMessageSink::new(
257 *peer,
258 self.protocol.clone(),
259 sink,
260 self.metrics.clone(),
261 ));
262 sink
263 })
264 }
265
266 async fn next_event(&mut self) -> Option<SubstrateNotificationEvent> {
268 loop {
269 tokio::select! {
270 biased;
271
272 event = self.handle.next() => match event? {
273 NotificationEvent::ValidateSubstream { peer, handshake, .. } => {
274 if let ValidationResult::Reject = self.peerset.report_inbound_substream(peer.into()) {
275 self.handle.send_validation_result(peer, Litep2pValidationResult::Reject);
276 continue;
277 }
278
279 let (tx, rx) = oneshot::channel();
280 self.pending_validations.push(Box::pin(async move { (peer.into(), rx.await) }));
281
282 log::trace!(target: LOG_TARGET, "{}: validate substream for {peer:?}", self.protocol);
283
284 return Some(SubstrateNotificationEvent::ValidateInboundSubstream {
285 peer: peer.into(),
286 handshake,
287 result_tx: tx,
288 });
289 }
290 NotificationEvent::NotificationStreamOpened {
291 peer,
292 fallback,
293 handshake,
294 direction,
295 ..
296 } => {
297 self.metrics.register_substream_opened(&self.protocol);
298
299 match self.peerset.report_substream_opened(peer.into(), direction.into()) {
300 OpenResult::Reject => {
301 let _ = self.handle.close_substream_batch(vec![peer].into_iter().map(From::from)).await;
302 self.pending_cancels.insert(peer);
303
304 continue
305 }
306 OpenResult::Accept { direction } => {
307 log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
308
309 return Some(SubstrateNotificationEvent::NotificationStreamOpened {
310 peer: peer.into(),
311 handshake,
312 direction,
313 negotiated_fallback: fallback.map(From::from),
314 });
315 }
316 }
317 }
318 NotificationEvent::NotificationStreamClosed {
319 peer,
320 } => {
321 log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
322
323 self.metrics.register_substream_closed(&self.protocol);
324 self.peerset.report_substream_closed(peer.into());
325
326 if self.pending_cancels.remove(&peer) {
327 log::debug!(
328 target: LOG_TARGET,
329 "{}: substream closed to canceled peer ({peer:?})",
330 self.protocol
331 );
332 continue
333 }
334
335 return Some(SubstrateNotificationEvent::NotificationStreamClosed { peer: peer.into() })
336 }
337 NotificationEvent::NotificationStreamOpenFailure {
338 peer,
339 error,
340 } => {
341 log::trace!(target: LOG_TARGET, "{}: open failure for {peer:?}", self.protocol);
342 self.peerset.report_substream_open_failure(peer.into(), error);
343 }
344 NotificationEvent::NotificationReceived {
345 peer,
346 notification,
347 } => {
348 self.metrics.register_notification_received(&self.protocol, notification.len());
349
350 if !self.pending_cancels.contains(&peer) {
351 return Some(SubstrateNotificationEvent::NotificationReceived {
352 peer: peer.into(),
353 notification: notification.to_vec(),
354 });
355 }
356 }
357 },
358 result = self.pending_validations.next(), if !self.pending_validations.is_empty() => {
359 let (peer, result) = result?;
360 let validation_result = match result {
361 Ok(ValidationResult::Accept) => Litep2pValidationResult::Accept,
362 _ => {
363 self.peerset.report_substream_rejected(peer);
364 Litep2pValidationResult::Reject
365 }
366 };
367
368 self.handle.send_validation_result(peer.into(), validation_result);
369 }
370 command = self.peerset.next() => self.on_peerset_command(command?).await,
371 }
372 }
373 }
374}