litep2p/protocol/notification/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Notification protocol implementation.
22
23use crate::{
24    error::{Error, SubstreamError},
25    executor::Executor,
26    protocol::{
27        self,
28        notification::{
29            connection::Connection,
30            handle::NotificationEventHandle,
31            negotiation::{HandshakeEvent, HandshakeService},
32        },
33        TransportEvent, TransportService,
34    },
35    substream::Substream,
36    types::{protocol::ProtocolName, SubstreamId},
37    PeerId, DEFAULT_CHANNEL_SIZE,
38};
39
40use bytes::BytesMut;
41use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
42use multiaddr::Multiaddr;
43use tokio::sync::{
44    mpsc::{channel, Receiver, Sender},
45    oneshot,
46};
47
48use std::{collections::HashMap, sync::Arc, time::Duration};
49
50pub use config::{Config, ConfigBuilder};
51pub use handle::{NotificationHandle, NotificationSink};
52pub use types::{
53    Direction, NotificationCommand, NotificationError, NotificationEvent, ValidationResult,
54};
55
56mod config;
57mod connection;
58mod handle;
59mod negotiation;
60mod types;
61
62#[cfg(test)]
63mod tests;
64
/// Logging target passed as `target:` to every `tracing` macro in this file.
const LOG_TARGET: &str = "litep2p::notification";
67
/// Connection state.
///
/// Used to track transport-level connectivity state when there is a pending validation.
/// See [`PeerState::ValidationPending`] for more details.
#[derive(Debug, PartialEq, Eq)]
enum ConnectionState {
    /// There is an active, transport-level connection open to the peer.
    Open,

    /// There is no transport-level connection open to the peer.
    Closed,
}
80
/// Inbound substream state.
///
/// Tracks the progress of the substream opened by the remote node while the peer is in
/// [`PeerState::Validating`].
#[derive(Debug)]
enum InboundState {
    /// Substream is closed, i.e., no inbound substream has been received yet.
    Closed,

    /// Handshake is being read from the remote node.
    ReadingHandshake,

    /// Substream and its handshake are being validated by the user protocol.
    Validating {
        /// Inbound substream.
        inbound: Substream,
    },

    /// Handshake is being sent to the remote node.
    SendingHandshake,

    /// Substream is open.
    Open {
        /// Inbound substream.
        inbound: Substream,
    },
}
105
/// Outbound substream state.
///
/// Tracks the progress of the substream opened by the local node while the peer is in
/// [`PeerState::Validating`].
#[derive(Debug)]
enum OutboundState {
    /// Substream is closed, i.e., no outbound substream has been initiated.
    Closed,

    /// Outbound substream initiated.
    OutboundInitiated {
        /// Substream ID.
        substream: SubstreamId,
    },

    /// Substream is in the state of being negotiated.
    ///
    /// This process entails sending local node's handshake and reading back the remote node's
    /// handshake if they've accepted the substream or detecting that the substream was closed
    /// in case the substream was rejected.
    Negotiating,

    /// Substream is open.
    Open {
        /// Received handshake.
        handshake: Vec<u8>,

        /// Outbound substream.
        outbound: Substream,
    },
}
134
135impl OutboundState {
136    /// Get pending outboud substream ID, if it exists.
137    fn pending_open(&self) -> Option<SubstreamId> {
138        match &self {
139            OutboundState::OutboundInitiated { substream } => Some(*substream),
140            _ => None,
141        }
142    }
143}
144
/// Peer state.
///
/// State of the notification stream towards a single peer, as tracked by
/// `NotificationProtocol`. Handlers temporarily swap the state to [`PeerState::Poisoned`]
/// while they compute the next state.
#[derive(Debug)]
enum PeerState {
    /// Peer state is poisoned due to invalid state transition.
    Poisoned,

    /// Validation for an inbound substream is still pending.
    ///
    /// In order to enforce valid state transitions, `NotificationProtocol` keeps track of pending
    /// validations across connectivity events (open/closed) and enforces that no activity happens
    /// for any peer that is still awaiting validation for their inbound substream.
    ///
    /// If connection closes while the substream is being validated, instead of removing peer from
    /// `peers`, the peer state is set as `ValidationPending` which indicates to the state machine
    /// that a response for an inbound substream is pending validation. The substream itself will
    /// be dead by the time validation is received if the peer state is `ValidationPending` since
    /// the substream was part of a previous, now-closed connection but this state allows
    /// `NotificationProtocol` to enforce correct state transitions by, e.g., rejecting new inbound
    /// substream while a previous substream is still being validated or rejecting outbound
    /// substreams on new connections if that same condition holds.
    ValidationPending {
        /// What is current connectivity state of the peer.
        ///
        /// If `state` is `ConnectionState::Closed` when the validation is finally received, peer
        /// is removed from `peers` and if the `state` is `ConnectionState::Open`, peer is moved to
        /// state `PeerState::Closed` and user is allowed to retry opening an outbound substream.
        state: ConnectionState,
    },

    /// Connection to peer is closed.
    Closed {
        /// Connection might have been closed while there was an outbound substream still pending.
        ///
        /// To handle this state transition correctly in case the substream opens after the
        /// connection is considered closed, store the `SubstreamId` so that it can be verified in
        /// case the substream ever opens.
        pending_open: Option<SubstreamId>,
    },

    /// Peer is being dialed in order to open an outbound substream to them.
    Dialing,

    /// Outbound substream initiated.
    OutboundInitiated {
        /// Substream ID.
        substream: SubstreamId,
    },

    /// Substream is being validated.
    Validating {
        /// Protocol.
        protocol: ProtocolName,

        /// Fallback protocol, if the substream was negotiated using a fallback name.
        fallback: Option<ProtocolName>,

        /// Outbound protocol state.
        outbound: OutboundState,

        /// Inbound protocol state.
        inbound: InboundState,

        /// Direction.
        direction: Direction,
    },

    /// Notification stream has been opened.
    Open {
        /// `Oneshot::Sender` for shutting down the connection.
        shutdown: oneshot::Sender<()>,
    },
}
216
/// Peer context.
///
/// Value type of the `peers` map of `NotificationProtocol`; wraps the [`PeerState`] of a
/// single peer.
#[derive(Debug)]
struct PeerContext {
    /// Peer state.
    state: PeerState,
}
223
224impl PeerContext {
225    /// Create new [`PeerContext`].
226    fn new() -> Self {
227        Self {
228            state: PeerState::Closed { pending_open: None },
229        }
230    }
231}
232
/// Notification protocol.
///
/// Keeps the per-peer substream state (see [`PeerState`]) and owns the channels, the
/// handshake service and the pending futures that the event handlers of this type operate on.
pub(crate) struct NotificationProtocol {
    /// Transport service.
    service: TransportService,

    /// Protocol.
    protocol: ProtocolName,

    /// Auto accept inbound substream if the outbound substream was initiated by the local node.
    auto_accept: bool,

    /// TX channel passed to the protocol used for sending events.
    event_handle: NotificationEventHandle,

    /// TX channel for sending shut down notifications from connection handlers to
    /// [`NotificationProtocol`].
    shutdown_tx: Sender<PeerId>,

    /// RX channel for receiving shutdown notifications from the connection handlers.
    shutdown_rx: Receiver<PeerId>,

    /// RX channel passed to the protocol used for receiving commands.
    command_rx: Receiver<NotificationCommand>,

    /// TX channel given to connection handlers for sending notifications.
    notif_tx: Sender<(PeerId, BytesMut)>,

    /// Connected peers.
    ///
    /// A peer may also remain here without a connection while its state is
    /// [`PeerState::ValidationPending`].
    peers: HashMap<PeerId, PeerContext>,

    /// Pending outbound substreams.
    pending_outbound: HashMap<SubstreamId, PeerId>,

    /// Handshaking service which reads and writes the handshakes to inbound
    /// and outbound substreams asynchronously.
    negotiation: HandshakeService,

    /// Synchronous channel size.
    sync_channel_size: usize,

    /// Asynchronous channel size.
    async_channel_size: usize,

    /// Executor for connection handlers.
    executor: Arc<dyn Executor>,

    /// Pending substream validations.
    pending_validations: FuturesUnordered<BoxFuture<'static, (PeerId, ValidationResult)>>,

    /// Timers for pending outbound substreams.
    timers: FuturesUnordered<BoxFuture<'static, PeerId>>,

    /// Should `NotificationProtocol` attempt to dial the peer.
    should_dial: bool,
}
287
288impl NotificationProtocol {
289    pub(crate) fn new(
290        service: TransportService,
291        config: Config,
292        executor: Arc<dyn Executor>,
293    ) -> Self {
294        let (shutdown_tx, shutdown_rx) = channel(DEFAULT_CHANNEL_SIZE);
295
296        Self {
297            service,
298            shutdown_tx,
299            shutdown_rx,
300            executor,
301            peers: HashMap::new(),
302            protocol: config.protocol_name,
303            auto_accept: config.auto_accept,
304            pending_validations: FuturesUnordered::new(),
305            timers: FuturesUnordered::new(),
306            event_handle: NotificationEventHandle::new(config.event_tx),
307            notif_tx: config.notif_tx,
308            command_rx: config.command_rx,
309            pending_outbound: HashMap::new(),
310            negotiation: HandshakeService::new(config.handshake),
311            sync_channel_size: config.sync_channel_size,
312            async_channel_size: config.async_channel_size,
313            should_dial: config.should_dial,
314        }
315    }
316
317    /// Connection established to remote node.
318    ///
319    /// If the peer already exists, the only valid state for it is `Dialing` as it indicates that
320    /// the user tried to open a substream to a peer who was not connected to local node.
321    ///
322    /// Any other state indicates that there's an error in the state transition logic.
323    async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
324        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
325
326        let Some(context) = self.peers.get_mut(&peer) else {
327            self.peers.insert(peer, PeerContext::new());
328            return Ok(());
329        };
330
331        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
332            PeerState::Dialing => {
333                tracing::trace!(
334                    target: LOG_TARGET,
335                    ?peer,
336                    protocol = %self.protocol,
337                    "dial succeeded, open substream to peer",
338                );
339
340                context.state = PeerState::Closed { pending_open: None };
341                self.on_open_substream(peer).await
342            }
343            // connection established but validation is still pending
344            //
345            // update the connection state so that `NotificationProtocol` can proceed
346            // to correct state after the validation result has beern received
347            PeerState::ValidationPending { state } => {
348                debug_assert_eq!(state, ConnectionState::Closed);
349
350                tracing::debug!(
351                    target: LOG_TARGET,
352                    ?peer,
353                    protocol = %self.protocol,
354                    "new connection established while validation still pending",
355                );
356
357                context.state = PeerState::ValidationPending {
358                    state: ConnectionState::Open,
359                };
360
361                Ok(())
362            }
363            state => {
364                tracing::error!(
365                    target: LOG_TARGET,
366                    ?peer,
367                    protocol = %self.protocol,
368                    ?state,
369                    "state mismatch: peer already exists",
370                );
371                debug_assert!(false);
372                Err(Error::PeerAlreadyExists(peer))
373            }
374        }
375    }
376
    /// Connection closed to remote node.
    ///
    /// All pending outbound substreams and pending handshake negotiations of the peer are
    /// discarded and the peer is removed from `peers`, unless a validation for its inbound
    /// substream is still pending, in which case the peer is re-inserted in
    /// [`PeerState::ValidationPending`] state with the connection marked as closed.
    ///
    /// If the connection was considered open (both substreams were open), the shutdown signal
    /// stored in [`PeerState::Open`] is sent.
    ///
    /// If the connection was still in progress (either substream was not fully open), the user is
    /// reported about it only if they had opened an outbound substream (outbound is either fully
    /// open, it had been initiated or the substream was under negotiation).
    ///
    /// Returns `Error::PeerDoesntExist` if the peer is not found in `peers`.
    async fn on_connection_closed(&mut self, peer: PeerId) -> crate::Result<()> {
        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");

        // forget every outbound substream that was still being opened to this peer
        self.pending_outbound.retain(|_, p| p != &peer);

        let Some(context) = self.peers.remove(&peer) else {
            tracing::error!(
                target: LOG_TARGET,
                ?peer,
                protocol = %self.protocol,
                "state mismatch: peer doesn't exist",
            );
            debug_assert!(false);
            return Err(Error::PeerDoesntExist(peer));
        };

        // clean up all pending state for the peer
        self.negotiation.remove_outbound(&peer);
        self.negotiation.remove_inbound(&peer);

        match context.state {
            // outbound initiated, report open failure to the user
            PeerState::OutboundInitiated { .. } => {
                self.event_handle
                    .report_notification_stream_open_failure(peer, NotificationError::Rejected)
                    .await;
            }
            // notification stream fully open, send the shutdown signal; a failed send is
            // deliberately ignored
            PeerState::Open { shutdown } => {
                let _ = shutdown.send(());
            }
            // if the substream was being validated, user must be notified that the substream is
            // now considered rejected if they had been made aware of the existence of the pending
            // connection
            PeerState::Validating {
                outbound, inbound, ..
            } => {
                match (outbound, inbound) {
                    // substream was being validated by the protocol when the connection was closed
                    //
                    // keep the peer in `peers` so that the validation result, once it arrives,
                    // can be matched against it
                    (OutboundState::Closed, InboundState::Validating { .. }) => {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            protocol = %self.protocol,
                            "connection closed while validation pending",
                        );

                        self.peers.insert(
                            peer,
                            PeerContext {
                                state: PeerState::ValidationPending {
                                    state: ConnectionState::Closed,
                                },
                            },
                        );
                    }
                    // user either initiated an outbound substream or an outbound substream was
                    // opened/being opened as a result of an accepted inbound substream but was not
                    // yet fully open
                    //
                    // to have consistent state tracking in the user protocol, substream rejection
                    // must be reported to the user
                    (
                        OutboundState::OutboundInitiated { .. }
                        | OutboundState::Negotiating
                        | OutboundState::Open { .. },
                        _,
                    ) => {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            protocol = %self.protocol,
                            "connection closed outbound substream under negotiation",
                        );

                        self.event_handle
                            .report_notification_stream_open_failure(
                                peer,
                                NotificationError::Rejected,
                            )
                            .await;
                    }
                    // outbound state is `Closed` and the inbound substream was not under
                    // validation by the user, nothing to report
                    (_, _) => {}
                }
            }
            // pending validations must be tracked across connection open/close events
            PeerState::ValidationPending { .. } => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?peer,
                    protocol = %self.protocol,
                    "validation pending while connection closed",
                );

                self.peers.insert(
                    peer,
                    PeerContext {
                        state: PeerState::ValidationPending {
                            state: ConnectionState::Closed,
                        },
                    },
                );
            }
            // remaining states need no further action, the peer has already been removed
            _ => {}
        }

        Ok(())
    }
493
    /// Local node opened a substream to remote node.
    ///
    /// The connection can be in three different states:
    ///   - this is the first substream that was opened and thus the connection was initiated by the
    ///     local node
    ///   - this is a response to a previously received inbound substream which the local node
    ///     accepted and as a result, opened its own substream
    ///   - local and remote nodes opened substreams at the same time
    ///
    /// In the first case, the local node's handshake is sent to remote node and the substream is
    /// polled in the background until they either send their handshake or close the substream.
    ///
    /// For the second case, the connection was initiated by the remote node and the substream was
    /// accepted by the local node which initiated an outbound substream to the remote node.
    /// The only valid states for this case are [`InboundState::Open`],
    /// and [`InboundState::SendingHandshake`] as they imply
    /// that the inbound substream has been accepted by the local node and this opened outbound
    /// substream is a result of a valid state transition.
    ///
    /// For the third case, if the nodes have opened substreams at the same time, the outbound state
    /// must be [`OutboundState::OutboundInitiated`] to ascertain that an outbound substream was
    /// actually opened. Any other state would be a state mismatch and would mean that the
    /// connection is opening substreams without the permission of the protocol handler.
    ///
    /// On a state mismatch the substream is closed and the peer state is left as
    /// [`PeerState::Poisoned`]; `Ok(())` is still returned in that case. The only error returned
    /// is `Error::PeerDoesntExist` when the peer is not found in `peers`.
    async fn on_outbound_substream(
        &mut self,
        protocol: ProtocolName,
        fallback: Option<ProtocolName>,
        peer: PeerId,
        substream_id: SubstreamId,
        outbound: Substream,
    ) -> crate::Result<()> {
        tracing::debug!(
            target: LOG_TARGET,
            ?peer,
            ?protocol,
            ?substream_id,
            "handle outbound substream",
        );

        // peer must exist since an outbound substream was opened to them
        let Some(context) = self.peers.get_mut(&peer) else {
            tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for outbound substream");
            debug_assert!(false);
            return Err(Error::PeerDoesntExist(peer));
        };

        // the substream is no longer pending, regardless of how the event is handled below
        let pending_peer = self.pending_outbound.remove(&substream_id);

        // take the current state out, leaving `Poisoned` behind until a new state is assigned
        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
            // the connection was initiated by the local node, send handshake to remote and wait to
            // receive their handshake back
            PeerState::OutboundInitiated { substream } => {
                debug_assert!(substream == substream_id);
                debug_assert!(pending_peer == Some(peer));

                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    protocol = %self.protocol,
                    ?fallback,
                    ?substream_id,
                    "negotiate outbound protocol",
                );

                self.negotiation.negotiate_outbound(peer, outbound);
                context.state = PeerState::Validating {
                    protocol,
                    fallback,
                    inbound: InboundState::Closed,
                    outbound: OutboundState::Negotiating,
                    direction: Direction::Outbound,
                };
            }
            // NOTE: `protocol` and `fallback` bound here shadow the function parameters, so the
            // values already stored in the peer state are the ones carried over
            PeerState::Validating {
                protocol,
                fallback,
                inbound,
                direction,
                outbound: outbound_state,
            } => {
                // the inbound substream has been accepted by the local node since the handshake has
                // been read and the local handshake has either already been sent or
                // it's in the process of being sent.
                match inbound {
                    InboundState::SendingHandshake | InboundState::Open { .. } => {
                        context.state = PeerState::Validating {
                            protocol,
                            fallback,
                            inbound,
                            direction,
                            outbound: OutboundState::Negotiating,
                        };
                        self.negotiation.negotiate_outbound(peer, outbound);
                    }
                    // nodes have opened substreams at the same time
                    inbound_state => match outbound_state {
                        OutboundState::OutboundInitiated { substream } => {
                            debug_assert!(substream == substream_id);

                            context.state = PeerState::Validating {
                                protocol,
                                fallback,
                                direction,
                                inbound: inbound_state,
                                outbound: OutboundState::Negotiating,
                            };
                            self.negotiation.negotiate_outbound(peer, outbound);
                        }
                        // invalid state: more than one outbound substream has been opened
                        //
                        // the peer state is not restored and stays `Poisoned`
                        inner_state => {
                            tracing::error!(
                                target: LOG_TARGET,
                                ?peer,
                                %protocol,
                                ?substream_id,
                                ?inbound_state,
                                ?inner_state,
                                "invalid state, expected `OutboundInitiated`",
                            );

                            let _ = outbound.close().await;
                            debug_assert!(false);
                        }
                    },
                }
            }
            // the connection may have been closed while an outbound substream was pending
            // if the outbound substream was initiated successfully, close it and reset
            // `pending_open`
            PeerState::Closed { pending_open } if pending_open == Some(substream_id) => {
                let _ = outbound.close().await;

                context.state = PeerState::Closed { pending_open: None };
            }
            // any other state is a state mismatch: the substream is closed and the peer state
            // is not restored, i.e., it stays `Poisoned`
            state => {
                tracing::error!(
                    target: LOG_TARGET,
                    ?peer,
                    %protocol,
                    ?substream_id,
                    ?state,
                    "invalid state: more than one outbound substream opened",
                );

                let _ = outbound.close().await;
                debug_assert!(false);
            }
        }

        Ok(())
    }
645
646    /// Remote opened a substream to local node.
647    ///
648    /// The peer can be in four different states for the inbound substream to be considered valid:
649    ///   - the connection is closed
650    ///   - conneection is open but substream validation from a previous connection is still pending
651    ///   - outbound substream has been opened but not yet acknowledged by the remote peer
652    ///   - outbound substream has been opened and acknowledged by the remote peer and it's being
653    ///     negotiated
654    ///
655    /// If remote opened more than one substream, the new substream is simply discarded.
656    async fn on_inbound_substream(
657        &mut self,
658        protocol: ProtocolName,
659        fallback: Option<ProtocolName>,
660        peer: PeerId,
661        substream: Substream,
662    ) -> crate::Result<()> {
663        // peer must exist since an inbound substream was received from them
664        let Some(context) = self.peers.get_mut(&peer) else {
665            tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for inbound substream");
666            debug_assert!(false);
667            return Err(Error::PeerDoesntExist(peer));
668        };
669
670        tracing::debug!(
671            target: LOG_TARGET,
672            ?peer,
673            %protocol,
674            ?fallback,
675            state = ?context.state,
676            "handle inbound substream",
677        );
678
679        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
680            // inbound substream of a previous connection is still pending validation,
681            // reject any new inbound substreams until an answer is heard from the user
682            state @ PeerState::ValidationPending { .. } => {
683                tracing::debug!(
684                    target: LOG_TARGET,
685                    ?peer,
686                    %protocol,
687                    ?fallback,
688                    ?state,
689                    "validation for previous substream still pending",
690                );
691
692                let _ = substream.close().await;
693                context.state = state;
694            }
695            // outbound substream for previous connection still pending, reject inbound substream
696            // and wait for the outbound substream state to conclude as either succeeded or failed
697            // before accepting any inbound substreams.
698            PeerState::Closed {
699                pending_open: Some(substream_id),
700            } => {
701                tracing::debug!(
702                    target: LOG_TARGET,
703                    ?peer,
704                    protocol = %self.protocol,
705                    "received inbound substream while outbound substream opening, rejecting",
706                );
707                let _ = substream.close().await;
708
709                context.state = PeerState::Closed {
710                    pending_open: Some(substream_id),
711                };
712            }
713            // the peer state is closed so this is a fresh inbound substream.
714            PeerState::Closed { pending_open: None } => {
715                self.negotiation.read_handshake(peer, substream);
716
717                context.state = PeerState::Validating {
718                    protocol,
719                    fallback,
720                    direction: Direction::Inbound,
721                    inbound: InboundState::ReadingHandshake,
722                    outbound: OutboundState::Closed,
723                };
724            }
725            // if the connection is under validation (so an outbound substream has been opened and
726            // it's still pending or under negotiation), the only valid state for the
727            // inbound state is closed as it indicates that there isn't an inbound substream yet for
728            // the remote node duplicate substreams are prohibited.
729            PeerState::Validating {
730                protocol,
731                fallback,
732                outbound,
733                direction,
734                inbound: InboundState::Closed,
735            } => {
736                self.negotiation.read_handshake(peer, substream);
737
738                context.state = PeerState::Validating {
739                    protocol,
740                    fallback,
741                    outbound,
742                    direction,
743                    inbound: InboundState::ReadingHandshake,
744                };
745            }
746            // outbound substream may have been initiated by the local node while a remote node also
747            // opened a substream roughly at the same time
748            PeerState::OutboundInitiated {
749                substream: outbound,
750            } => {
751                self.negotiation.read_handshake(peer, substream);
752
753                context.state = PeerState::Validating {
754                    protocol,
755                    fallback,
756                    direction: Direction::Outbound,
757                    outbound: OutboundState::OutboundInitiated {
758                        substream: outbound,
759                    },
760                    inbound: InboundState::ReadingHandshake,
761                };
762            }
763            // new inbound substream opend while validation for the previous substream was still
764            // pending
765            //
766            // the old substream can be considered dead because remote wouldn't open a new substream
767            // to us unless they had discarded the previous substream.
768            //
769            // set peer state to `ValidationPending` to indicate that the peer is "blocked" until a
770            // validation for the substream is heard, blocking any further activity for
771            // the connection and once the validation is received and in case the
772            // substream is accepted, it will be reported as open failure to the peer
773            // because the states have gone out of sync.
774            PeerState::Validating {
775                outbound: OutboundState::Closed,
776                inbound:
777                    InboundState::Validating {
778                        inbound: pending_substream,
779                    },
780                ..
781            } => {
782                tracing::debug!(
783                    target: LOG_TARGET,
784                    ?peer,
785                    protocol = %self.protocol,
786                    "remote opened substream while previous was still pending, connection failed",
787                );
788                let _ = substream.close().await;
789                let _ = pending_substream.close().await;
790
791                context.state = PeerState::ValidationPending {
792                    state: ConnectionState::Open,
793                };
794            }
795            // remote opened another inbound substream, close it and otherwise ignore the event
796            // as this is a non-serious protocol violation.
797            state => {
798                tracing::debug!(
799                    target: LOG_TARGET,
800                    ?peer,
801                    %protocol,
802                    ?fallback,
803                    ?state,
804                    "remote opened more than one inbound substreams, discarding",
805                );
806
807                let _ = substream.close().await;
808                context.state = state;
809            }
810        }
811
812        Ok(())
813    }
814
815    /// Failed to open substream to remote node.
816    ///
817    /// If the substream was initiated by the local node, it must be reported that the substream
818    /// failed to open. Otherwise the peer state can silently be converted to `Closed`.
819    async fn on_substream_open_failure(
820        &mut self,
821        substream_id: SubstreamId,
822        error: SubstreamError,
823    ) {
824        tracing::debug!(
825            target: LOG_TARGET,
826            protocol = %self.protocol,
827            ?substream_id,
828            ?error,
829            "failed to open substream"
830        );
831
832        let Some(peer) = self.pending_outbound.remove(&substream_id) else {
833            tracing::warn!(
834                target: LOG_TARGET,
835                protocol = %self.protocol,
836                ?substream_id,
837                "pending outbound substream doesn't exist",
838            );
839            debug_assert!(false);
840            return;
841        };
842
843        // peer must exist since an outbound substream failure was received from them
844        let Some(context) = self.peers.get_mut(&peer) else {
845            tracing::warn!(target: LOG_TARGET, ?peer, "peer doesn't exist");
846            debug_assert!(false);
847            return;
848        };
849
850        match &mut context.state {
851            PeerState::OutboundInitiated { .. } => {
852                context.state = PeerState::Closed { pending_open: None };
853
854                self.event_handle
855                    .report_notification_stream_open_failure(peer, NotificationError::Rejected)
856                    .await;
857            }
858            // if the substream was accepted by the local node and as a result, an outbound
859            // substream was accepted as a result this should not be reported to local node
860            PeerState::Validating { outbound, .. } => {
861                self.negotiation.remove_inbound(&peer);
862                self.negotiation.remove_outbound(&peer);
863
864                let pending_open = match outbound {
865                    OutboundState::Closed => None,
866                    OutboundState::OutboundInitiated { substream } => {
867                        self.event_handle
868                            .report_notification_stream_open_failure(
869                                peer,
870                                NotificationError::Rejected,
871                            )
872                            .await;
873
874                        Some(*substream)
875                    }
876                    OutboundState::Negotiating | OutboundState::Open { .. } => {
877                        self.event_handle
878                            .report_notification_stream_open_failure(
879                                peer,
880                                NotificationError::Rejected,
881                            )
882                            .await;
883
884                        None
885                    }
886                };
887
888                context.state = PeerState::Closed { pending_open };
889            }
890            PeerState::Closed { pending_open } => {
891                tracing::debug!(
892                    target: LOG_TARGET,
893                    protocol = %self.protocol,
894                    ?substream_id,
895                    "substream open failure for a closed connection",
896                );
897                debug_assert_eq!(pending_open, &Some(substream_id));
898                context.state = PeerState::Closed { pending_open: None };
899            }
900            state => {
901                tracing::warn!(
902                    target: LOG_TARGET,
903                    protocol = %self.protocol,
904                    ?substream_id,
905                    ?state,
906                    "invalid state for outbound substream open failure",
907                );
908                context.state = PeerState::Closed { pending_open: None };
909                debug_assert!(false);
910            }
911        }
912    }
913
914    /// Open substream to remote `peer`.
915    ///
916    /// Outbound substream can opened only if the `PeerState` is `Closed`.
917    /// By forcing the substream to be opened only if the state is currently closed,
918    /// `NotificationProtocol` can enfore more predictable state transitions.
919    ///
920    /// Other states either imply an invalid state transition ([`PeerState::Open`]) or that an
921    /// inbound substream has already been received and its currently being validated by the user.
922    async fn on_open_substream(&mut self, peer: PeerId) -> crate::Result<()> {
923        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "open substream");
924
925        let Some(context) = self.peers.get_mut(&peer) else {
926            if !self.should_dial {
927                tracing::debug!(
928                    target: LOG_TARGET,
929                    ?peer,
930                    protocol = %self.protocol,
931                    "connection to peer not open and dialing disabled",
932                );
933
934                self.event_handle
935                    .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
936                    .await;
937                return Ok(());
938            }
939
940            match self.service.dial(&peer) {
941                Err(error) => {
942                    tracing::debug!(
943                        target: LOG_TARGET,
944                        ?peer,
945                        protocol = %self.protocol,
946                        ?error,
947                        "failed to dial peer",
948                    );
949
950                    self.event_handle
951                        .report_notification_stream_open_failure(
952                            peer,
953                            NotificationError::DialFailure,
954                        )
955                        .await;
956
957                    return Err(error.into());
958                }
959                Ok(()) => {
960                    tracing::trace!(
961                        target: LOG_TARGET,
962                        ?peer,
963                        protocol = %self.protocol,
964                        "started to dial peer",
965                    );
966
967                    self.peers.insert(
968                        peer,
969                        PeerContext {
970                            state: PeerState::Dialing,
971                        },
972                    );
973                    return Ok(());
974                }
975            }
976        };
977
978        match context.state {
979            // protocol can only request a new outbound substream to be opened if the state is
980            // `Closed` other states imply that it's already open
981            PeerState::Closed {
982                pending_open: Some(substream_id),
983            } => {
984                tracing::trace!(
985                    target: LOG_TARGET,
986                    ?peer,
987                    protocol = %self.protocol,
988                    ?substream_id,
989                    "outbound substream opening, reusing pending open substream",
990                );
991
992                self.pending_outbound.insert(substream_id, peer);
993                context.state = PeerState::OutboundInitiated {
994                    substream: substream_id,
995                };
996            }
997            PeerState::Closed { .. } => match self.service.open_substream(peer) {
998                Ok(substream_id) => {
999                    tracing::trace!(
1000                        target: LOG_TARGET,
1001                        ?peer,
1002                        protocol = %self.protocol,
1003                        ?substream_id,
1004                        "outbound substream opening",
1005                    );
1006
1007                    self.pending_outbound.insert(substream_id, peer);
1008                    context.state = PeerState::OutboundInitiated {
1009                        substream: substream_id,
1010                    };
1011                }
1012                Err(error) => {
1013                    tracing::debug!(
1014                        target: LOG_TARGET,
1015                        ?peer,
1016                        protocol = %self.protocol,
1017                        ?error,
1018                        "failed to open substream",
1019                    );
1020
1021                    self.event_handle
1022                        .report_notification_stream_open_failure(
1023                            peer,
1024                            NotificationError::NoConnection,
1025                        )
1026                        .await;
1027                    context.state = PeerState::Closed { pending_open: None };
1028                }
1029            },
1030            // while a validation is pending for an inbound substream, user is not allowed to open
1031            // any outbound substreams until the old inbond substream is either accepted or rejected
1032            PeerState::ValidationPending { .. } => {
1033                tracing::trace!(
1034                    target: LOG_TARGET,
1035                    ?peer,
1036                    protocol = %self.protocol,
1037                    "validation still pending, rejecting outbound substream request",
1038                );
1039
1040                self.event_handle
1041                    .report_notification_stream_open_failure(
1042                        peer,
1043                        NotificationError::ValidationPending,
1044                    )
1045                    .await;
1046            }
1047            _ => {}
1048        }
1049
1050        Ok(())
1051    }
1052
1053    /// Close substream to remote `peer`.
1054    ///
1055    /// This function can only be called if the substream was actually open, any other state is
1056    /// unreachable as the user is unable to emit this command to [`NotificationProtocol`] unless
1057    /// the connection has been fully opened.
1058    async fn on_close_substream(&mut self, peer: PeerId) {
1059        tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "close substream");
1060
1061        let Some(context) = self.peers.get_mut(&peer) else {
1062            tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1063            return;
1064        };
1065
1066        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1067            PeerState::Open { shutdown } => {
1068                let _ = shutdown.send(());
1069
1070                context.state = PeerState::Closed { pending_open: None };
1071            }
1072            state => {
1073                tracing::debug!(
1074                    target: LOG_TARGET,
1075                    ?peer,
1076                    protocol = %self.protocol,
1077                    ?state,
1078                    "substream already closed",
1079                );
1080                context.state = state;
1081            }
1082        }
1083    }
1084
1085    /// Handle validation result.
1086    ///
1087    /// The validation result binary (accept/reject). If the node is rejected, the substreams are
1088    /// discarded and state is set to `PeerState::Closed`. If there was an outbound substream in
1089    /// progress while the connection was rejected by the user, the oubound state is discarded,
1090    /// except for the substream ID of the substream which is kept for later use, in case the
1091    /// substream happens to open.
1092    ///
1093    /// If the node is accepted and there is no outbound substream to them open yet, a new substream
1094    /// is opened and once it opens, the local handshake will be sent to the remote peer and if
1095    /// they also accept the substream the connection is considered fully open.
1096    async fn on_validation_result(
1097        &mut self,
1098        peer: PeerId,
1099        result: ValidationResult,
1100    ) -> crate::Result<()> {
1101        tracing::trace!(
1102            target: LOG_TARGET,
1103            ?peer,
1104            protocol = %self.protocol,
1105            ?result,
1106            "handle validation result",
1107        );
1108
1109        let Some(context) = self.peers.get_mut(&peer) else {
1110            tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1111            return Err(Error::PeerDoesntExist(peer));
1112        };
1113
1114        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1115            PeerState::Validating {
1116                protocol,
1117                fallback,
1118                outbound,
1119                direction,
1120                inbound: InboundState::Validating { inbound },
1121            } => match result {
1122                // substream was rejected by the local node, if an outbound substream was under
1123                // negotation, discard that data and if an outbound substream was
1124                // initiated, save the `SubstreamId` of that substream and later if the substream
1125                // is opened, the state can be corrected to `pending_open: None`.
1126                ValidationResult::Reject => {
1127                    let _ = inbound.close().await;
1128                    self.negotiation.remove_outbound(&peer);
1129                    self.negotiation.remove_inbound(&peer);
1130                    context.state = PeerState::Closed {
1131                        pending_open: outbound.pending_open(),
1132                    };
1133
1134                    Ok(())
1135                }
1136                ValidationResult::Accept => match outbound {
1137                    // no outbound substream exists so initiate a new substream open and send the
1138                    // local handshake to remote node, indicating that the
1139                    // connection was accepted by the local node
1140                    OutboundState::Closed => match self.service.open_substream(peer) {
1141                        Ok(substream) => {
1142                            self.negotiation.send_handshake(peer, inbound);
1143                            self.pending_outbound.insert(substream, peer);
1144
1145                            context.state = PeerState::Validating {
1146                                protocol,
1147                                fallback,
1148                                direction,
1149                                inbound: InboundState::SendingHandshake,
1150                                outbound: OutboundState::OutboundInitiated { substream },
1151                            };
1152                            Ok(())
1153                        }
1154                        // failed to open outbound substream after accepting an inbound substream
1155                        //
1156                        // since the user was notified of this substream and they accepted it,
1157                        // they expecting some kind of answer (open success/failure).
1158                        //
1159                        // report to user that the substream failed to open so they can track the
1160                        // state transitions of the peer correctly
1161                        Err(error) => {
1162                            tracing::trace!(
1163                                target: LOG_TARGET,
1164                                ?peer,
1165                                protocol = %self.protocol,
1166                                ?result,
1167                                ?error,
1168                                "failed to open outbound substream for accepted substream",
1169                            );
1170
1171                            let _ = inbound.close().await;
1172                            context.state = PeerState::Closed { pending_open: None };
1173
1174                            self.event_handle
1175                                .report_notification_stream_open_failure(
1176                                    peer,
1177                                    NotificationError::Rejected,
1178                                )
1179                                .await;
1180
1181                            Err(error.into())
1182                        }
1183                    },
1184                    // here the state is one of `OutboundState::{OutboundInitiated, Negotiating,
1185                    // Open}` so that state can be safely ignored and all that
1186                    // has to be done is to send the local handshake to remote
1187                    // node to indicate that the connection was accepted.
1188                    _ => {
1189                        self.negotiation.send_handshake(peer, inbound);
1190
1191                        context.state = PeerState::Validating {
1192                            protocol,
1193                            fallback,
1194                            direction,
1195                            inbound: InboundState::SendingHandshake,
1196                            outbound,
1197                        };
1198                        Ok(())
1199                    }
1200                },
1201            },
1202            // validation result received for an inbound substream which is now considered dead
1203            // because while the substream was being validated, the connection had closed.
1204            //
1205            // if the substream was rejected and there is no active connection to the peer,
1206            // just remove the peer from `peers` without informing user
1207            //
1208            // if the substream was accepted, the user must be informed that the substream failed to
1209            // open. Depending on whether there is currently a connection open to the peer, either
1210            // report `Rejected`/`NoConnection` and let the user try again.
1211            PeerState::ValidationPending { state } => {
1212                if let Some(error) = match state {
1213                    ConnectionState::Open => {
1214                        context.state = PeerState::Closed { pending_open: None };
1215
1216                        std::matches!(result, ValidationResult::Accept)
1217                            .then_some(NotificationError::Rejected)
1218                    }
1219                    ConnectionState::Closed => {
1220                        self.peers.remove(&peer);
1221
1222                        std::matches!(result, ValidationResult::Accept)
1223                            .then_some(NotificationError::NoConnection)
1224                    }
1225                } {
1226                    self.event_handle.report_notification_stream_open_failure(peer, error).await;
1227                }
1228
1229                Ok(())
1230            }
1231            // if the user incorrectly send a validation result for a peer that doesn't require
1232            // validation, set state back to what it was and ignore the event
1233            //
1234            // the user protocol may send a stale validation result not because of a programming
1235            // error but because it has a backlock of unhandled events, with one event potentially
1236            // nullifying the need for substream validation, and is just temporarily out of sync
1237            // with `NotificationProtocol`
1238            state => {
1239                tracing::debug!(
1240                    target: LOG_TARGET,
1241                    ?peer,
1242                    protocol = %self.protocol,
1243                    ?state,
1244                    "validation result received for peer that doesn't require validation",
1245                );
1246
1247                context.state = state;
1248                Ok(())
1249            }
1250        }
1251    }
1252
1253    /// Handle handshake event.
1254    ///
1255    /// There are three different handshake event types:
1256    ///   - outbound substream negotiated
1257    ///   - inbound substream negotiated
1258    ///   - substream negotiation error
1259    ///
1260    /// Neither outbound nor inbound substream negotiated automatically means that the connection is
1261    /// considered open as both substreams must be fully negotiated for that to be the case. That is
1262    /// why the peer state for inbound and outbound are set separately and at the end of the
1263    /// function is the collective state of the substreams checked and if both substreams are
1264    /// negotiated, the user informed that the connection is open.
1265    ///
1266    /// If the negotiation fails, the user may have to be informed of that. Outbound substream
1267    /// failure always results in user getting notified since the existence of an outbound substream
1268    /// means that the user has either initiated an outbound substreams or has accepted an inbound
1269    /// substreams, resulting in an outbound substreams.
1270    ///
1271    /// Negotiation failure for inbound substreams which are in the state
1272    /// [`InboundState::ReadingHandshake`] don't result in any notification because while the
1273    /// handshake is being read from the substream, the user is oblivious to the fact that an
1274    /// inbound substream has even been received.
1275    async fn on_handshake_event(&mut self, peer: PeerId, event: HandshakeEvent) {
1276        let Some(context) = self.peers.get_mut(&peer) else {
1277            tracing::error!(target: LOG_TARGET, "invalid state: negotiation event received but peer doesn't exist");
1278            debug_assert!(false);
1279            return;
1280        };
1281
1282        tracing::trace!(
1283            target: LOG_TARGET,
1284            ?peer,
1285            protocol = %self.protocol,
1286            ?event,
1287            "handle handshake event",
1288        );
1289
1290        match event {
1291            // either an inbound or outbound substream has been negotiated successfully
1292            HandshakeEvent::Negotiated {
1293                peer,
1294                handshake,
1295                substream,
1296                direction,
1297            } => match direction {
1298                // outbound substream was negotiated, the only valid state for peer is `Validating`
1299                // and only valid state for `OutboundState` is `Negotiating`
1300                negotiation::Direction::Outbound => {
1301                    self.negotiation.remove_outbound(&peer);
1302
1303                    match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1304                        PeerState::Validating {
1305                            protocol,
1306                            fallback,
1307                            direction,
1308                            outbound: OutboundState::Negotiating,
1309                            inbound,
1310                        } => {
1311                            context.state = PeerState::Validating {
1312                                protocol,
1313                                fallback,
1314                                direction,
1315                                outbound: OutboundState::Open {
1316                                    handshake,
1317                                    outbound: substream,
1318                                },
1319                                inbound,
1320                            };
1321                        }
1322                        state => {
1323                            tracing::warn!(
1324                                target: LOG_TARGET,
1325                                ?peer,
1326                                ?state,
1327                                "outbound substream negotiated but peer has invalid state",
1328                            );
1329                            debug_assert!(false);
1330                        }
1331                    }
1332                }
1333                // inbound negotiation event completed
1334                //
1335                // the negotiation event can be one of two different types:
1336                //   - remote handshake was read from the substream
1337                //   - local handshake has been sent to remote node
1338                //
1339                // For the first case, the substream has to be validated by the local node.
1340                // This means reporting the protocol name, potential negotiated fallback and the
1341                // handshake. Local node will then either accept or reject the substream which is
1342                // handled by [`NotificationProtocol::on_validation_result()`]. Compared to
1343                // Substrate, litep2p requires both peers to validate the inbound handshake to allow
1344                // more complex connection validation. If this is not necessary and the protocol
1345                // wishes to auto-accept the inbound substreams that are a result of
1346                // an outbound substream already accepted by the remote node, the
1347                // substream validation is skipped and the local handshake is sent
1348                // right away.
1349                //
1350                // For the second case, the local handshake was sent to remote node successfully and
1351                // the inbound substream is considered open and if the outbound
1352                // substream is open as well, the connection is fully open.
1353                //
1354                // Only valid states for [`InboundState`] are [`InboundState::ReadingHandshake`] and
1355                // [`InboundState::SendingHandshake`] because otherwise the inbound
1356                // substream cannot be in [`HandshakeService`](super::negotiation::HandshakeService)
1357                // unless there is a logic bug in the state machine.
1358                negotiation::Direction::Inbound => {
1359                    self.negotiation.remove_inbound(&peer);
1360
1361                    match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1362                        PeerState::Validating {
1363                            protocol,
1364                            fallback,
1365                            direction,
1366                            outbound,
1367                            inbound: InboundState::ReadingHandshake,
1368                        } => {
1369                            if !std::matches!(outbound, OutboundState::Closed) && self.auto_accept {
1370                                tracing::trace!(
1371                                    target: LOG_TARGET,
1372                                    ?peer,
1373                                    %protocol,
1374                                    ?fallback,
1375                                    ?direction,
1376                                    ?outbound,
1377                                    "auto-accept inbound substream",
1378                                );
1379
1380                                self.negotiation.send_handshake(peer, substream);
1381                                context.state = PeerState::Validating {
1382                                    protocol,
1383                                    fallback,
1384                                    direction,
1385                                    inbound: InboundState::SendingHandshake,
1386                                    outbound,
1387                                };
1388
1389                                return;
1390                            }
1391
1392                            tracing::trace!(
1393                                target: LOG_TARGET,
1394                                ?peer,
1395                                %protocol,
1396                                ?fallback,
1397                                ?outbound,
1398                                "send inbound protocol for validation",
1399                            );
1400
1401                            context.state = PeerState::Validating {
1402                                protocol: protocol.clone(),
1403                                fallback: fallback.clone(),
1404                                inbound: InboundState::Validating { inbound: substream },
1405                                outbound,
1406                                direction,
1407                            };
1408
1409                            let (tx, rx) = oneshot::channel();
1410                            self.pending_validations.push(Box::pin(async move {
1411                                match rx.await {
1412                                    Ok(ValidationResult::Accept) =>
1413                                        (peer, ValidationResult::Accept),
1414                                    _ => (peer, ValidationResult::Reject),
1415                                }
1416                            }));
1417
1418                            self.event_handle
1419                                .report_inbound_substream(protocol, fallback, peer, handshake, tx)
1420                                .await;
1421                        }
1422                        PeerState::Validating {
1423                            protocol,
1424                            fallback,
1425                            direction,
1426                            inbound: InboundState::SendingHandshake,
1427                            outbound,
1428                        } => {
1429                            tracing::trace!(
1430                                target: LOG_TARGET,
1431                                ?peer,
1432                                %protocol,
1433                                ?fallback,
1434                                "inbound substream negotiated, waiting for outbound substream to complete",
1435                            );
1436
1437                            context.state = PeerState::Validating {
1438                                protocol: protocol.clone(),
1439                                fallback: fallback.clone(),
1440                                inbound: InboundState::Open { inbound: substream },
1441                                outbound,
1442                                direction,
1443                            };
1444                        }
1445                        _state => debug_assert!(false),
1446                    }
1447                }
1448            },
            // error occurred during negotiation, either for inbound or outbound substream
1450            // user is notified of the error only if they've either initiated an outbound substream
1451            // or if they accepted an inbound substream and as a result initiated an outbound
1452            // substream.
1453            HandshakeEvent::NegotiationError { peer, direction } => {
1454                tracing::debug!(
1455                    target: LOG_TARGET,
1456                    ?peer,
1457                    protocol = %self.protocol,
1458                    ?direction,
1459                    state = ?context.state,
1460                    "failed to negotiate substream",
1461                );
1462                let _ = self.negotiation.remove_outbound(&peer);
1463                let _ = self.negotiation.remove_inbound(&peer);
1464
1465                // if an outbound substream had been initiated (whatever its state is), it means
1466                // that the user knows about the connection and must be notified that it failed to
1467                // negotiate.
1468                match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1469                    PeerState::Validating { outbound, .. } => {
1470                        context.state = PeerState::Closed {
1471                            pending_open: outbound.pending_open(),
1472                        };
1473
1474                        // notify user if the outbound substream is not considered closed
1475                        if !std::matches!(outbound, OutboundState::Closed) {
1476                            return self
1477                                .event_handle
1478                                .report_notification_stream_open_failure(
1479                                    peer,
1480                                    NotificationError::Rejected,
1481                                )
1482                                .await;
1483                        }
1484                    }
1485                    _state => debug_assert!(false),
1486                }
1487            }
1488        }
1489
1490        // if both inbound and outbound substreams are considered open, notify the user that
1491        // a notification stream has been opened and set up for sending and receiving
1492        // notifications to and from remote node
1493        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1494            PeerState::Validating {
1495                protocol,
1496                fallback,
1497                direction,
1498                outbound:
1499                    OutboundState::Open {
1500                        handshake,
1501                        outbound,
1502                    },
1503                inbound: InboundState::Open { inbound },
1504            } => {
1505                tracing::debug!(
1506                    target: LOG_TARGET,
1507                    ?peer,
1508                    %protocol,
1509                    ?fallback,
1510                    "notification stream opened",
1511                );
1512
1513                let (async_tx, async_rx) = channel(self.async_channel_size);
1514                let (sync_tx, sync_rx) = channel(self.sync_channel_size);
1515                let sink = NotificationSink::new(peer, sync_tx, async_tx);
1516
1517                // start connection handler for the peer which only deals with sending/receiving
1518                // notifications
1519                //
1520                // the connection handler must be started only after the newly opened notification
1521                // substream is reported to user because the connection handler
1522                // might exit immediately after being started if remote closed the connection.
1523                //
1524                // if the order of events (open & close) is not ensured to be correct, the code
1525                // handling the connectivity logic on the `NotificationHandle` side
1526                // might get confused about the current state of the connection.
1527                let shutdown_tx = self.shutdown_tx.clone();
1528                let (connection, shutdown) = Connection::new(
1529                    peer,
1530                    inbound,
1531                    outbound,
1532                    self.event_handle.clone(),
1533                    shutdown_tx.clone(),
1534                    self.notif_tx.clone(),
1535                    async_rx,
1536                    sync_rx,
1537                );
1538
1539                context.state = PeerState::Open { shutdown };
1540                self.event_handle
1541                    .report_notification_stream_opened(
1542                        protocol, fallback, direction, peer, handshake, sink,
1543                    )
1544                    .await;
1545
1546                self.executor.run(Box::pin(async move {
1547                    connection.start().await;
1548                }));
1549            }
1550            state => {
1551                tracing::trace!(
1552                    target: LOG_TARGET,
1553                    ?peer,
1554                    protocol = %self.protocol,
1555                    ?state,
1556                    "validation for substream still pending",
1557                );
1558                self.timers.push(Box::pin(async move {
1559                    futures_timer::Delay::new(Duration::from_secs(5)).await;
1560                    peer
1561                }));
1562
1563                context.state = state;
1564            }
1565        }
1566    }
1567
1568    /// Handle dial failure.
1569    async fn on_dial_failure(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
1570        tracing::trace!(
1571            target: LOG_TARGET,
1572            ?peer,
1573            protocol = %self.protocol,
1574            ?addresses,
1575            "handle dial failure",
1576        );
1577
1578        let Some(context) = self.peers.remove(&peer) else {
1579            tracing::trace!(
1580                target: LOG_TARGET,
1581                ?peer,
1582                protocol = %self.protocol,
1583                ?addresses,
1584                "dial failure for an unknown peer",
1585            );
1586            return;
1587        };
1588
1589        match context.state {
1590            PeerState::Dialing => {
1591                tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, ?addresses, "failed to dial peer");
1592                self.event_handle
1593                    .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
1594                    .await;
1595            }
1596            state => {
1597                tracing::trace!(
1598                    target: LOG_TARGET,
1599                    ?peer,
1600                    protocol = %self.protocol,
1601                    ?state,
1602                    "dial failure for peer that's not being dialed",
1603                );
1604                self.peers.insert(peer, PeerContext { state });
1605            }
1606        }
1607    }
1608
1609    /// Handle next notification event.
1610    ///
1611    /// Returns `true` when the user command stream was dropped.
1612    async fn next_event(&mut self) -> bool {
1613        // biased select is used because the substream events must be prioritized above other events
1614        // that is because a closed substream is detected by either `substreams` or `negotiation`
1615        // and if that event is not handled with priority but, e.g., inbound substream is
1616        // handled before, it can create a situation where the state machine gets confused
1617        // about the peer's state.
1618        tokio::select! {
1619            biased;
1620
1621            event = self.negotiation.next(), if !self.negotiation.is_empty() => {
1622                if let Some((peer, event)) = event {
1623                    self.on_handshake_event(peer, event).await;
1624                } else {
1625                    tracing::error!(target: LOG_TARGET, "`HandshakeService` expected to return `Some(..)`");
1626                    debug_assert!(false);
1627                };
1628            }
1629            event = self.shutdown_rx.recv() => match event {
1630                None => (),
1631                Some(peer) => {
1632                    if let Some(context) = self.peers.get_mut(&peer) {
1633                        tracing::trace!(
1634                            target: LOG_TARGET,
1635                            ?peer,
1636                            protocol = %self.protocol,
1637                            "notification stream to peer closed",
1638                        );
1639                        context.state = PeerState::Closed { pending_open: None };
1640                    }
1641                }
1642            },
1643            // TODO: https://github.com/paritytech/litep2p/issues/338 this could be combined with `Negotiation`
1644            peer = self.timers.next(), if !self.timers.is_empty() => match peer {
1645                Some(peer) => {
1646                    match self.peers.get_mut(&peer) {
1647                        Some(context) => match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1648                            PeerState::Validating {
1649                                outbound: OutboundState::Open { outbound, .. },
1650                                inbound: InboundState::Closed,
1651                                ..
1652                            } => {
1653                                tracing::debug!(
1654                                    target: LOG_TARGET,
1655                                    ?peer,
1656                                    protocol = %self.protocol,
1657                                    "peer didn't answer in 10 seconds, canceling substream and closing connection",
1658                                );
1659                                context.state = PeerState::Closed { pending_open: None };
1660
1661                                let _ = outbound.close().await;
1662                                self.event_handle
1663                                    .report_notification_stream_open_failure(peer, NotificationError::Rejected)
1664                                    .await;
1665
1666                                // NOTE: this is used to work around an issue in Substrate where the protocol
1667                                // is not notified if an inbound substream is closed. That indicates that remote
1668                                // wishes the close the connection but `Notifications` still keeps the substream state
1669                                // as `Open` until the outbound substream is closed (even though the outbound substream
1670                                // is also closed at that point). This causes a further issue: inbound substreams
1671                                // are automatically opened when state is `Open`, even if the inbound substream belongs
1672                                // to a new "connection" (pair of substreams).
1673                                //
1674                                // basically what happens (from Substrate's PoV) is there are pair of substreams (`inbound1`, `outbound1`),
1675                                // litep2p closes both substreams so both `inbound1` and outbound1 become non-readable/writable.
1676                                // Substrate doesn't detect this an instead only marks `inbound1` is closed while still keeping
1677                                // the (now-closed) `outbound1` active and it will be detected closed only when Substrate tries to
1678                                // write something into that substream. If now litep2p tries to open a new connection to Substrate,
1679                                // the outbound substream from litep2p's PoV will be automatically accepted (https://github.com/paritytech/polkadot-sdk/blob/59b2661444de2a25f2125a831bd786035a9fac4b/substrate/client/network/src/protocol/notifications/handler.rs#L544-L556)
1680                                // but since Substrate thinks `outbound1` is still active, it won't open a new outbound substream
1681                                // and it ends up having (`inbound2`, `outbound1`) as its pair of substreams which doens't make sense.
1682                                //
1683                                // since litep2p is expecting to receive an inbound substream from Substrate and never receives it,
1684                                // it basically can't make progress with the substream open request because litep2p can't force Substrate
1685                                // to detect that `outbound1` is closed. Easiest (and very hacky at the same time) way to reset the substream
1686                                // state is to close the connection. This is not an appropriate way to fix the issue and causes issues with,
1687                                // e.g., smoldot which at the time of writing this doesn't support the transaction protocol. The only way to fix
1688                                // this cleanly is to make Substrate detect closed substreams correctly.
1689                                if let Err(error) = self.service.force_close(peer) {
1690                                    tracing::debug!(
1691                                        target: LOG_TARGET,
1692                                        ?peer,
1693                                        protocol = %self.protocol,
1694                                        ?error,
1695                                        "failed to force close connection",
1696                                    );
1697                                }
1698                            }
1699                            state => {
1700                                tracing::trace!(
1701                                    target: LOG_TARGET,
1702                                    ?peer,
1703                                    protocol = %self.protocol,
1704                                    ?state,
1705                                    "ignore expired timer for peer",
1706                                );
1707                                context.state = state;
1708                            }
1709                        }
1710                        None => tracing::debug!(
1711                            target: LOG_TARGET,
1712                            ?peer,
1713                            protocol = %self.protocol,
1714                            "peer doesn't exist anymore",
1715                        ),
1716                    }
1717                }
1718                None => (),
1719            },
1720            event = self.service.next() => match event {
1721                Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
1722                    if let Err(error) = self.on_connection_established(peer).await {
1723                        tracing::debug!(
1724                            target: LOG_TARGET,
1725                            ?peer,
1726                            ?error,
1727                            "failed to register peer",
1728                        );
1729                    }
1730                }
1731                Some(TransportEvent::ConnectionClosed { peer }) => {
1732                    if let Err(error) = self.on_connection_closed(peer).await {
1733                        tracing::debug!(
1734                            target: LOG_TARGET,
1735                            ?peer,
1736                            ?error,
1737                            "failed to disconnect peer",
1738                        );
1739                    }
1740                }
1741                Some(TransportEvent::SubstreamOpened {
1742                    peer,
1743                    substream,
1744                    direction,
1745                    protocol,
1746                    fallback,
1747                }) => match direction {
1748                    protocol::Direction::Inbound => {
1749                        if let Err(error) = self.on_inbound_substream(protocol, fallback, peer, substream).await {
1750                            tracing::debug!(
1751                                target: LOG_TARGET,
1752                                ?peer,
1753                                ?error,
1754                                "failed to handle inbound substream",
1755                            );
1756                        }
1757                    }
1758                    protocol::Direction::Outbound(substream_id) => {
1759                        if let Err(error) = self
1760                            .on_outbound_substream(protocol, fallback, peer, substream_id, substream)
1761                            .await
1762                        {
1763                            tracing::debug!(
1764                                target: LOG_TARGET,
1765                                ?peer,
1766                                ?error,
1767                                "failed to handle outbound substream",
1768                            );
1769                        }
1770                    }
1771                },
1772                Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
1773                    self.on_substream_open_failure(substream, error).await;
1774                }
1775                Some(TransportEvent::DialFailure { peer, addresses }) => self.on_dial_failure(peer, addresses).await,
1776                None => (),
1777            },
1778            result = self.pending_validations.select_next_some(), if !self.pending_validations.is_empty() => {
1779                if let Err(error) = self.on_validation_result(result.0, result.1).await {
1780                    tracing::debug!(
1781                        target: LOG_TARGET,
1782                        peer = ?result.0,
1783                        result = ?result.1,
1784                        ?error,
1785                        "failed to handle validation result",
1786                    );
1787                }
1788            }
1789
1790            // User commands.
1791            command = self.command_rx.recv() => match command {
1792                None => {
1793                    tracing::debug!(
1794                        target: LOG_TARGET,
1795                        protocol = %self.protocol,
1796                        "user protocol has exited, exiting"
1797                    );
1798
1799                    self.service.unregister_protocol();
1800
1801                    return true;
1802                }
1803                Some(command) => match command {
1804                    NotificationCommand::OpenSubstream { peers } => {
1805                        for peer in peers {
1806                            if let Err(error) = self.on_open_substream(peer).await {
1807                                tracing::debug!(
1808                                    target: LOG_TARGET,
1809                                    ?peer,
1810                                    ?error,
1811                                    "failed to open substream",
1812                                );
1813                            }
1814                        }
1815                    }
1816                    NotificationCommand::CloseSubstream { peers } => {
1817                        for peer in peers {
1818                            self.on_close_substream(peer).await;
1819                        }
1820                    }
1821                    NotificationCommand::ForceClose { peer } => {
1822                        let _ = self.service.force_close(peer);
1823                    }
1824                    #[cfg(feature = "fuzz")]
1825                    NotificationCommand::SendNotification{ .. } => unreachable!()
1826                }
1827            },
1828        }
1829
1830        false
1831    }
1832
1833    /// Start [`NotificationProtocol`] event loop.
1834    pub(crate) async fn run(mut self) {
1835        tracing::debug!(target: LOG_TARGET, "starting notification event loop");
1836
1837        while !self.next_event().await {}
1838    }
1839}