litep2p/protocol/notification/mod.rs
1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Notification protocol implementation.
22
23use crate::{
24 error::{Error, SubstreamError},
25 executor::Executor,
26 protocol::{
27 self,
28 notification::{
29 connection::Connection,
30 handle::NotificationEventHandle,
31 negotiation::{HandshakeEvent, HandshakeService},
32 },
33 TransportEvent, TransportService,
34 },
35 substream::Substream,
36 types::{protocol::ProtocolName, SubstreamId},
37 PeerId, DEFAULT_CHANNEL_SIZE,
38};
39
40use bytes::BytesMut;
41use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
42use multiaddr::Multiaddr;
43use tokio::sync::{
44 mpsc::{channel, Receiver, Sender},
45 oneshot,
46};
47
48use std::{collections::HashMap, sync::Arc, time::Duration};
49
50pub use config::{Config, ConfigBuilder};
51pub use handle::{NotificationHandle, NotificationSink};
52pub use types::{
53 Direction, NotificationCommand, NotificationError, NotificationEvent, ValidationResult,
54};
55
56mod config;
57mod connection;
58mod handle;
59mod negotiation;
60mod types;
61
62#[cfg(test)]
63mod tests;
64
65/// Logging target for the file.
66const LOG_TARGET: &str = "litep2p::notification";
67
68/// Connection state.
69///
70/// Used to track transport level connectivity state when there is a pending validation.
71/// See [`PeerState::ValidationPending`] for more details.
72#[derive(Debug, PartialEq, Eq)]
73enum ConnectionState {
74 /// There is a active, transport-level connection open to the peer.
75 Open,
76
77 /// There is no transport-level connection open to the peer.
78 Closed,
79}
80
81/// Inbound substream state.
82#[derive(Debug)]
83enum InboundState {
84 /// Substream is closed.
85 Closed,
86
87 /// Handshake is being read from the remote node.
88 ReadingHandshake,
89
90 /// Substream and its handshake are being validated by the user protocol.
91 Validating {
92 /// Inbound substream.
93 inbound: Substream,
94 },
95
96 /// Handshake is being sent to the remote node.
97 SendingHandshake,
98
99 /// Substream is open.
100 Open {
101 /// Inbound substream.
102 inbound: Substream,
103 },
104}
105
106/// Outbound substream state.
107#[derive(Debug)]
108enum OutboundState {
109 /// Substream is closed.
110 Closed,
111
112 /// Outbound substream initiated.
113 OutboundInitiated {
114 /// Substream ID.
115 substream: SubstreamId,
116 },
117
118 /// Substream is in the state of being negotiated.
119 ///
120 /// This process entails sending local node's handshake and reading back the remote node's
121 /// handshake if they've accepted the substream or detecting that the substream was closed
122 /// in case the substream was rejected.
123 Negotiating,
124
125 /// Substream is open.
126 Open {
127 /// Received handshake.
128 handshake: Vec<u8>,
129
130 /// Outbound substream.
131 outbound: Substream,
132 },
133}
134
135impl OutboundState {
136 /// Get pending outboud substream ID, if it exists.
137 fn pending_open(&self) -> Option<SubstreamId> {
138 match &self {
139 OutboundState::OutboundInitiated { substream } => Some(*substream),
140 _ => None,
141 }
142 }
143}
144
145#[derive(Debug)]
146enum PeerState {
147 /// Peer state is poisoned due to invalid state transition.
148 Poisoned,
149
150 /// Validation for an inbound substream is still pending.
151 ///
152 /// In order to enforce valid state transitions, `NotificationProtocol` keeps track of pending
153 /// validations across connectivity events (open/closed) and enforces that no activity happens
154 /// for any peer that is still awaiting validation for their inbound substream.
155 ///
156 /// If connection closes while the substream is being validated, instead of removing peer from
157 /// `peers`, the peer state is set as `ValidationPending` which indicates to the state machine
158 /// that a response for a inbound substream is pending validation. The substream itself will be
159 /// dead by the time validation is received if the peer state is `ValidationPending` since the
160 /// substream was part of a previous, now-closed substream but this state allows
161 /// `NotificationProtocol` to enforce correct state transitions by, e.g., rejecting new inbound
162 /// substream while a previous substream is still being validated or rejecting outbound
163 /// substreams on new connections if that same condition holds.
164 ValidationPending {
165 /// What is current connectivity state of the peer.
166 ///
167 /// If `state` is `ConnectionState::Closed` when the validation is finally received, peer
168 /// is removed from `peer` and if the `state` is `ConnectionState::Open`, peer is moved to
169 /// state `PeerState::Closed` and user is allowed to retry opening an outbound substream.
170 state: ConnectionState,
171 },
172
173 /// Connection to peer is closed.
174 Closed {
175 /// Connection might have been closed while there was an outbound substream still pending.
176 ///
177 /// To handle this state transition correctly in case the substream opens after the
178 /// connection is considered closed, store the `SubstreamId` to that it can be verified in
179 /// case the substream ever opens.
180 pending_open: Option<SubstreamId>,
181 },
182
183 /// Peer is being dialed in order to open an outbound substream to them.
184 Dialing,
185
186 /// Outbound substream initiated.
187 OutboundInitiated {
188 /// Substream ID.
189 substream: SubstreamId,
190 },
191
192 /// Substream is being validated.
193 Validating {
194 /// Protocol.
195 protocol: ProtocolName,
196
197 /// Fallback protocol, if the substream was negotiated using a fallback name.
198 fallback: Option<ProtocolName>,
199
200 /// Outbound protocol state.
201 outbound: OutboundState,
202
203 /// Inbound protocol state.
204 inbound: InboundState,
205
206 /// Direction.
207 direction: Direction,
208 },
209
210 /// Notification stream has been opened.
211 Open {
212 /// `Oneshot::Sender` for shutting down the connection.
213 shutdown: oneshot::Sender<()>,
214 },
215}
216
217/// Peer context.
218#[derive(Debug)]
219struct PeerContext {
220 /// Peer state.
221 state: PeerState,
222}
223
224impl PeerContext {
225 /// Create new [`PeerContext`].
226 fn new() -> Self {
227 Self {
228 state: PeerState::Closed { pending_open: None },
229 }
230 }
231}
232
233pub(crate) struct NotificationProtocol {
234 /// Transport service.
235 service: TransportService,
236
237 /// Protocol.
238 protocol: ProtocolName,
239
240 /// Auto accept inbound substream if the outbound substream was initiated by the local node.
241 auto_accept: bool,
242
243 /// TX channel passed to the protocol used for sending events.
244 event_handle: NotificationEventHandle,
245
246 /// TX channel for sending shut down notifications from connection handlers to
247 /// [`NotificationProtocol`].
248 shutdown_tx: Sender<PeerId>,
249
250 /// RX channel for receiving shutdown notifications from the connection handlers.
251 shutdown_rx: Receiver<PeerId>,
252
253 /// RX channel passed to the protocol used for receiving commands.
254 command_rx: Receiver<NotificationCommand>,
255
256 /// TX channel given to connection handlers for sending notifications.
257 notif_tx: Sender<(PeerId, BytesMut)>,
258
259 /// Connected peers.
260 peers: HashMap<PeerId, PeerContext>,
261
262 /// Pending outbound substreams.
263 pending_outbound: HashMap<SubstreamId, PeerId>,
264
265 /// Handshaking service which reads and writes the handshakes to inbound
266 /// and outbound substreams asynchronously.
267 negotiation: HandshakeService,
268
269 /// Synchronous channel size.
270 sync_channel_size: usize,
271
272 /// Asynchronous channel size.
273 async_channel_size: usize,
274
275 /// Executor for connection handlers.
276 executor: Arc<dyn Executor>,
277
278 /// Pending substream validations.
279 pending_validations: FuturesUnordered<BoxFuture<'static, (PeerId, ValidationResult)>>,
280
281 /// Timers for pending outbound substreams.
282 timers: FuturesUnordered<BoxFuture<'static, PeerId>>,
283
284 /// Should `NotificationProtocol` attempt to dial the peer.
285 should_dial: bool,
286}
287
288impl NotificationProtocol {
289 pub(crate) fn new(
290 service: TransportService,
291 config: Config,
292 executor: Arc<dyn Executor>,
293 ) -> Self {
294 let (shutdown_tx, shutdown_rx) = channel(DEFAULT_CHANNEL_SIZE);
295
296 Self {
297 service,
298 shutdown_tx,
299 shutdown_rx,
300 executor,
301 peers: HashMap::new(),
302 protocol: config.protocol_name,
303 auto_accept: config.auto_accept,
304 pending_validations: FuturesUnordered::new(),
305 timers: FuturesUnordered::new(),
306 event_handle: NotificationEventHandle::new(config.event_tx),
307 notif_tx: config.notif_tx,
308 command_rx: config.command_rx,
309 pending_outbound: HashMap::new(),
310 negotiation: HandshakeService::new(config.handshake),
311 sync_channel_size: config.sync_channel_size,
312 async_channel_size: config.async_channel_size,
313 should_dial: config.should_dial,
314 }
315 }
316
317 /// Connection established to remote node.
318 ///
319 /// If the peer already exists, the only valid state for it is `Dialing` as it indicates that
320 /// the user tried to open a substream to a peer who was not connected to local node.
321 ///
322 /// Any other state indicates that there's an error in the state transition logic.
323 async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
324 tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
325
326 let Some(context) = self.peers.get_mut(&peer) else {
327 self.peers.insert(peer, PeerContext::new());
328 return Ok(());
329 };
330
331 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
332 PeerState::Dialing => {
333 tracing::trace!(
334 target: LOG_TARGET,
335 ?peer,
336 protocol = %self.protocol,
337 "dial succeeded, open substream to peer",
338 );
339
340 context.state = PeerState::Closed { pending_open: None };
341 self.on_open_substream(peer).await
342 }
343 // connection established but validation is still pending
344 //
345 // update the connection state so that `NotificationProtocol` can proceed
346 // to correct state after the validation result has beern received
347 PeerState::ValidationPending { state } => {
348 debug_assert_eq!(state, ConnectionState::Closed);
349
350 tracing::debug!(
351 target: LOG_TARGET,
352 ?peer,
353 protocol = %self.protocol,
354 "new connection established while validation still pending",
355 );
356
357 context.state = PeerState::ValidationPending {
358 state: ConnectionState::Open,
359 };
360
361 Ok(())
362 }
363 state => {
364 tracing::error!(
365 target: LOG_TARGET,
366 ?peer,
367 protocol = %self.protocol,
368 ?state,
369 "state mismatch: peer already exists",
370 );
371 debug_assert!(false);
372 Err(Error::PeerAlreadyExists(peer))
373 }
374 }
375 }
376
377 /// Connection closed to remote node.
378 ///
379 /// If the connection was considered open (both substreams were open), user is notified that
380 /// the notification stream was closed.
381 ///
382 /// If the connection was still in progress (either substream was not fully open), the user is
383 /// reported about it only if they had opened an outbound substream (outbound is either fully
384 /// open, it had been initiated or the substream was under negotiation).
385 async fn on_connection_closed(&mut self, peer: PeerId) -> crate::Result<()> {
386 tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");
387
388 self.pending_outbound.retain(|_, p| p != &peer);
389
390 let Some(context) = self.peers.remove(&peer) else {
391 tracing::error!(
392 target: LOG_TARGET,
393 ?peer,
394 protocol = %self.protocol,
395 "state mismatch: peer doesn't exist",
396 );
397 debug_assert!(false);
398 return Err(Error::PeerDoesntExist(peer));
399 };
400
401 // clean up all pending state for the peer
402 self.negotiation.remove_outbound(&peer);
403 self.negotiation.remove_inbound(&peer);
404
405 match context.state {
406 // outbound initiated, report open failure to peer
407 PeerState::OutboundInitiated { .. } => {
408 self.event_handle
409 .report_notification_stream_open_failure(peer, NotificationError::Rejected)
410 .await;
411 }
412 // substream fully open, report that the notification stream is closed
413 PeerState::Open { shutdown } => {
414 let _ = shutdown.send(());
415 }
416 // if the substream was being validated, user must be notified that the substream is
417 // now considered rejected if they had been made aware of the existence of the pending
418 // connection
419 PeerState::Validating {
420 outbound, inbound, ..
421 } => {
422 match (outbound, inbound) {
423 // substream was being validated by the protocol when the connection was closed
424 (OutboundState::Closed, InboundState::Validating { .. }) => {
425 tracing::debug!(
426 target: LOG_TARGET,
427 ?peer,
428 protocol = %self.protocol,
429 "connection closed while validation pending",
430 );
431
432 self.peers.insert(
433 peer,
434 PeerContext {
435 state: PeerState::ValidationPending {
436 state: ConnectionState::Closed,
437 },
438 },
439 );
440 }
441 // user either initiated an outbound substream or an outbound substream was
442 // opened/being opened as a result of an accepted inbound substream but was not
443 // yet fully open
444 //
445 // to have consistent state tracking in the user protocol, substream rejection
446 // must be reported to the user
447 (
448 OutboundState::OutboundInitiated { .. }
449 | OutboundState::Negotiating
450 | OutboundState::Open { .. },
451 _,
452 ) => {
453 tracing::debug!(
454 target: LOG_TARGET,
455 ?peer,
456 protocol = %self.protocol,
457 "connection closed outbound substream under negotiation",
458 );
459
460 self.event_handle
461 .report_notification_stream_open_failure(
462 peer,
463 NotificationError::Rejected,
464 )
465 .await;
466 }
467 (_, _) => {}
468 }
469 }
470 // pending validations must be tracked across connection open/close events
471 PeerState::ValidationPending { .. } => {
472 tracing::debug!(
473 target: LOG_TARGET,
474 ?peer,
475 protocol = %self.protocol,
476 "validation pending while connection closed",
477 );
478
479 self.peers.insert(
480 peer,
481 PeerContext {
482 state: PeerState::ValidationPending {
483 state: ConnectionState::Closed,
484 },
485 },
486 );
487 }
488 _ => {}
489 }
490
491 Ok(())
492 }
493
494 /// Local node opened a substream to remote node.
495 ///
496 /// The connection can be in three different states:
497 /// - this is the first substream that was opened and thus the connection was initiated by the
498 /// local node
499 /// - this is a response to a previously received inbound substream which the local node
500 /// accepted and as a result, opened its own substream
501 /// - local and remote nodes opened substreams at the same time
502 ///
503 /// In the first case, the local node's handshake is sent to remote node and the substream is
504 /// polled in the background until they either send their handshake or close the substream.
505 ///
506 /// For the second case, the connection was initiated by the remote node and the substream was
507 /// accepted by the local node which initiated an outbound substream to the remote node.
508 /// The only valid states for this case are [`InboundState::Open`],
509 /// and [`InboundState::SendingHandshake`] as they imply
510 /// that the inbound substream have been accepted by the local node and this opened outbound
511 /// substream is a result of a valid state transition.
512 ///
513 /// For the third case, if the nodes have opened substreams at the same time, the outbound state
514 /// must be [`OutboundState::OutboundInitiated`] to ascertain that the an outbound substream was
515 /// actually opened. Any other state would be a state mismatch and would mean that the
516 /// connection is opening substreams without the permission of the protocol handler.
517 async fn on_outbound_substream(
518 &mut self,
519 protocol: ProtocolName,
520 fallback: Option<ProtocolName>,
521 peer: PeerId,
522 substream_id: SubstreamId,
523 outbound: Substream,
524 ) -> crate::Result<()> {
525 tracing::debug!(
526 target: LOG_TARGET,
527 ?peer,
528 ?protocol,
529 ?substream_id,
530 "handle outbound substream",
531 );
532
533 // peer must exist since an outbound substream was received from them
534 let Some(context) = self.peers.get_mut(&peer) else {
535 tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for outbound substream");
536 debug_assert!(false);
537 return Err(Error::PeerDoesntExist(peer));
538 };
539
540 let pending_peer = self.pending_outbound.remove(&substream_id);
541
542 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
543 // the connection was initiated by the local node, send handshake to remote and wait to
544 // receive their handshake back
545 PeerState::OutboundInitiated { substream } => {
546 debug_assert!(substream == substream_id);
547 debug_assert!(pending_peer == Some(peer));
548
549 tracing::trace!(
550 target: LOG_TARGET,
551 ?peer,
552 protocol = %self.protocol,
553 ?fallback,
554 ?substream_id,
555 "negotiate outbound protocol",
556 );
557
558 self.negotiation.negotiate_outbound(peer, outbound);
559 context.state = PeerState::Validating {
560 protocol,
561 fallback,
562 inbound: InboundState::Closed,
563 outbound: OutboundState::Negotiating,
564 direction: Direction::Outbound,
565 };
566 }
567 PeerState::Validating {
568 protocol,
569 fallback,
570 inbound,
571 direction,
572 outbound: outbound_state,
573 } => {
574 // the inbound substream has been accepted by the local node since the handshake has
575 // been read and the local handshake has either already been sent or
576 // it's in the process of being sent.
577 match inbound {
578 InboundState::SendingHandshake | InboundState::Open { .. } => {
579 context.state = PeerState::Validating {
580 protocol,
581 fallback,
582 inbound,
583 direction,
584 outbound: OutboundState::Negotiating,
585 };
586 self.negotiation.negotiate_outbound(peer, outbound);
587 }
588 // nodes have opened substreams at the same time
589 inbound_state => match outbound_state {
590 OutboundState::OutboundInitiated { substream } => {
591 debug_assert!(substream == substream_id);
592
593 context.state = PeerState::Validating {
594 protocol,
595 fallback,
596 direction,
597 inbound: inbound_state,
598 outbound: OutboundState::Negotiating,
599 };
600 self.negotiation.negotiate_outbound(peer, outbound);
601 }
602 // invalid state: more than one outbound substream has been opened
603 inner_state => {
604 tracing::error!(
605 target: LOG_TARGET,
606 ?peer,
607 %protocol,
608 ?substream_id,
609 ?inbound_state,
610 ?inner_state,
611 "invalid state, expected `OutboundInitiated`",
612 );
613
614 let _ = outbound.close().await;
615 debug_assert!(false);
616 }
617 },
618 }
619 }
620 // the connection may have been closed while an outbound substream was pending
621 // if the outbound substream was initiated successfully, close it and reset
622 // `pending_open`
623 PeerState::Closed { pending_open } if pending_open == Some(substream_id) => {
624 let _ = outbound.close().await;
625
626 context.state = PeerState::Closed { pending_open: None };
627 }
628 state => {
629 tracing::error!(
630 target: LOG_TARGET,
631 ?peer,
632 %protocol,
633 ?substream_id,
634 ?state,
635 "invalid state: more than one outbound substream opened",
636 );
637
638 let _ = outbound.close().await;
639 debug_assert!(false);
640 }
641 }
642
643 Ok(())
644 }
645
646 /// Remote opened a substream to local node.
647 ///
648 /// The peer can be in four different states for the inbound substream to be considered valid:
649 /// - the connection is closed
650 /// - conneection is open but substream validation from a previous connection is still pending
651 /// - outbound substream has been opened but not yet acknowledged by the remote peer
652 /// - outbound substream has been opened and acknowledged by the remote peer and it's being
653 /// negotiated
654 ///
655 /// If remote opened more than one substream, the new substream is simply discarded.
656 async fn on_inbound_substream(
657 &mut self,
658 protocol: ProtocolName,
659 fallback: Option<ProtocolName>,
660 peer: PeerId,
661 substream: Substream,
662 ) -> crate::Result<()> {
663 // peer must exist since an inbound substream was received from them
664 let Some(context) = self.peers.get_mut(&peer) else {
665 tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for inbound substream");
666 debug_assert!(false);
667 return Err(Error::PeerDoesntExist(peer));
668 };
669
670 tracing::debug!(
671 target: LOG_TARGET,
672 ?peer,
673 %protocol,
674 ?fallback,
675 state = ?context.state,
676 "handle inbound substream",
677 );
678
679 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
680 // inbound substream of a previous connection is still pending validation,
681 // reject any new inbound substreams until an answer is heard from the user
682 state @ PeerState::ValidationPending { .. } => {
683 tracing::debug!(
684 target: LOG_TARGET,
685 ?peer,
686 %protocol,
687 ?fallback,
688 ?state,
689 "validation for previous substream still pending",
690 );
691
692 let _ = substream.close().await;
693 context.state = state;
694 }
695 // outbound substream for previous connection still pending, reject inbound substream
696 // and wait for the outbound substream state to conclude as either succeeded or failed
697 // before accepting any inbound substreams.
698 PeerState::Closed {
699 pending_open: Some(substream_id),
700 } => {
701 tracing::debug!(
702 target: LOG_TARGET,
703 ?peer,
704 protocol = %self.protocol,
705 "received inbound substream while outbound substream opening, rejecting",
706 );
707 let _ = substream.close().await;
708
709 context.state = PeerState::Closed {
710 pending_open: Some(substream_id),
711 };
712 }
713 // the peer state is closed so this is a fresh inbound substream.
714 PeerState::Closed { pending_open: None } => {
715 self.negotiation.read_handshake(peer, substream);
716
717 context.state = PeerState::Validating {
718 protocol,
719 fallback,
720 direction: Direction::Inbound,
721 inbound: InboundState::ReadingHandshake,
722 outbound: OutboundState::Closed,
723 };
724 }
725 // if the connection is under validation (so an outbound substream has been opened and
726 // it's still pending or under negotiation), the only valid state for the
727 // inbound state is closed as it indicates that there isn't an inbound substream yet for
728 // the remote node duplicate substreams are prohibited.
729 PeerState::Validating {
730 protocol,
731 fallback,
732 outbound,
733 direction,
734 inbound: InboundState::Closed,
735 } => {
736 self.negotiation.read_handshake(peer, substream);
737
738 context.state = PeerState::Validating {
739 protocol,
740 fallback,
741 outbound,
742 direction,
743 inbound: InboundState::ReadingHandshake,
744 };
745 }
746 // outbound substream may have been initiated by the local node while a remote node also
747 // opened a substream roughly at the same time
748 PeerState::OutboundInitiated {
749 substream: outbound,
750 } => {
751 self.negotiation.read_handshake(peer, substream);
752
753 context.state = PeerState::Validating {
754 protocol,
755 fallback,
756 direction: Direction::Outbound,
757 outbound: OutboundState::OutboundInitiated {
758 substream: outbound,
759 },
760 inbound: InboundState::ReadingHandshake,
761 };
762 }
763 // new inbound substream opend while validation for the previous substream was still
764 // pending
765 //
766 // the old substream can be considered dead because remote wouldn't open a new substream
767 // to us unless they had discarded the previous substream.
768 //
769 // set peer state to `ValidationPending` to indicate that the peer is "blocked" until a
770 // validation for the substream is heard, blocking any further activity for
771 // the connection and once the validation is received and in case the
772 // substream is accepted, it will be reported as open failure to to the peer
773 // because the states have gone out of sync.
774 PeerState::Validating {
775 outbound: OutboundState::Closed,
776 inbound:
777 InboundState::Validating {
778 inbound: pending_substream,
779 },
780 ..
781 } => {
782 tracing::debug!(
783 target: LOG_TARGET,
784 ?peer,
785 protocol = %self.protocol,
786 "remote opened substream while previous was still pending, connection failed",
787 );
788 let _ = substream.close().await;
789 let _ = pending_substream.close().await;
790
791 context.state = PeerState::ValidationPending {
792 state: ConnectionState::Open,
793 };
794 }
795 // remote opened another inbound substream, close it and otherwise ignore the event
796 // as this is a non-serious protocol violation.
797 state => {
798 tracing::debug!(
799 target: LOG_TARGET,
800 ?peer,
801 %protocol,
802 ?fallback,
803 ?state,
804 "remote opened more than one inbound substreams, discarding",
805 );
806
807 let _ = substream.close().await;
808 context.state = state;
809 }
810 }
811
812 Ok(())
813 }
814
815 /// Failed to open substream to remote node.
816 ///
817 /// If the substream was initiated by the local node, it must be reported that the substream
818 /// failed to open. Otherwise the peer state can silently be converted to `Closed`.
819 async fn on_substream_open_failure(
820 &mut self,
821 substream_id: SubstreamId,
822 error: SubstreamError,
823 ) {
824 tracing::debug!(
825 target: LOG_TARGET,
826 protocol = %self.protocol,
827 ?substream_id,
828 ?error,
829 "failed to open substream"
830 );
831
832 let Some(peer) = self.pending_outbound.remove(&substream_id) else {
833 tracing::warn!(
834 target: LOG_TARGET,
835 protocol = %self.protocol,
836 ?substream_id,
837 "pending outbound substream doesn't exist",
838 );
839 debug_assert!(false);
840 return;
841 };
842
843 // peer must exist since an outbound substream failure was received from them
844 let Some(context) = self.peers.get_mut(&peer) else {
845 tracing::warn!(target: LOG_TARGET, ?peer, "peer doesn't exist");
846 debug_assert!(false);
847 return;
848 };
849
850 match &mut context.state {
851 PeerState::OutboundInitiated { .. } => {
852 context.state = PeerState::Closed { pending_open: None };
853
854 self.event_handle
855 .report_notification_stream_open_failure(peer, NotificationError::Rejected)
856 .await;
857 }
858 // if the substream was accepted by the local node and as a result, an outbound
859 // substream was accepted as a result this should not be reported to local node
860 PeerState::Validating { outbound, .. } => {
861 self.negotiation.remove_inbound(&peer);
862 self.negotiation.remove_outbound(&peer);
863
864 let pending_open = match outbound {
865 OutboundState::Closed => None,
866 OutboundState::OutboundInitiated { substream } => {
867 self.event_handle
868 .report_notification_stream_open_failure(
869 peer,
870 NotificationError::Rejected,
871 )
872 .await;
873
874 Some(*substream)
875 }
876 OutboundState::Negotiating | OutboundState::Open { .. } => {
877 self.event_handle
878 .report_notification_stream_open_failure(
879 peer,
880 NotificationError::Rejected,
881 )
882 .await;
883
884 None
885 }
886 };
887
888 context.state = PeerState::Closed { pending_open };
889 }
890 PeerState::Closed { pending_open } => {
891 tracing::debug!(
892 target: LOG_TARGET,
893 protocol = %self.protocol,
894 ?substream_id,
895 "substream open failure for a closed connection",
896 );
897 debug_assert_eq!(pending_open, &Some(substream_id));
898 context.state = PeerState::Closed { pending_open: None };
899 }
900 state => {
901 tracing::warn!(
902 target: LOG_TARGET,
903 protocol = %self.protocol,
904 ?substream_id,
905 ?state,
906 "invalid state for outbound substream open failure",
907 );
908 context.state = PeerState::Closed { pending_open: None };
909 debug_assert!(false);
910 }
911 }
912 }
913
914 /// Open substream to remote `peer`.
915 ///
916 /// Outbound substream can opened only if the `PeerState` is `Closed`.
917 /// By forcing the substream to be opened only if the state is currently closed,
918 /// `NotificationProtocol` can enfore more predictable state transitions.
919 ///
920 /// Other states either imply an invalid state transition ([`PeerState::Open`]) or that an
921 /// inbound substream has already been received and its currently being validated by the user.
922 async fn on_open_substream(&mut self, peer: PeerId) -> crate::Result<()> {
923 tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "open substream");
924
925 let Some(context) = self.peers.get_mut(&peer) else {
926 if !self.should_dial {
927 tracing::debug!(
928 target: LOG_TARGET,
929 ?peer,
930 protocol = %self.protocol,
931 "connection to peer not open and dialing disabled",
932 );
933
934 self.event_handle
935 .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
936 .await;
937 return Ok(());
938 }
939
940 match self.service.dial(&peer) {
941 Err(error) => {
942 tracing::debug!(
943 target: LOG_TARGET,
944 ?peer,
945 protocol = %self.protocol,
946 ?error,
947 "failed to dial peer",
948 );
949
950 self.event_handle
951 .report_notification_stream_open_failure(
952 peer,
953 NotificationError::DialFailure,
954 )
955 .await;
956
957 return Err(error.into());
958 }
959 Ok(()) => {
960 tracing::trace!(
961 target: LOG_TARGET,
962 ?peer,
963 protocol = %self.protocol,
964 "started to dial peer",
965 );
966
967 self.peers.insert(
968 peer,
969 PeerContext {
970 state: PeerState::Dialing,
971 },
972 );
973 return Ok(());
974 }
975 }
976 };
977
978 match context.state {
979 // protocol can only request a new outbound substream to be opened if the state is
980 // `Closed` other states imply that it's already open
981 PeerState::Closed {
982 pending_open: Some(substream_id),
983 } => {
984 tracing::trace!(
985 target: LOG_TARGET,
986 ?peer,
987 protocol = %self.protocol,
988 ?substream_id,
989 "outbound substream opening, reusing pending open substream",
990 );
991
992 self.pending_outbound.insert(substream_id, peer);
993 context.state = PeerState::OutboundInitiated {
994 substream: substream_id,
995 };
996 }
997 PeerState::Closed { .. } => match self.service.open_substream(peer) {
998 Ok(substream_id) => {
999 tracing::trace!(
1000 target: LOG_TARGET,
1001 ?peer,
1002 protocol = %self.protocol,
1003 ?substream_id,
1004 "outbound substream opening",
1005 );
1006
1007 self.pending_outbound.insert(substream_id, peer);
1008 context.state = PeerState::OutboundInitiated {
1009 substream: substream_id,
1010 };
1011 }
1012 Err(error) => {
1013 tracing::debug!(
1014 target: LOG_TARGET,
1015 ?peer,
1016 protocol = %self.protocol,
1017 ?error,
1018 "failed to open substream",
1019 );
1020
1021 self.event_handle
1022 .report_notification_stream_open_failure(
1023 peer,
1024 NotificationError::NoConnection,
1025 )
1026 .await;
1027 context.state = PeerState::Closed { pending_open: None };
1028 }
1029 },
1030 // while a validation is pending for an inbound substream, user is not allowed to open
1031 // any outbound substreams until the old inbond substream is either accepted or rejected
1032 PeerState::ValidationPending { .. } => {
1033 tracing::trace!(
1034 target: LOG_TARGET,
1035 ?peer,
1036 protocol = %self.protocol,
1037 "validation still pending, rejecting outbound substream request",
1038 );
1039
1040 self.event_handle
1041 .report_notification_stream_open_failure(
1042 peer,
1043 NotificationError::ValidationPending,
1044 )
1045 .await;
1046 }
1047 _ => {}
1048 }
1049
1050 Ok(())
1051 }
1052
1053 /// Close substream to remote `peer`.
1054 ///
1055 /// This function can only be called if the substream was actually open, any other state is
1056 /// unreachable as the user is unable to emit this command to [`NotificationProtocol`] unless
1057 /// the connection has been fully opened.
1058 async fn on_close_substream(&mut self, peer: PeerId) {
1059 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "close substream");
1060
1061 let Some(context) = self.peers.get_mut(&peer) else {
1062 tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1063 return;
1064 };
1065
1066 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1067 PeerState::Open { shutdown } => {
1068 let _ = shutdown.send(());
1069
1070 context.state = PeerState::Closed { pending_open: None };
1071 }
1072 state => {
1073 tracing::debug!(
1074 target: LOG_TARGET,
1075 ?peer,
1076 protocol = %self.protocol,
1077 ?state,
1078 "substream already closed",
1079 );
1080 context.state = state;
1081 }
1082 }
1083 }
1084
1085 /// Handle validation result.
1086 ///
1087 /// The validation result binary (accept/reject). If the node is rejected, the substreams are
1088 /// discarded and state is set to `PeerState::Closed`. If there was an outbound substream in
1089 /// progress while the connection was rejected by the user, the oubound state is discarded,
1090 /// except for the substream ID of the substream which is kept for later use, in case the
1091 /// substream happens to open.
1092 ///
1093 /// If the node is accepted and there is no outbound substream to them open yet, a new substream
1094 /// is opened and once it opens, the local handshake will be sent to the remote peer and if
1095 /// they also accept the substream the connection is considered fully open.
1096 async fn on_validation_result(
1097 &mut self,
1098 peer: PeerId,
1099 result: ValidationResult,
1100 ) -> crate::Result<()> {
1101 tracing::trace!(
1102 target: LOG_TARGET,
1103 ?peer,
1104 protocol = %self.protocol,
1105 ?result,
1106 "handle validation result",
1107 );
1108
1109 let Some(context) = self.peers.get_mut(&peer) else {
1110 tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1111 return Err(Error::PeerDoesntExist(peer));
1112 };
1113
1114 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1115 PeerState::Validating {
1116 protocol,
1117 fallback,
1118 outbound,
1119 direction,
1120 inbound: InboundState::Validating { inbound },
1121 } => match result {
1122 // substream was rejected by the local node, if an outbound substream was under
1123 // negotation, discard that data and if an outbound substream was
1124 // initiated, save the `SubstreamId` of that substream and later if the substream
1125 // is opened, the state can be corrected to `pending_open: None`.
1126 ValidationResult::Reject => {
1127 let _ = inbound.close().await;
1128 self.negotiation.remove_outbound(&peer);
1129 self.negotiation.remove_inbound(&peer);
1130 context.state = PeerState::Closed {
1131 pending_open: outbound.pending_open(),
1132 };
1133
1134 Ok(())
1135 }
1136 ValidationResult::Accept => match outbound {
1137 // no outbound substream exists so initiate a new substream open and send the
1138 // local handshake to remote node, indicating that the
1139 // connection was accepted by the local node
1140 OutboundState::Closed => match self.service.open_substream(peer) {
1141 Ok(substream) => {
1142 self.negotiation.send_handshake(peer, inbound);
1143 self.pending_outbound.insert(substream, peer);
1144
1145 context.state = PeerState::Validating {
1146 protocol,
1147 fallback,
1148 direction,
1149 inbound: InboundState::SendingHandshake,
1150 outbound: OutboundState::OutboundInitiated { substream },
1151 };
1152 Ok(())
1153 }
1154 // failed to open outbound substream after accepting an inbound substream
1155 //
1156 // since the user was notified of this substream and they accepted it,
1157 // they expecting some kind of answer (open success/failure).
1158 //
1159 // report to user that the substream failed to open so they can track the
1160 // state transitions of the peer correctly
1161 Err(error) => {
1162 tracing::trace!(
1163 target: LOG_TARGET,
1164 ?peer,
1165 protocol = %self.protocol,
1166 ?result,
1167 ?error,
1168 "failed to open outbound substream for accepted substream",
1169 );
1170
1171 let _ = inbound.close().await;
1172 context.state = PeerState::Closed { pending_open: None };
1173
1174 self.event_handle
1175 .report_notification_stream_open_failure(
1176 peer,
1177 NotificationError::Rejected,
1178 )
1179 .await;
1180
1181 Err(error.into())
1182 }
1183 },
1184 // here the state is one of `OutboundState::{OutboundInitiated, Negotiating,
1185 // Open}` so that state can be safely ignored and all that
1186 // has to be done is to send the local handshake to remote
1187 // node to indicate that the connection was accepted.
1188 _ => {
1189 self.negotiation.send_handshake(peer, inbound);
1190
1191 context.state = PeerState::Validating {
1192 protocol,
1193 fallback,
1194 direction,
1195 inbound: InboundState::SendingHandshake,
1196 outbound,
1197 };
1198 Ok(())
1199 }
1200 },
1201 },
1202 // validation result received for an inbound substream which is now considered dead
1203 // because while the substream was being validated, the connection had closed.
1204 //
1205 // if the substream was rejected and there is no active connection to the peer,
1206 // just remove the peer from `peers` without informing user
1207 //
1208 // if the substream was accepted, the user must be informed that the substream failed to
1209 // open. Depending on whether there is currently a connection open to the peer, either
1210 // report `Rejected`/`NoConnection` and let the user try again.
1211 PeerState::ValidationPending { state } => {
1212 if let Some(error) = match state {
1213 ConnectionState::Open => {
1214 context.state = PeerState::Closed { pending_open: None };
1215
1216 std::matches!(result, ValidationResult::Accept)
1217 .then_some(NotificationError::Rejected)
1218 }
1219 ConnectionState::Closed => {
1220 self.peers.remove(&peer);
1221
1222 std::matches!(result, ValidationResult::Accept)
1223 .then_some(NotificationError::NoConnection)
1224 }
1225 } {
1226 self.event_handle.report_notification_stream_open_failure(peer, error).await;
1227 }
1228
1229 Ok(())
1230 }
1231 // if the user incorrectly send a validation result for a peer that doesn't require
1232 // validation, set state back to what it was and ignore the event
1233 //
1234 // the user protocol may send a stale validation result not because of a programming
1235 // error but because it has a backlock of unhandled events, with one event potentially
1236 // nullifying the need for substream validation, and is just temporarily out of sync
1237 // with `NotificationProtocol`
1238 state => {
1239 tracing::debug!(
1240 target: LOG_TARGET,
1241 ?peer,
1242 protocol = %self.protocol,
1243 ?state,
1244 "validation result received for peer that doesn't require validation",
1245 );
1246
1247 context.state = state;
1248 Ok(())
1249 }
1250 }
1251 }
1252
1253 /// Handle handshake event.
1254 ///
1255 /// There are three different handshake event types:
1256 /// - outbound substream negotiated
1257 /// - inbound substream negotiated
1258 /// - substream negotiation error
1259 ///
1260 /// Neither outbound nor inbound substream negotiated automatically means that the connection is
1261 /// considered open as both substreams must be fully negotiated for that to be the case. That is
1262 /// why the peer state for inbound and outbound are set separately and at the end of the
1263 /// function is the collective state of the substreams checked and if both substreams are
1264 /// negotiated, the user informed that the connection is open.
1265 ///
1266 /// If the negotiation fails, the user may have to be informed of that. Outbound substream
1267 /// failure always results in user getting notified since the existence of an outbound substream
1268 /// means that the user has either initiated an outbound substreams or has accepted an inbound
1269 /// substreams, resulting in an outbound substreams.
1270 ///
1271 /// Negotiation failure for inbound substreams which are in the state
1272 /// [`InboundState::ReadingHandshake`] don't result in any notification because while the
1273 /// handshake is being read from the substream, the user is oblivious to the fact that an
1274 /// inbound substream has even been received.
1275 async fn on_handshake_event(&mut self, peer: PeerId, event: HandshakeEvent) {
1276 let Some(context) = self.peers.get_mut(&peer) else {
1277 tracing::error!(target: LOG_TARGET, "invalid state: negotiation event received but peer doesn't exist");
1278 debug_assert!(false);
1279 return;
1280 };
1281
1282 tracing::trace!(
1283 target: LOG_TARGET,
1284 ?peer,
1285 protocol = %self.protocol,
1286 ?event,
1287 "handle handshake event",
1288 );
1289
1290 match event {
1291 // either an inbound or outbound substream has been negotiated successfully
1292 HandshakeEvent::Negotiated {
1293 peer,
1294 handshake,
1295 substream,
1296 direction,
1297 } => match direction {
1298 // outbound substream was negotiated, the only valid state for peer is `Validating`
1299 // and only valid state for `OutboundState` is `Negotiating`
1300 negotiation::Direction::Outbound => {
1301 self.negotiation.remove_outbound(&peer);
1302
1303 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1304 PeerState::Validating {
1305 protocol,
1306 fallback,
1307 direction,
1308 outbound: OutboundState::Negotiating,
1309 inbound,
1310 } => {
1311 context.state = PeerState::Validating {
1312 protocol,
1313 fallback,
1314 direction,
1315 outbound: OutboundState::Open {
1316 handshake,
1317 outbound: substream,
1318 },
1319 inbound,
1320 };
1321 }
1322 state => {
1323 tracing::warn!(
1324 target: LOG_TARGET,
1325 ?peer,
1326 ?state,
1327 "outbound substream negotiated but peer has invalid state",
1328 );
1329 debug_assert!(false);
1330 }
1331 }
1332 }
1333 // inbound negotiation event completed
1334 //
1335 // the negotiation event can be on of two different types:
1336 // - remote handshake was read from the substream
1337 // - local handshake has been sent to remote node
1338 //
1339 // For the first case, the substream has to be validated by the local node.
1340 // This means reporting the protocol name, potential negotiated fallback and the
1341 // handshake. Local node will then either accept or reject the substream which is
1342 // handled by [`NotificationProtocol::on_validation_result()`]. Compared to
1343 // Substrate, litep2p requires both peers to validate the inbound handshake to allow
1344 // more complex connection validation. If this is not necessary and the protocol
1345 // wishes to auto-accept the inbound substreams that are a result of
1346 // an outbound substream already accepted by the remote node, the
1347 // substream validation is skipped and the local handshake is sent
1348 // right away.
1349 //
1350 // For the second case, the local handshake was sent to remote node successfully and
1351 // the inbound substream is considered open and if the outbound
1352 // substream is open as well, the connection is fully open.
1353 //
1354 // Only valid states for [`InboundState`] are [`InboundState::ReadingHandshake`] and
1355 // [`InboundState::SendingHandshake`] because otherwise the inbound
1356 // substream cannot be in [`HandshakeService`](super::negotiation::HandshakeService)
1357 // unless there is a logic bug in the state machine.
1358 negotiation::Direction::Inbound => {
1359 self.negotiation.remove_inbound(&peer);
1360
1361 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1362 PeerState::Validating {
1363 protocol,
1364 fallback,
1365 direction,
1366 outbound,
1367 inbound: InboundState::ReadingHandshake,
1368 } => {
1369 if !std::matches!(outbound, OutboundState::Closed) && self.auto_accept {
1370 tracing::trace!(
1371 target: LOG_TARGET,
1372 ?peer,
1373 %protocol,
1374 ?fallback,
1375 ?direction,
1376 ?outbound,
1377 "auto-accept inbound substream",
1378 );
1379
1380 self.negotiation.send_handshake(peer, substream);
1381 context.state = PeerState::Validating {
1382 protocol,
1383 fallback,
1384 direction,
1385 inbound: InboundState::SendingHandshake,
1386 outbound,
1387 };
1388
1389 return;
1390 }
1391
1392 tracing::trace!(
1393 target: LOG_TARGET,
1394 ?peer,
1395 %protocol,
1396 ?fallback,
1397 ?outbound,
1398 "send inbound protocol for validation",
1399 );
1400
1401 context.state = PeerState::Validating {
1402 protocol: protocol.clone(),
1403 fallback: fallback.clone(),
1404 inbound: InboundState::Validating { inbound: substream },
1405 outbound,
1406 direction,
1407 };
1408
1409 let (tx, rx) = oneshot::channel();
1410 self.pending_validations.push(Box::pin(async move {
1411 match rx.await {
1412 Ok(ValidationResult::Accept) =>
1413 (peer, ValidationResult::Accept),
1414 _ => (peer, ValidationResult::Reject),
1415 }
1416 }));
1417
1418 self.event_handle
1419 .report_inbound_substream(protocol, fallback, peer, handshake, tx)
1420 .await;
1421 }
1422 PeerState::Validating {
1423 protocol,
1424 fallback,
1425 direction,
1426 inbound: InboundState::SendingHandshake,
1427 outbound,
1428 } => {
1429 tracing::trace!(
1430 target: LOG_TARGET,
1431 ?peer,
1432 %protocol,
1433 ?fallback,
1434 "inbound substream negotiated, waiting for outbound substream to complete",
1435 );
1436
1437 context.state = PeerState::Validating {
1438 protocol: protocol.clone(),
1439 fallback: fallback.clone(),
1440 inbound: InboundState::Open { inbound: substream },
1441 outbound,
1442 direction,
1443 };
1444 }
1445 _state => debug_assert!(false),
1446 }
1447 }
1448 },
1449 // error occurred during negotiation, eitehr for inbound or outbound substream
1450 // user is notified of the error only if they've either initiated an outbound substream
1451 // or if they accepted an inbound substream and as a result initiated an outbound
1452 // substream.
1453 HandshakeEvent::NegotiationError { peer, direction } => {
1454 tracing::debug!(
1455 target: LOG_TARGET,
1456 ?peer,
1457 protocol = %self.protocol,
1458 ?direction,
1459 state = ?context.state,
1460 "failed to negotiate substream",
1461 );
1462 let _ = self.negotiation.remove_outbound(&peer);
1463 let _ = self.negotiation.remove_inbound(&peer);
1464
1465 // if an outbound substream had been initiated (whatever its state is), it means
1466 // that the user knows about the connection and must be notified that it failed to
1467 // negotiate.
1468 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1469 PeerState::Validating { outbound, .. } => {
1470 context.state = PeerState::Closed {
1471 pending_open: outbound.pending_open(),
1472 };
1473
1474 // notify user if the outbound substream is not considered closed
1475 if !std::matches!(outbound, OutboundState::Closed) {
1476 return self
1477 .event_handle
1478 .report_notification_stream_open_failure(
1479 peer,
1480 NotificationError::Rejected,
1481 )
1482 .await;
1483 }
1484 }
1485 _state => debug_assert!(false),
1486 }
1487 }
1488 }
1489
1490 // if both inbound and outbound substreams are considered open, notify the user that
1491 // a notification stream has been opened and set up for sending and receiving
1492 // notifications to and from remote node
1493 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1494 PeerState::Validating {
1495 protocol,
1496 fallback,
1497 direction,
1498 outbound:
1499 OutboundState::Open {
1500 handshake,
1501 outbound,
1502 },
1503 inbound: InboundState::Open { inbound },
1504 } => {
1505 tracing::debug!(
1506 target: LOG_TARGET,
1507 ?peer,
1508 %protocol,
1509 ?fallback,
1510 "notification stream opened",
1511 );
1512
1513 let (async_tx, async_rx) = channel(self.async_channel_size);
1514 let (sync_tx, sync_rx) = channel(self.sync_channel_size);
1515 let sink = NotificationSink::new(peer, sync_tx, async_tx);
1516
1517 // start connection handler for the peer which only deals with sending/receiving
1518 // notifications
1519 //
1520 // the connection handler must be started only after the newly opened notification
1521 // substream is reported to user because the connection handler
1522 // might exit immediately after being started if remote closed the connection.
1523 //
1524 // if the order of events (open & close) is not ensured to be correct, the code
1525 // handling the connectivity logic on the `NotificationHandle` side
1526 // might get confused about the current state of the connection.
1527 let shutdown_tx = self.shutdown_tx.clone();
1528 let (connection, shutdown) = Connection::new(
1529 peer,
1530 inbound,
1531 outbound,
1532 self.event_handle.clone(),
1533 shutdown_tx.clone(),
1534 self.notif_tx.clone(),
1535 async_rx,
1536 sync_rx,
1537 );
1538
1539 context.state = PeerState::Open { shutdown };
1540 self.event_handle
1541 .report_notification_stream_opened(
1542 protocol, fallback, direction, peer, handshake, sink,
1543 )
1544 .await;
1545
1546 self.executor.run(Box::pin(async move {
1547 connection.start().await;
1548 }));
1549 }
1550 state => {
1551 tracing::trace!(
1552 target: LOG_TARGET,
1553 ?peer,
1554 protocol = %self.protocol,
1555 ?state,
1556 "validation for substream still pending",
1557 );
1558 self.timers.push(Box::pin(async move {
1559 futures_timer::Delay::new(Duration::from_secs(5)).await;
1560 peer
1561 }));
1562
1563 context.state = state;
1564 }
1565 }
1566 }
1567
1568 /// Handle dial failure.
1569 async fn on_dial_failure(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
1570 tracing::trace!(
1571 target: LOG_TARGET,
1572 ?peer,
1573 protocol = %self.protocol,
1574 ?addresses,
1575 "handle dial failure",
1576 );
1577
1578 let Some(context) = self.peers.remove(&peer) else {
1579 tracing::trace!(
1580 target: LOG_TARGET,
1581 ?peer,
1582 protocol = %self.protocol,
1583 ?addresses,
1584 "dial failure for an unknown peer",
1585 );
1586 return;
1587 };
1588
1589 match context.state {
1590 PeerState::Dialing => {
1591 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, ?addresses, "failed to dial peer");
1592 self.event_handle
1593 .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
1594 .await;
1595 }
1596 state => {
1597 tracing::trace!(
1598 target: LOG_TARGET,
1599 ?peer,
1600 protocol = %self.protocol,
1601 ?state,
1602 "dial failure for peer that's not being dialed",
1603 );
1604 self.peers.insert(peer, PeerContext { state });
1605 }
1606 }
1607 }
1608
1609 /// Handle next notification event.
1610 ///
1611 /// Returns `true` when the user command stream was dropped.
1612 async fn next_event(&mut self) -> bool {
1613 // biased select is used because the substream events must be prioritized above other events
1614 // that is because a closed substream is detected by either `substreams` or `negotiation`
1615 // and if that event is not handled with priority but, e.g., inbound substream is
1616 // handled before, it can create a situation where the state machine gets confused
1617 // about the peer's state.
1618 tokio::select! {
1619 biased;
1620
1621 event = self.negotiation.next(), if !self.negotiation.is_empty() => {
1622 if let Some((peer, event)) = event {
1623 self.on_handshake_event(peer, event).await;
1624 } else {
1625 tracing::error!(target: LOG_TARGET, "`HandshakeService` expected to return `Some(..)`");
1626 debug_assert!(false);
1627 };
1628 }
1629 event = self.shutdown_rx.recv() => match event {
1630 None => (),
1631 Some(peer) => {
1632 if let Some(context) = self.peers.get_mut(&peer) {
1633 tracing::trace!(
1634 target: LOG_TARGET,
1635 ?peer,
1636 protocol = %self.protocol,
1637 "notification stream to peer closed",
1638 );
1639 context.state = PeerState::Closed { pending_open: None };
1640 }
1641 }
1642 },
1643 // TODO: https://github.com/paritytech/litep2p/issues/338 this could be combined with `Negotiation`
1644 peer = self.timers.next(), if !self.timers.is_empty() => match peer {
1645 Some(peer) => {
1646 match self.peers.get_mut(&peer) {
1647 Some(context) => match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1648 PeerState::Validating {
1649 outbound: OutboundState::Open { outbound, .. },
1650 inbound: InboundState::Closed,
1651 ..
1652 } => {
1653 tracing::debug!(
1654 target: LOG_TARGET,
1655 ?peer,
1656 protocol = %self.protocol,
1657 "peer didn't answer in 10 seconds, canceling substream and closing connection",
1658 );
1659 context.state = PeerState::Closed { pending_open: None };
1660
1661 let _ = outbound.close().await;
1662 self.event_handle
1663 .report_notification_stream_open_failure(peer, NotificationError::Rejected)
1664 .await;
1665
1666 // NOTE: this is used to work around an issue in Substrate where the protocol
1667 // is not notified if an inbound substream is closed. That indicates that remote
1668 // wishes the close the connection but `Notifications` still keeps the substream state
1669 // as `Open` until the outbound substream is closed (even though the outbound substream
1670 // is also closed at that point). This causes a further issue: inbound substreams
1671 // are automatically opened when state is `Open`, even if the inbound substream belongs
1672 // to a new "connection" (pair of substreams).
1673 //
1674 // basically what happens (from Substrate's PoV) is there are pair of substreams (`inbound1`, `outbound1`),
1675 // litep2p closes both substreams so both `inbound1` and outbound1 become non-readable/writable.
1676 // Substrate doesn't detect this an instead only marks `inbound1` is closed while still keeping
1677 // the (now-closed) `outbound1` active and it will be detected closed only when Substrate tries to
1678 // write something into that substream. If now litep2p tries to open a new connection to Substrate,
1679 // the outbound substream from litep2p's PoV will be automatically accepted (https://github.com/paritytech/polkadot-sdk/blob/59b2661444de2a25f2125a831bd786035a9fac4b/substrate/client/network/src/protocol/notifications/handler.rs#L544-L556)
1680 // but since Substrate thinks `outbound1` is still active, it won't open a new outbound substream
1681 // and it ends up having (`inbound2`, `outbound1`) as its pair of substreams which doens't make sense.
1682 //
1683 // since litep2p is expecting to receive an inbound substream from Substrate and never receives it,
1684 // it basically can't make progress with the substream open request because litep2p can't force Substrate
1685 // to detect that `outbound1` is closed. Easiest (and very hacky at the same time) way to reset the substream
1686 // state is to close the connection. This is not an appropriate way to fix the issue and causes issues with,
1687 // e.g., smoldot which at the time of writing this doesn't support the transaction protocol. The only way to fix
1688 // this cleanly is to make Substrate detect closed substreams correctly.
1689 if let Err(error) = self.service.force_close(peer) {
1690 tracing::debug!(
1691 target: LOG_TARGET,
1692 ?peer,
1693 protocol = %self.protocol,
1694 ?error,
1695 "failed to force close connection",
1696 );
1697 }
1698 }
1699 state => {
1700 tracing::trace!(
1701 target: LOG_TARGET,
1702 ?peer,
1703 protocol = %self.protocol,
1704 ?state,
1705 "ignore expired timer for peer",
1706 );
1707 context.state = state;
1708 }
1709 }
1710 None => tracing::debug!(
1711 target: LOG_TARGET,
1712 ?peer,
1713 protocol = %self.protocol,
1714 "peer doesn't exist anymore",
1715 ),
1716 }
1717 }
1718 None => (),
1719 },
1720 event = self.service.next() => match event {
1721 Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
1722 if let Err(error) = self.on_connection_established(peer).await {
1723 tracing::debug!(
1724 target: LOG_TARGET,
1725 ?peer,
1726 ?error,
1727 "failed to register peer",
1728 );
1729 }
1730 }
1731 Some(TransportEvent::ConnectionClosed { peer }) => {
1732 if let Err(error) = self.on_connection_closed(peer).await {
1733 tracing::debug!(
1734 target: LOG_TARGET,
1735 ?peer,
1736 ?error,
1737 "failed to disconnect peer",
1738 );
1739 }
1740 }
1741 Some(TransportEvent::SubstreamOpened {
1742 peer,
1743 substream,
1744 direction,
1745 protocol,
1746 fallback,
1747 }) => match direction {
1748 protocol::Direction::Inbound => {
1749 if let Err(error) = self.on_inbound_substream(protocol, fallback, peer, substream).await {
1750 tracing::debug!(
1751 target: LOG_TARGET,
1752 ?peer,
1753 ?error,
1754 "failed to handle inbound substream",
1755 );
1756 }
1757 }
1758 protocol::Direction::Outbound(substream_id) => {
1759 if let Err(error) = self
1760 .on_outbound_substream(protocol, fallback, peer, substream_id, substream)
1761 .await
1762 {
1763 tracing::debug!(
1764 target: LOG_TARGET,
1765 ?peer,
1766 ?error,
1767 "failed to handle outbound substream",
1768 );
1769 }
1770 }
1771 },
1772 Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
1773 self.on_substream_open_failure(substream, error).await;
1774 }
1775 Some(TransportEvent::DialFailure { peer, addresses }) => self.on_dial_failure(peer, addresses).await,
1776 None => (),
1777 },
1778 result = self.pending_validations.select_next_some(), if !self.pending_validations.is_empty() => {
1779 if let Err(error) = self.on_validation_result(result.0, result.1).await {
1780 tracing::debug!(
1781 target: LOG_TARGET,
1782 peer = ?result.0,
1783 result = ?result.1,
1784 ?error,
1785 "failed to handle validation result",
1786 );
1787 }
1788 }
1789
1790 // User commands.
1791 command = self.command_rx.recv() => match command {
1792 None => {
1793 tracing::debug!(
1794 target: LOG_TARGET,
1795 protocol = %self.protocol,
1796 "user protocol has exited, exiting"
1797 );
1798
1799 self.service.unregister_protocol();
1800
1801 return true;
1802 }
1803 Some(command) => match command {
1804 NotificationCommand::OpenSubstream { peers } => {
1805 for peer in peers {
1806 if let Err(error) = self.on_open_substream(peer).await {
1807 tracing::debug!(
1808 target: LOG_TARGET,
1809 ?peer,
1810 ?error,
1811 "failed to open substream",
1812 );
1813 }
1814 }
1815 }
1816 NotificationCommand::CloseSubstream { peers } => {
1817 for peer in peers {
1818 self.on_close_substream(peer).await;
1819 }
1820 }
1821 NotificationCommand::ForceClose { peer } => {
1822 let _ = self.service.force_close(peer);
1823 }
1824 #[cfg(feature = "fuzz")]
1825 NotificationCommand::SendNotification{ .. } => unreachable!()
1826 }
1827 },
1828 }
1829
1830 false
1831 }
1832
1833 /// Start [`NotificationProtocol`] event loop.
1834 pub(crate) async fn run(mut self) {
1835 tracing::debug!(target: LOG_TARGET, "starting notification event loop");
1836
1837 while !self.next_event().await {}
1838 }
1839}