litep2p/protocol/notification/
mod.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Notification protocol implementation.
22
23use crate::{
24    error::{Error, SubstreamError},
25    executor::Executor,
26    protocol::{
27        self,
28        notification::{
29            connection::Connection,
30            handle::NotificationEventHandle,
31            negotiation::{HandshakeEvent, HandshakeService},
32            types::NotificationCommand,
33        },
34        TransportEvent, TransportService,
35    },
36    substream::Substream,
37    types::{protocol::ProtocolName, SubstreamId},
38    PeerId, DEFAULT_CHANNEL_SIZE,
39};
40
41use bytes::BytesMut;
42use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
43use multiaddr::Multiaddr;
44use tokio::sync::{
45    mpsc::{channel, Receiver, Sender},
46    oneshot,
47};
48
49use std::{collections::HashMap, sync::Arc, time::Duration};
50
51pub use config::{Config, ConfigBuilder};
52pub use handle::{NotificationHandle, NotificationSink};
53pub use types::{Direction, NotificationError, NotificationEvent, ValidationResult};
54
55mod config;
56mod connection;
57mod handle;
58mod negotiation;
59mod types;
60
61#[cfg(test)]
62mod tests;
63
/// Logging target for the file.
///
/// Passed as the `target` of the `tracing` events emitted by this module.
const LOG_TARGET: &str = "litep2p::notification";
66
/// Connection state.
///
/// Used to track transport-level connectivity state when there is a pending validation.
/// See [`PeerState::ValidationPending`] for more details.
#[derive(Debug, PartialEq, Eq)]
enum ConnectionState {
    /// There is an active, transport-level connection open to the peer.
    Open,

    /// There is no transport-level connection open to the peer.
    Closed,
}
79
/// Inbound substream state.
///
/// Tracks the substream opened by the remote node while the peer is in
/// [`PeerState::Validating`].
#[derive(Debug)]
enum InboundState {
    /// Substream is closed.
    Closed,

    /// Handshake is being read from the remote node.
    ReadingHandshake,

    /// Substream and its handshake are being validated by the user protocol.
    Validating {
        /// Inbound substream.
        inbound: Substream,
    },

    /// Handshake is being sent to the remote node.
    SendingHandshake,

    /// Substream is open.
    Open {
        /// Inbound substream.
        inbound: Substream,
    },
}
104
/// Outbound substream state.
///
/// Tracks the substream opened by the local node while the peer is in
/// [`PeerState::Validating`].
#[derive(Debug)]
enum OutboundState {
    /// Substream is closed.
    Closed,

    /// Outbound substream initiated.
    OutboundInitiated {
        /// Substream ID.
        substream: SubstreamId,
    },

    /// Substream is in the state of being negotiated.
    ///
    /// This process entails sending the local node's handshake and reading back the remote
    /// node's handshake if they've accepted the substream, or detecting that the substream was
    /// closed in case the substream was rejected.
    Negotiating,

    /// Substream is open.
    Open {
        /// Received handshake.
        handshake: Vec<u8>,

        /// Outbound substream.
        outbound: Substream,
    },
}
133
134impl OutboundState {
135    /// Get pending outboud substream ID, if it exists.
136    fn pending_open(&self) -> Option<SubstreamId> {
137        match &self {
138            OutboundState::OutboundInitiated { substream } => Some(*substream),
139            _ => None,
140        }
141    }
142}
143
/// Peer state.
///
/// State of the notification stream towards a single peer, stored in [`PeerContext`].
#[derive(Debug)]
enum PeerState {
    /// Peer state is poisoned due to invalid state transition.
    ///
    /// Also used as the temporary placeholder while the current state is moved out with
    /// `std::mem::replace()`.
    Poisoned,

    /// Validation for an inbound substream is still pending.
    ///
    /// In order to enforce valid state transitions, `NotificationProtocol` keeps track of pending
    /// validations across connectivity events (open/closed) and enforces that no activity happens
    /// for any peer that is still awaiting validation for their inbound substream.
    ///
    /// If connection closes while the substream is being validated, instead of removing peer from
    /// `peers`, the peer state is set as `ValidationPending` which indicates to the state machine
    /// that a response for an inbound substream is pending validation. The substream itself will
    /// be dead by the time validation is received if the peer state is `ValidationPending` since
    /// the substream was part of a previous, now-closed connection but this state allows
    /// `NotificationProtocol` to enforce correct state transitions by, e.g., rejecting new inbound
    /// substream while a previous substream is still being validated or rejecting outbound
    /// substreams on new connections if that same condition holds.
    ValidationPending {
        /// What is current connectivity state of the peer.
        ///
        /// If `state` is `ConnectionState::Closed` when the validation is finally received, peer
        /// is removed from `peers` and if the `state` is `ConnectionState::Open`, peer is moved to
        /// state `PeerState::Closed` and user is allowed to retry opening an outbound substream.
        state: ConnectionState,
    },

    /// Connection to peer is closed.
    Closed {
        /// Connection might have been closed while there was an outbound substream still pending.
        ///
        /// To handle this state transition correctly in case the substream opens after the
        /// connection is considered closed, store the `SubstreamId` so that it can be verified in
        /// case the substream ever opens.
        pending_open: Option<SubstreamId>,
    },

    /// Peer is being dialed in order to open an outbound substream to them.
    Dialing,

    /// Outbound substream initiated.
    OutboundInitiated {
        /// Substream ID.
        substream: SubstreamId,
    },

    /// Substream is being validated.
    Validating {
        /// Protocol.
        protocol: ProtocolName,

        /// Fallback protocol, if the substream was negotiated using a fallback name.
        fallback: Option<ProtocolName>,

        /// Outbound protocol state.
        outbound: OutboundState,

        /// Inbound protocol state.
        inbound: InboundState,

        /// Direction.
        direction: Direction,
    },

    /// Notification stream has been opened.
    Open {
        /// `Oneshot::Sender` for shutting down the connection.
        shutdown: oneshot::Sender<()>,
    },
}
215
/// Peer context.
///
/// Value type of the `peers` map of `NotificationProtocol`.
#[derive(Debug)]
struct PeerContext {
    /// Peer state.
    state: PeerState,
}
222
223impl PeerContext {
224    /// Create new [`PeerContext`].
225    fn new() -> Self {
226        Self {
227            state: PeerState::Closed { pending_open: None },
228        }
229    }
230}
231
/// Notification protocol.
///
/// Holds the per-peer substream state together with the channels, handshake service and
/// pending futures that the protocol's event handlers operate on.
pub(crate) struct NotificationProtocol {
    /// Transport service.
    service: TransportService,

    /// Protocol.
    protocol: ProtocolName,

    /// Auto accept inbound substream if the outbound substream was initiated by the local node.
    auto_accept: bool,

    /// TX channel passed to the protocol used for sending events.
    event_handle: NotificationEventHandle,

    /// TX channel for sending shut down notifications from connection handlers to
    /// [`NotificationProtocol`].
    shutdown_tx: Sender<PeerId>,

    /// RX channel for receiving shutdown notifications from the connection handlers.
    shutdown_rx: Receiver<PeerId>,

    /// RX channel passed to the protocol used for receiving commands.
    command_rx: Receiver<NotificationCommand>,

    /// TX channel given to connection handlers for sending notifications.
    notif_tx: Sender<(PeerId, BytesMut)>,

    /// Connected peers.
    ///
    /// A peer may also stay in the map after its connection has closed if a validation for its
    /// inbound substream is still pending, see [`PeerState::ValidationPending`].
    peers: HashMap<PeerId, PeerContext>,

    /// Pending outbound substreams, keyed by substream ID.
    pending_outbound: HashMap<SubstreamId, PeerId>,

    /// Handshaking service which reads and writes the handshakes to inbound
    /// and outbound substreams asynchronously.
    negotiation: HandshakeService,

    /// Synchronous channel size.
    sync_channel_size: usize,

    /// Asynchronous channel size.
    async_channel_size: usize,

    /// Executor for connection handlers.
    executor: Arc<dyn Executor>,

    /// Pending substream validations.
    pending_validations: FuturesUnordered<BoxFuture<'static, (PeerId, ValidationResult)>>,

    /// Timers for pending outbound substreams.
    timers: FuturesUnordered<BoxFuture<'static, PeerId>>,

    /// Should `NotificationProtocol` attempt to dial the peer.
    should_dial: bool,
}
286
287impl NotificationProtocol {
288    pub(crate) fn new(
289        service: TransportService,
290        config: Config,
291        executor: Arc<dyn Executor>,
292    ) -> Self {
293        let (shutdown_tx, shutdown_rx) = channel(DEFAULT_CHANNEL_SIZE);
294
295        Self {
296            service,
297            shutdown_tx,
298            shutdown_rx,
299            executor,
300            peers: HashMap::new(),
301            protocol: config.protocol_name,
302            auto_accept: config.auto_accept,
303            pending_validations: FuturesUnordered::new(),
304            timers: FuturesUnordered::new(),
305            event_handle: NotificationEventHandle::new(config.event_tx),
306            notif_tx: config.notif_tx,
307            command_rx: config.command_rx,
308            pending_outbound: HashMap::new(),
309            negotiation: HandshakeService::new(config.handshake),
310            sync_channel_size: config.sync_channel_size,
311            async_channel_size: config.async_channel_size,
312            should_dial: config.should_dial,
313        }
314    }
315
316    /// Connection established to remote node.
317    ///
318    /// If the peer already exists, the only valid state for it is `Dialing` as it indicates that
319    /// the user tried to open a substream to a peer who was not connected to local node.
320    ///
321    /// Any other state indicates that there's an error in the state transition logic.
322    async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
323        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
324
325        let Some(context) = self.peers.get_mut(&peer) else {
326            self.peers.insert(peer, PeerContext::new());
327            return Ok(());
328        };
329
330        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
331            PeerState::Dialing => {
332                tracing::trace!(
333                    target: LOG_TARGET,
334                    ?peer,
335                    protocol = %self.protocol,
336                    "dial succeeded, open substream to peer",
337                );
338
339                context.state = PeerState::Closed { pending_open: None };
340                self.on_open_substream(peer).await
341            }
342            // connection established but validation is still pending
343            //
344            // update the connection state so that `NotificationProtocol` can proceed
345            // to correct state after the validation result has beern received
346            PeerState::ValidationPending { state } => {
347                debug_assert_eq!(state, ConnectionState::Closed);
348
349                tracing::debug!(
350                    target: LOG_TARGET,
351                    ?peer,
352                    protocol = %self.protocol,
353                    "new connection established while validation still pending",
354                );
355
356                context.state = PeerState::ValidationPending {
357                    state: ConnectionState::Open,
358                };
359
360                Ok(())
361            }
362            state => {
363                tracing::error!(
364                    target: LOG_TARGET,
365                    ?peer,
366                    protocol = %self.protocol,
367                    ?state,
368                    "state mismatch: peer already exists",
369                );
370                debug_assert!(false);
371                Err(Error::PeerAlreadyExists(peer))
372            }
373        }
374    }
375
    /// Connection closed to remote node.
    ///
    /// The peer is removed from `peers` and any handshake negotiation still pending for it is
    /// cancelled. What happens next depends on the state the peer was in:
    ///   - if the notification stream was open, the `shutdown` channel stored in the state is
    ///     signaled
    ///   - if the local node had an outbound substream in progress (it had been initiated, it was
    ///     under negotiation or it was already open), the user is notified that opening the
    ///     notification stream failed with [`NotificationError::Rejected`]
    ///   - if the user protocol is still validating an inbound substream, the peer is re-inserted
    ///     into `peers` as [`PeerState::ValidationPending`] with [`ConnectionState::Closed`] so
    ///     that the pending validation is tracked across the connection close
    ///
    /// Returns [`Error::PeerDoesntExist`] if the peer is not known to the protocol.
    async fn on_connection_closed(&mut self, peer: PeerId) -> crate::Result<()> {
        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");

        let Some(context) = self.peers.remove(&peer) else {
            tracing::error!(
                target: LOG_TARGET,
                ?peer,
                protocol = %self.protocol,
                "state mismatch: peer doesn't exist",
            );
            debug_assert!(false);
            return Err(Error::PeerDoesntExist(peer));
        };

        // clean up all pending state for the peer
        self.negotiation.remove_outbound(&peer);
        self.negotiation.remove_inbound(&peer);

        match context.state {
            // outbound initiated, report open failure to the user
            PeerState::OutboundInitiated { .. } => {
                self.event_handle
                    .report_notification_stream_open_failure(peer, NotificationError::Rejected)
                    .await;
            }
            // substream fully open, signal shutdown through the channel stored in the state
            //
            // the send result is ignored on purpose
            PeerState::Open { shutdown } => {
                let _ = shutdown.send(());
            }
            // if the substream was being validated, user must be notified that the substream is
            // now considered rejected if they had been made aware of the existence of the pending
            // connection
            PeerState::Validating {
                outbound, inbound, ..
            } => {
                match (outbound, inbound) {
                    // substream was being validated by the protocol when the connection was closed
                    //
                    // the peer was removed from `peers` above so it's inserted back in order to
                    // keep track of the outstanding validation
                    (OutboundState::Closed, InboundState::Validating { .. }) => {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            protocol = %self.protocol,
                            "connection closed while validation pending",
                        );

                        self.peers.insert(
                            peer,
                            PeerContext {
                                state: PeerState::ValidationPending {
                                    state: ConnectionState::Closed,
                                },
                            },
                        );
                    }
                    // user either initiated an outbound substream or an outbound substream was
                    // opened/being opened as a result of an accepted inbound substream but was not
                    // yet fully open
                    //
                    // to have consistent state tracking in the user protocol, substream rejection
                    // must be reported to the user
                    (
                        OutboundState::OutboundInitiated { .. }
                        | OutboundState::Negotiating
                        | OutboundState::Open { .. },
                        _,
                    ) => {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            protocol = %self.protocol,
                            "connection closed outbound substream under negotiation",
                        );

                        self.event_handle
                            .report_notification_stream_open_failure(
                                peer,
                                NotificationError::Rejected,
                            )
                            .await;
                    }
                    // outbound state is closed and the inbound substream is not being validated
                    // by the user, nothing is reported
                    (_, _) => {}
                }
            }
            // pending validations must be tracked across connection open/close events
            //
            // the connectivity state is reset to `ConnectionState::Closed` regardless of what it
            // was before
            PeerState::ValidationPending { .. } => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?peer,
                    protocol = %self.protocol,
                    "validation pending while connection closed",
                );

                self.peers.insert(
                    peer,
                    PeerContext {
                        state: PeerState::ValidationPending {
                            state: ConnectionState::Closed,
                        },
                    },
                );
            }
            // for all other states the peer stays removed and nothing is reported
            _ => {}
        }

        Ok(())
    }
490
    /// Local node opened a substream to remote node.
    ///
    /// The connection can be in three different states:
    ///   - this is the first substream that was opened and thus the connection was initiated by the
    ///     local node
    ///   - this is a response to a previously received inbound substream which the local node
    ///     accepted and as a result, opened its own substream
    ///   - local and remote nodes opened substreams at the same time
    ///
    /// In the first case, the local node's handshake is sent to remote node and the substream is
    /// polled in the background until they either send their handshake or close the substream.
    ///
    /// For the second case, the connection was initiated by the remote node and the substream was
    /// accepted by the local node which initiated an outbound substream to the remote node.
    /// The only valid states for this case are [`InboundState::Open`],
    /// and [`InboundState::SendingHandshake`] as they imply
    /// that the inbound substream has been accepted by the local node and this opened outbound
    /// substream is a result of a valid state transition.
    ///
    /// For the third case, if the nodes have opened substreams at the same time, the outbound state
    /// must be [`OutboundState::OutboundInitiated`] to ascertain that an outbound substream was
    /// actually opened. Any other state would be a state mismatch and would mean that the
    /// connection is opening substreams without the permission of the protocol handler.
    ///
    /// In addition, if the peer is in [`PeerState::Closed`] and `substream_id` matches the
    /// recorded `pending_open`, the substream is closed and `pending_open` is reset.
    ///
    /// Returns [`Error::PeerDoesntExist`] if the peer is not known to the protocol. Invalid peer
    /// states are only logged, the substream is closed and `Ok(())` is returned.
    async fn on_outbound_substream(
        &mut self,
        protocol: ProtocolName,
        fallback: Option<ProtocolName>,
        peer: PeerId,
        substream_id: SubstreamId,
        outbound: Substream,
    ) -> crate::Result<()> {
        tracing::debug!(
            target: LOG_TARGET,
            ?peer,
            ?protocol,
            ?substream_id,
            "handle outbound substream",
        );

        // peer must exist since an outbound substream was received from them
        let Some(context) = self.peers.get_mut(&peer) else {
            tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for outbound substream");
            debug_assert!(false);
            return Err(Error::PeerDoesntExist(peer));
        };

        // the substream is no longer pending, the removed entry is only used for a sanity check
        let pending_peer = self.pending_outbound.remove(&substream_id);

        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
            // the connection was initiated by the local node, send handshake to remote and wait to
            // receive their handshake back
            PeerState::OutboundInitiated { substream } => {
                debug_assert!(substream == substream_id);
                debug_assert!(pending_peer == Some(peer));

                tracing::trace!(
                    target: LOG_TARGET,
                    ?peer,
                    protocol = %self.protocol,
                    ?fallback,
                    ?substream_id,
                    "negotiate outbound protocol",
                );

                self.negotiation.negotiate_outbound(peer, outbound);
                context.state = PeerState::Validating {
                    protocol,
                    fallback,
                    inbound: InboundState::Closed,
                    outbound: OutboundState::Negotiating,
                    direction: Direction::Outbound,
                };
            }
            // NOTE: `protocol` and `fallback` bound by this pattern shadow the function arguments
            // so the values already stored in the peer state are the ones that are kept
            PeerState::Validating {
                protocol,
                fallback,
                inbound,
                direction,
                outbound: outbound_state,
            } => {
                // the inbound substream has been accepted by the local node since the handshake has
                // been read and the local handshake has either already been sent or
                // it's in the process of being sent.
                match inbound {
                    // the previous outbound state is not inspected in this branch, it's replaced
                    // with `OutboundState::Negotiating`
                    InboundState::SendingHandshake | InboundState::Open { .. } => {
                        context.state = PeerState::Validating {
                            protocol,
                            fallback,
                            inbound,
                            direction,
                            outbound: OutboundState::Negotiating,
                        };
                        self.negotiation.negotiate_outbound(peer, outbound);
                    }
                    // nodes have opened substreams at the same time
                    inbound_state => match outbound_state {
                        OutboundState::OutboundInitiated { substream } => {
                            debug_assert!(substream == substream_id);

                            context.state = PeerState::Validating {
                                protocol,
                                fallback,
                                direction,
                                inbound: inbound_state,
                                outbound: OutboundState::Negotiating,
                            };
                            self.negotiation.negotiate_outbound(peer, outbound);
                        }
                        // invalid state: more than one outbound substream has been opened
                        //
                        // the peer state is not restored so it's left as `PeerState::Poisoned`
                        inner_state => {
                            tracing::error!(
                                target: LOG_TARGET,
                                ?peer,
                                %protocol,
                                ?substream_id,
                                ?inbound_state,
                                ?inner_state,
                                "invalid state, expected `OutboundInitiated`",
                            );

                            let _ = outbound.close().await;
                            debug_assert!(false);
                        }
                    },
                }
            }
            // the connection may have been closed while an outbound substream was pending
            // if the outbound substream was initiated successfully, close it and reset
            // `pending_open`
            PeerState::Closed { pending_open } if pending_open == Some(substream_id) => {
                let _ = outbound.close().await;

                context.state = PeerState::Closed { pending_open: None };
            }
            // any other state is unexpected for an opened outbound substream
            //
            // the peer state is not restored so it's left as `PeerState::Poisoned`
            state => {
                tracing::error!(
                    target: LOG_TARGET,
                    ?peer,
                    %protocol,
                    ?substream_id,
                    ?state,
                    "invalid state: more than one outbound substream opened",
                );

                let _ = outbound.close().await;
                debug_assert!(false);
            }
        }

        Ok(())
    }
642
    /// Remote opened a substream to local node.
    ///
    /// The inbound substream is accepted, i.e., its handshake starts being read, only if the peer
    /// is in one of the following states:
    ///   - [`PeerState::Closed`] without a pending outbound substream
    ///   - [`PeerState::OutboundInitiated`], meaning local node has initiated an outbound
    ///     substream roughly at the same time
    ///   - [`PeerState::Validating`] where the inbound state is [`InboundState::Closed`]
    ///
    /// In all other states the new substream is closed:
    ///   - if a validation ([`PeerState::ValidationPending`]) or an outbound substream of a
    ///     previous connection (`pending_open`) is still pending, the peer state is kept as is
    ///   - if the user is still validating a previous inbound substream and no outbound substream
    ///     exists, the previous substream is closed as well and the peer is moved to
    ///     [`PeerState::ValidationPending`]
    ///   - if remote opened more than one substream, the new substream is simply discarded
    ///
    /// Returns [`Error::PeerDoesntExist`] if the peer is not known to the protocol.
    async fn on_inbound_substream(
        &mut self,
        protocol: ProtocolName,
        fallback: Option<ProtocolName>,
        peer: PeerId,
        substream: Substream,
    ) -> crate::Result<()> {
        // peer must exist since an inbound substream was received from them
        let Some(context) = self.peers.get_mut(&peer) else {
            tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for inbound substream");
            debug_assert!(false);
            return Err(Error::PeerDoesntExist(peer));
        };

        tracing::debug!(
            target: LOG_TARGET,
            ?peer,
            %protocol,
            ?fallback,
            state = ?context.state,
            "handle inbound substream",
        );

        // NOTE: the order of the match arms matters as the last arm catches every state that was
        // not handled explicitly
        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
            // inbound substream of a previous connection is still pending validation,
            // reject any new inbound substreams until an answer is heard from the user
            state @ PeerState::ValidationPending { .. } => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?peer,
                    %protocol,
                    ?fallback,
                    ?state,
                    "validation for previous substream still pending",
                );

                let _ = substream.close().await;
                context.state = state;
            }
            // outbound substream for previous connection still pending, reject inbound substream
            // and wait for the outbound substream state to conclude as either succeeded or failed
            // before accepting any inbound substreams.
            PeerState::Closed {
                pending_open: Some(substream_id),
            } => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?peer,
                    protocol = %self.protocol,
                    "received inbound substream while outbound substream opening, rejecting",
                );
                let _ = substream.close().await;

                context.state = PeerState::Closed {
                    pending_open: Some(substream_id),
                };
            }
            // the peer state is closed so this is a fresh inbound substream.
            PeerState::Closed { pending_open: None } => {
                self.negotiation.read_handshake(peer, substream);

                context.state = PeerState::Validating {
                    protocol,
                    fallback,
                    direction: Direction::Inbound,
                    inbound: InboundState::ReadingHandshake,
                    outbound: OutboundState::Closed,
                };
            }
            // if the connection is under validation (so an outbound substream has been opened and
            // it's still pending or under negotiation), the only valid state for the
            // inbound state is closed as it indicates that there isn't an inbound substream yet
            // for the remote node; duplicate substreams are prohibited.
            //
            // NOTE: `protocol` and `fallback` bound by this pattern shadow the function arguments
            // so the values already stored in the peer state are the ones that are kept
            PeerState::Validating {
                protocol,
                fallback,
                outbound,
                direction,
                inbound: InboundState::Closed,
            } => {
                self.negotiation.read_handshake(peer, substream);

                context.state = PeerState::Validating {
                    protocol,
                    fallback,
                    outbound,
                    direction,
                    inbound: InboundState::ReadingHandshake,
                };
            }
            // outbound substream may have been initiated by the local node while a remote node also
            // opened a substream roughly at the same time
            PeerState::OutboundInitiated {
                substream: outbound,
            } => {
                self.negotiation.read_handshake(peer, substream);

                context.state = PeerState::Validating {
                    protocol,
                    fallback,
                    direction: Direction::Outbound,
                    outbound: OutboundState::OutboundInitiated {
                        substream: outbound,
                    },
                    inbound: InboundState::ReadingHandshake,
                };
            }
            // new inbound substream opened while validation for the previous substream was still
            // pending
            //
            // the old substream can be considered dead because remote wouldn't open a new substream
            // to us unless they had discarded the previous substream.
            //
            // set peer state to `ValidationPending` to indicate that the peer is "blocked" until a
            // validation for the substream is heard, blocking any further activity for
            // the connection and once the validation is received and in case the
            // substream is accepted, it will be reported as open failure to the peer
            // because the states have gone out of sync.
            PeerState::Validating {
                outbound: OutboundState::Closed,
                inbound:
                    InboundState::Validating {
                        inbound: pending_substream,
                    },
                ..
            } => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?peer,
                    protocol = %self.protocol,
                    "remote opened substream while previous was still pending, connection failed",
                );
                let _ = substream.close().await;
                let _ = pending_substream.close().await;

                context.state = PeerState::ValidationPending {
                    state: ConnectionState::Open,
                };
            }
            // remote opened another inbound substream, close it and otherwise ignore the event
            // as this is a non-serious protocol violation.
            //
            // the previous peer state is restored unmodified
            state => {
                tracing::debug!(
                    target: LOG_TARGET,
                    ?peer,
                    %protocol,
                    ?fallback,
                    ?state,
                    "remote opened more than one inbound substreams, discarding",
                );

                let _ = substream.close().await;
                context.state = state;
            }
        }

        Ok(())
    }
811
812    /// Failed to open substream to remote node.
813    ///
814    /// If the substream was initiated by the local node, it must be reported that the substream
815    /// failed to open. Otherwise the peer state can silently be converted to `Closed`.
816    async fn on_substream_open_failure(
817        &mut self,
818        substream_id: SubstreamId,
819        error: SubstreamError,
820    ) {
821        tracing::debug!(
822            target: LOG_TARGET,
823            protocol = %self.protocol,
824            ?substream_id,
825            ?error,
826            "failed to open substream"
827        );
828
829        let Some(peer) = self.pending_outbound.remove(&substream_id) else {
830            tracing::warn!(
831                target: LOG_TARGET,
832                protocol = %self.protocol,
833                ?substream_id,
834                "pending outbound substream doesn't exist",
835            );
836            debug_assert!(false);
837            return;
838        };
839
840        // peer must exist since an outbound substream failure was received from them
841        let Some(context) = self.peers.get_mut(&peer) else {
842            tracing::warn!(target: LOG_TARGET, ?peer, "peer doesn't exist");
843            debug_assert!(false);
844            return;
845        };
846
847        match &mut context.state {
848            PeerState::OutboundInitiated { .. } => {
849                context.state = PeerState::Closed { pending_open: None };
850
851                self.event_handle
852                    .report_notification_stream_open_failure(peer, NotificationError::Rejected)
853                    .await;
854            }
855            // if the substream was accepted by the local node and as a result, an outbound
856            // substream was accepted as a result this should not be reported to local node
857            PeerState::Validating { outbound, .. } => {
858                self.negotiation.remove_inbound(&peer);
859                self.negotiation.remove_outbound(&peer);
860
861                let pending_open = match outbound {
862                    OutboundState::Closed => None,
863                    OutboundState::OutboundInitiated { substream } => {
864                        self.event_handle
865                            .report_notification_stream_open_failure(
866                                peer,
867                                NotificationError::Rejected,
868                            )
869                            .await;
870
871                        Some(*substream)
872                    }
873                    OutboundState::Negotiating | OutboundState::Open { .. } => {
874                        self.event_handle
875                            .report_notification_stream_open_failure(
876                                peer,
877                                NotificationError::Rejected,
878                            )
879                            .await;
880
881                        None
882                    }
883                };
884
885                context.state = PeerState::Closed { pending_open };
886            }
887            PeerState::Closed { pending_open } => {
888                tracing::debug!(
889                    target: LOG_TARGET,
890                    protocol = %self.protocol,
891                    ?substream_id,
892                    "substream open failure for a closed connection",
893                );
894                debug_assert_eq!(pending_open, &Some(substream_id));
895                context.state = PeerState::Closed { pending_open: None };
896            }
897            state => {
898                tracing::warn!(
899                    target: LOG_TARGET,
900                    protocol = %self.protocol,
901                    ?substream_id,
902                    ?state,
903                    "invalid state for outbound substream open failure",
904                );
905                context.state = PeerState::Closed { pending_open: None };
906                debug_assert!(false);
907            }
908        }
909    }
910
911    /// Open substream to remote `peer`.
912    ///
913    /// Outbound substream can opened only if the `PeerState` is `Closed`.
914    /// By forcing the substream to be opened only if the state is currently closed,
915    /// `NotificationProtocol` can enfore more predictable state transitions.
916    ///
917    /// Other states either imply an invalid state transition ([`PeerState::Open`]) or that an
918    /// inbound substream has already been received and its currently being validated by the user.
919    async fn on_open_substream(&mut self, peer: PeerId) -> crate::Result<()> {
920        tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "open substream");
921
922        let Some(context) = self.peers.get_mut(&peer) else {
923            if !self.should_dial {
924                tracing::debug!(
925                    target: LOG_TARGET,
926                    ?peer,
927                    protocol = %self.protocol,
928                    "connection to peer not open and dialing disabled",
929                );
930
931                self.event_handle
932                    .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
933                    .await;
934                return Ok(());
935            }
936
937            match self.service.dial(&peer) {
938                Err(error) => {
939                    tracing::debug!(
940                        target: LOG_TARGET,
941                        ?peer,
942                        protocol = %self.protocol,
943                        ?error,
944                        "failed to dial peer",
945                    );
946
947                    self.event_handle
948                        .report_notification_stream_open_failure(
949                            peer,
950                            NotificationError::DialFailure,
951                        )
952                        .await;
953
954                    return Err(error.into());
955                }
956                Ok(()) => {
957                    tracing::trace!(
958                        target: LOG_TARGET,
959                        ?peer,
960                        protocol = %self.protocol,
961                        "started to dial peer",
962                    );
963
964                    self.peers.insert(
965                        peer,
966                        PeerContext {
967                            state: PeerState::Dialing,
968                        },
969                    );
970                    return Ok(());
971                }
972            }
973        };
974
975        match context.state {
976            // protocol can only request a new outbound substream to be opened if the state is
977            // `Closed` other states imply that it's already open
978            PeerState::Closed {
979                pending_open: Some(substream_id),
980            } => {
981                tracing::trace!(
982                    target: LOG_TARGET,
983                    ?peer,
984                    protocol = %self.protocol,
985                    ?substream_id,
986                    "outbound substream opening, reusing pending open substream",
987                );
988
989                self.pending_outbound.insert(substream_id, peer);
990                context.state = PeerState::OutboundInitiated {
991                    substream: substream_id,
992                };
993            }
994            PeerState::Closed { .. } => match self.service.open_substream(peer) {
995                Ok(substream_id) => {
996                    tracing::trace!(
997                        target: LOG_TARGET,
998                        ?peer,
999                        protocol = %self.protocol,
1000                        ?substream_id,
1001                        "outbound substream opening",
1002                    );
1003
1004                    self.pending_outbound.insert(substream_id, peer);
1005                    context.state = PeerState::OutboundInitiated {
1006                        substream: substream_id,
1007                    };
1008                }
1009                Err(error) => {
1010                    tracing::debug!(
1011                        target: LOG_TARGET,
1012                        ?peer,
1013                        protocol = %self.protocol,
1014                        ?error,
1015                        "failed to open substream",
1016                    );
1017
1018                    self.event_handle
1019                        .report_notification_stream_open_failure(
1020                            peer,
1021                            NotificationError::NoConnection,
1022                        )
1023                        .await;
1024                    context.state = PeerState::Closed { pending_open: None };
1025                }
1026            },
1027            // while a validation is pending for an inbound substream, user is not allowed to open
1028            // any outbound substreams until the old inbond substream is either accepted or rejected
1029            PeerState::ValidationPending { .. } => {
1030                tracing::trace!(
1031                    target: LOG_TARGET,
1032                    ?peer,
1033                    protocol = %self.protocol,
1034                    "validation still pending, rejecting outbound substream request",
1035                );
1036
1037                self.event_handle
1038                    .report_notification_stream_open_failure(
1039                        peer,
1040                        NotificationError::ValidationPending,
1041                    )
1042                    .await;
1043            }
1044            _ => {}
1045        }
1046
1047        Ok(())
1048    }
1049
1050    /// Close substream to remote `peer`.
1051    ///
1052    /// This function can only be called if the substream was actually open, any other state is
1053    /// unreachable as the user is unable to emit this command to [`NotificationProtocol`] unless
1054    /// the connection has been fully opened.
1055    async fn on_close_substream(&mut self, peer: PeerId) {
1056        tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "close substream");
1057
1058        let Some(context) = self.peers.get_mut(&peer) else {
1059            tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1060            return;
1061        };
1062
1063        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1064            PeerState::Open { shutdown } => {
1065                let _ = shutdown.send(());
1066
1067                context.state = PeerState::Closed { pending_open: None };
1068            }
1069            state => {
1070                tracing::debug!(
1071                    target: LOG_TARGET,
1072                    ?peer,
1073                    protocol = %self.protocol,
1074                    ?state,
1075                    "substream already closed",
1076                );
1077                context.state = state;
1078            }
1079        }
1080    }
1081
1082    /// Handle validation result.
1083    ///
1084    /// The validation result binary (accept/reject). If the node is rejected, the substreams are
1085    /// discarded and state is set to `PeerState::Closed`. If there was an outbound substream in
1086    /// progress while the connection was rejected by the user, the oubound state is discarded,
1087    /// except for the substream ID of the substream which is kept for later use, in case the
1088    /// substream happens to open.
1089    ///
1090    /// If the node is accepted and there is no outbound substream to them open yet, a new substream
1091    /// is opened and once it opens, the local handshake will be sent to the remote peer and if
1092    /// they also accept the substream the connection is considered fully open.
1093    async fn on_validation_result(
1094        &mut self,
1095        peer: PeerId,
1096        result: ValidationResult,
1097    ) -> crate::Result<()> {
1098        tracing::trace!(
1099            target: LOG_TARGET,
1100            ?peer,
1101            protocol = %self.protocol,
1102            ?result,
1103            "handle validation result",
1104        );
1105
1106        let Some(context) = self.peers.get_mut(&peer) else {
1107            tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1108            return Err(Error::PeerDoesntExist(peer));
1109        };
1110
1111        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1112            PeerState::Validating {
1113                protocol,
1114                fallback,
1115                outbound,
1116                direction,
1117                inbound: InboundState::Validating { inbound },
1118            } => match result {
1119                // substream was rejected by the local node, if an outbound substream was under
1120                // negotation, discard that data and if an outbound substream was
1121                // initiated, save the `SubstreamId` of that substream and later if the substream
1122                // is opened, the state can be corrected to `pending_open: None`.
1123                ValidationResult::Reject => {
1124                    let _ = inbound.close().await;
1125                    self.negotiation.remove_outbound(&peer);
1126                    self.negotiation.remove_inbound(&peer);
1127                    context.state = PeerState::Closed {
1128                        pending_open: outbound.pending_open(),
1129                    };
1130
1131                    Ok(())
1132                }
1133                ValidationResult::Accept => match outbound {
1134                    // no outbound substream exists so initiate a new substream open and send the
1135                    // local handshake to remote node, indicating that the
1136                    // connection was accepted by the local node
1137                    OutboundState::Closed => match self.service.open_substream(peer) {
1138                        Ok(substream) => {
1139                            self.negotiation.send_handshake(peer, inbound);
1140                            self.pending_outbound.insert(substream, peer);
1141
1142                            context.state = PeerState::Validating {
1143                                protocol,
1144                                fallback,
1145                                direction,
1146                                inbound: InboundState::SendingHandshake,
1147                                outbound: OutboundState::OutboundInitiated { substream },
1148                            };
1149                            Ok(())
1150                        }
1151                        // failed to open outbound substream after accepting an inbound substream
1152                        //
1153                        // since the user was notified of this substream and they accepted it,
1154                        // they expecting some kind of answer (open success/failure).
1155                        //
1156                        // report to user that the substream failed to open so they can track the
1157                        // state transitions of the peer correctly
1158                        Err(error) => {
1159                            tracing::trace!(
1160                                target: LOG_TARGET,
1161                                ?peer,
1162                                protocol = %self.protocol,
1163                                ?result,
1164                                ?error,
1165                                "failed to open outbound substream for accepted substream",
1166                            );
1167
1168                            let _ = inbound.close().await;
1169                            context.state = PeerState::Closed { pending_open: None };
1170
1171                            self.event_handle
1172                                .report_notification_stream_open_failure(
1173                                    peer,
1174                                    NotificationError::Rejected,
1175                                )
1176                                .await;
1177
1178                            Err(error.into())
1179                        }
1180                    },
1181                    // here the state is one of `OutboundState::{OutboundInitiated, Negotiating,
1182                    // Open}` so that state can be safely ignored and all that
1183                    // has to be done is to send the local handshake to remote
1184                    // node to indicate that the connection was accepted.
1185                    _ => {
1186                        self.negotiation.send_handshake(peer, inbound);
1187
1188                        context.state = PeerState::Validating {
1189                            protocol,
1190                            fallback,
1191                            direction,
1192                            inbound: InboundState::SendingHandshake,
1193                            outbound,
1194                        };
1195                        Ok(())
1196                    }
1197                },
1198            },
1199            // validation result received for an inbound substream which is now considered dead
1200            // because while the substream was being validated, the connection had closed.
1201            //
1202            // if the substream was rejected and there is no active connection to the peer,
1203            // just remove the peer from `peers` without informing user
1204            //
1205            // if the substream was accepted, the user must be informed that the substream failed to
1206            // open. Depending on whether there is currently a connection open to the peer, either
1207            // report `Rejected`/`NoConnection` and let the user try again.
1208            PeerState::ValidationPending { state } => {
1209                if let Some(error) = match state {
1210                    ConnectionState::Open => {
1211                        context.state = PeerState::Closed { pending_open: None };
1212
1213                        std::matches!(result, ValidationResult::Accept)
1214                            .then_some(NotificationError::Rejected)
1215                    }
1216                    ConnectionState::Closed => {
1217                        self.peers.remove(&peer);
1218
1219                        std::matches!(result, ValidationResult::Accept)
1220                            .then_some(NotificationError::NoConnection)
1221                    }
1222                } {
1223                    self.event_handle.report_notification_stream_open_failure(peer, error).await;
1224                }
1225
1226                Ok(())
1227            }
1228            // if the user incorrectly send a validation result for a peer that doesn't require
1229            // validation, set state back to what it was and ignore the event
1230            //
1231            // the user protocol may send a stale validation result not because of a programming
1232            // error but because it has a backlock of unhandled events, with one event potentially
1233            // nullifying the need for substream validation, and is just temporarily out of sync
1234            // with `NotificationProtocol`
1235            state => {
1236                tracing::debug!(
1237                    target: LOG_TARGET,
1238                    ?peer,
1239                    protocol = %self.protocol,
1240                    ?state,
1241                    "validation result received for peer that doesn't require validation",
1242                );
1243
1244                context.state = state;
1245                Ok(())
1246            }
1247        }
1248    }
1249
1250    /// Handle handshake event.
1251    ///
1252    /// There are three different handshake event types:
1253    ///   - outbound substream negotiated
1254    ///   - inbound substream negotiated
1255    ///   - substream negotiation error
1256    ///
1257    /// Neither outbound nor inbound substream negotiated automatically means that the connection is
1258    /// considered open as both substreams must be fully negotiated for that to be the case. That is
1259    /// why the peer state for inbound and outbound are set separately and at the end of the
1260    /// function is the collective state of the substreams checked and if both substreams are
1261    /// negotiated, the user informed that the connection is open.
1262    ///
1263    /// If the negotiation fails, the user may have to be informed of that. Outbound substream
1264    /// failure always results in user getting notified since the existence of an outbound substream
1265    /// means that the user has either initiated an outbound substreams or has accepted an inbound
1266    /// substreams, resulting in an outbound substreams.
1267    ///
1268    /// Negotiation failure for inbound substreams which are in the state
1269    /// [`InboundState::ReadingHandshake`] don't result in any notification because while the
1270    /// handshake is being read from the substream, the user is oblivious to the fact that an
1271    /// inbound substream has even been received.
1272    async fn on_handshake_event(&mut self, peer: PeerId, event: HandshakeEvent) {
1273        let Some(context) = self.peers.get_mut(&peer) else {
1274            tracing::error!(target: LOG_TARGET, "invalid state: negotiation event received but peer doesn't exist");
1275            debug_assert!(false);
1276            return;
1277        };
1278
1279        tracing::trace!(
1280            target: LOG_TARGET,
1281            ?peer,
1282            protocol = %self.protocol,
1283            ?event,
1284            "handle handshake event",
1285        );
1286
1287        match event {
1288            // either an inbound or outbound substream has been negotiated successfully
1289            HandshakeEvent::Negotiated {
1290                peer,
1291                handshake,
1292                substream,
1293                direction,
1294            } => match direction {
1295                // outbound substream was negotiated, the only valid state for peer is `Validating`
1296                // and only valid state for `OutboundState` is `Negotiating`
1297                negotiation::Direction::Outbound => {
1298                    self.negotiation.remove_outbound(&peer);
1299
1300                    match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1301                        PeerState::Validating {
1302                            protocol,
1303                            fallback,
1304                            direction,
1305                            outbound: OutboundState::Negotiating,
1306                            inbound,
1307                        } => {
1308                            context.state = PeerState::Validating {
1309                                protocol,
1310                                fallback,
1311                                direction,
1312                                outbound: OutboundState::Open {
1313                                    handshake,
1314                                    outbound: substream,
1315                                },
1316                                inbound,
1317                            };
1318                        }
1319                        state => {
1320                            tracing::warn!(
1321                                target: LOG_TARGET,
1322                                ?peer,
1323                                ?state,
1324                                "outbound substream negotiated but peer has invalid state",
1325                            );
1326                            debug_assert!(false);
1327                        }
1328                    }
1329                }
1330                // inbound negotiation event completed
1331                //
1332                // the negotiation event can be one of two different types:
1333                //   - remote handshake was read from the substream
1334                //   - local handshake has been sent to remote node
1335                //
1336                // For the first case, the substream has to be validated by the local node.
1337                // This means reporting the protocol name, potential negotiated fallback and the
1338                // handshake. Local node will then either accept or reject the substream which is
1339                // handled by [`NotificationProtocol::on_validation_result()`]. Compared to
1340                // Substrate, litep2p requires both peers to validate the inbound handshake to allow
1341                // more complex connection validation. If this is not necessary and the protocol
1342                // wishes to auto-accept the inbound substreams that are a result of
1343                // an outbound substream already accepted by the remote node, the
1344                // substream validation is skipped and the local handshake is sent
1345                // right away.
1346                //
1347                // For the second case, the local handshake was sent to remote node successfully and
1348                // the inbound substream is considered open and if the outbound
1349                // substream is open as well, the connection is fully open.
1350                //
1351                // Only valid states for [`InboundState`] are [`InboundState::ReadingHandshake`] and
1352                // [`InboundState::SendingHandshake`] because otherwise the inbound
1353                // substream cannot be in [`HandshakeService`](super::negotiation::HandshakeService)
1354                // unless there is a logic bug in the state machine.
1355                negotiation::Direction::Inbound => {
1356                    self.negotiation.remove_inbound(&peer);
1357
1358                    match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1359                        PeerState::Validating {
1360                            protocol,
1361                            fallback,
1362                            direction,
1363                            outbound,
1364                            inbound: InboundState::ReadingHandshake,
1365                        } => {
1366                            if !std::matches!(outbound, OutboundState::Closed) && self.auto_accept {
1367                                tracing::trace!(
1368                                    target: LOG_TARGET,
1369                                    ?peer,
1370                                    %protocol,
1371                                    ?fallback,
1372                                    ?direction,
1373                                    ?outbound,
1374                                    "auto-accept inbound substream",
1375                                );
1376
1377                                self.negotiation.send_handshake(peer, substream);
1378                                context.state = PeerState::Validating {
1379                                    protocol,
1380                                    fallback,
1381                                    direction,
1382                                    inbound: InboundState::SendingHandshake,
1383                                    outbound,
1384                                };
1385
1386                                return;
1387                            }
1388
1389                            tracing::trace!(
1390                                target: LOG_TARGET,
1391                                ?peer,
1392                                %protocol,
1393                                ?fallback,
1394                                ?outbound,
1395                                "send inbound protocol for validation",
1396                            );
1397
1398                            context.state = PeerState::Validating {
1399                                protocol: protocol.clone(),
1400                                fallback: fallback.clone(),
1401                                inbound: InboundState::Validating { inbound: substream },
1402                                outbound,
1403                                direction,
1404                            };
1405
1406                            let (tx, rx) = oneshot::channel();
1407                            self.pending_validations.push(Box::pin(async move {
1408                                match rx.await {
1409                                    Ok(ValidationResult::Accept) =>
1410                                        (peer, ValidationResult::Accept),
1411                                    _ => (peer, ValidationResult::Reject),
1412                                }
1413                            }));
1414
1415                            self.event_handle
1416                                .report_inbound_substream(protocol, fallback, peer, handshake, tx)
1417                                .await;
1418                        }
1419                        PeerState::Validating {
1420                            protocol,
1421                            fallback,
1422                            direction,
1423                            inbound: InboundState::SendingHandshake,
1424                            outbound,
1425                        } => {
1426                            tracing::trace!(
1427                                target: LOG_TARGET,
1428                                ?peer,
1429                                %protocol,
1430                                ?fallback,
1431                                "inbound substream negotiated, waiting for outbound substream to complete",
1432                            );
1433
1434                            context.state = PeerState::Validating {
1435                                protocol: protocol.clone(),
1436                                fallback: fallback.clone(),
1437                                inbound: InboundState::Open { inbound: substream },
1438                                outbound,
1439                                direction,
1440                            };
1441                        }
1442                        _state => debug_assert!(false),
1443                    }
1444                }
1445            },
1446            // error occurred during negotiation, eitehr for inbound or outbound substream
1447            // user is notified of the error only if they've either initiated an outbound substream
1448            // or if they accepted an inbound substream and as a result initiated an outbound
1449            // substream.
1450            HandshakeEvent::NegotiationError { peer, direction } => {
1451                tracing::debug!(
1452                    target: LOG_TARGET,
1453                    ?peer,
1454                    protocol = %self.protocol,
1455                    ?direction,
1456                    state = ?context.state,
1457                    "failed to negotiate substream",
1458                );
1459                let _ = self.negotiation.remove_outbound(&peer);
1460                let _ = self.negotiation.remove_inbound(&peer);
1461
1462                // if an outbound substream had been initiated (whatever its state is), it means
1463                // that the user knows about the connection and must be notified that it failed to
1464                // negotiate.
1465                match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1466                    PeerState::Validating { outbound, .. } => {
1467                        context.state = PeerState::Closed {
1468                            pending_open: outbound.pending_open(),
1469                        };
1470
1471                        // notify user if the outbound substream is not considered closed
1472                        if !std::matches!(outbound, OutboundState::Closed) {
1473                            return self
1474                                .event_handle
1475                                .report_notification_stream_open_failure(
1476                                    peer,
1477                                    NotificationError::Rejected,
1478                                )
1479                                .await;
1480                        }
1481                    }
1482                    _state => debug_assert!(false),
1483                }
1484            }
1485        }
1486
1487        // if both inbound and outbound substreams are considered open, notify the user that
1488        // a notification stream has been opened and set up for sending and receiving
1489        // notifications to and from remote node
1490        match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1491            PeerState::Validating {
1492                protocol,
1493                fallback,
1494                direction,
1495                outbound:
1496                    OutboundState::Open {
1497                        handshake,
1498                        outbound,
1499                    },
1500                inbound: InboundState::Open { inbound },
1501            } => {
1502                tracing::debug!(
1503                    target: LOG_TARGET,
1504                    ?peer,
1505                    %protocol,
1506                    ?fallback,
1507                    "notification stream opened",
1508                );
1509
1510                let (async_tx, async_rx) = channel(self.async_channel_size);
1511                let (sync_tx, sync_rx) = channel(self.sync_channel_size);
1512                let sink = NotificationSink::new(peer, sync_tx, async_tx);
1513
1514                // start connection handler for the peer which only deals with sending/receiving
1515                // notifications
1516                //
1517                // the connection handler must be started only after the newly opened notification
1518                // substream is reported to user because the connection handler
1519                // might exit immediately after being started if remote closed the connection.
1520                //
1521                // if the order of events (open & close) is not ensured to be correct, the code
1522                // handling the connectivity logic on the `NotificationHandle` side
1523                // might get confused about the current state of the connection.
1524                let shutdown_tx = self.shutdown_tx.clone();
1525                let (connection, shutdown) = Connection::new(
1526                    peer,
1527                    inbound,
1528                    outbound,
1529                    self.event_handle.clone(),
1530                    shutdown_tx.clone(),
1531                    self.notif_tx.clone(),
1532                    async_rx,
1533                    sync_rx,
1534                );
1535
1536                context.state = PeerState::Open { shutdown };
1537                self.event_handle
1538                    .report_notification_stream_opened(
1539                        protocol, fallback, direction, peer, handshake, sink,
1540                    )
1541                    .await;
1542
1543                self.executor.run(Box::pin(async move {
1544                    connection.start().await;
1545                }));
1546            }
1547            state => {
1548                tracing::trace!(
1549                    target: LOG_TARGET,
1550                    ?peer,
1551                    protocol = %self.protocol,
1552                    ?state,
1553                    "validation for substream still pending",
1554                );
1555                self.timers.push(Box::pin(async move {
1556                    futures_timer::Delay::new(Duration::from_secs(5)).await;
1557                    peer
1558                }));
1559
1560                context.state = state;
1561            }
1562        }
1563    }
1564
1565    /// Handle dial failure.
1566    async fn on_dial_failure(&mut self, peer: PeerId, address: Multiaddr) {
1567        tracing::trace!(
1568            target: LOG_TARGET,
1569            ?peer,
1570            protocol = %self.protocol,
1571            ?address,
1572            "handle dial failure",
1573        );
1574
1575        let Some(context) = self.peers.remove(&peer) else {
1576            tracing::trace!(
1577                target: LOG_TARGET,
1578                ?peer,
1579                protocol = %self.protocol,
1580                ?address,
1581                "dial failure for an unknown peer",
1582            );
1583            return;
1584        };
1585
1586        match context.state {
1587            PeerState::Dialing => {
1588                tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, ?address, "failed to dial peer");
1589                self.event_handle
1590                    .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
1591                    .await;
1592            }
1593            state => {
1594                tracing::trace!(
1595                    target: LOG_TARGET,
1596                    ?peer,
1597                    protocol = %self.protocol,
1598                    ?state,
1599                    "dial failure for peer that's not being dialed",
1600                );
1601                self.peers.insert(peer, PeerContext { state });
1602            }
1603        }
1604    }
1605
1606    /// Handle next notification event.
1607    async fn next_event(&mut self) {
1608        // biased select is used because the substream events must be prioritized above other events
1609        // that is because a closed substream is detected by either `substreams` or `negotiation`
1610        // and if that event is not handled with priority but, e.g., inbound substream is
1611        // handled before, it can create a situation where the state machine gets confused
1612        // about the peer's state.
1613        tokio::select! {
1614            biased;
1615
1616            event = self.negotiation.next(), if !self.negotiation.is_empty() => {
1617                if let Some((peer, event)) = event {
1618                    self.on_handshake_event(peer, event).await;
1619                } else {
1620                    tracing::error!(target: LOG_TARGET, "`HandshakeService` expected to return `Some(..)`");
1621                    debug_assert!(false);
1622                };
1623            }
1624            event = self.shutdown_rx.recv() => match event {
1625                None => (),
1626                Some(peer) => {
1627                    if let Some(context) = self.peers.get_mut(&peer) {
1628                        tracing::trace!(
1629                            target: LOG_TARGET,
1630                            ?peer,
1631                            protocol = %self.protocol,
1632                            "notification stream to peer closed",
1633                        );
1634                        context.state = PeerState::Closed { pending_open: None };
1635                    }
1636                }
1637            },
1638            // TODO: this could be combined with `Negotiation`
1639            peer = self.timers.next(), if !self.timers.is_empty() => match peer {
1640                Some(peer) => {
1641                    match self.peers.get_mut(&peer) {
1642                        Some(context) => match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1643                            PeerState::Validating {
1644                                outbound: OutboundState::Open { outbound, .. },
1645                                inbound: InboundState::Closed,
1646                                ..
1647                            } => {
1648                                tracing::debug!(
1649                                    target: LOG_TARGET,
1650                                    ?peer,
1651                                    protocol = %self.protocol,
1652                                    "peer didn't answer in 10 seconds, canceling substream and closing connection",
1653                                );
1654                                context.state = PeerState::Closed { pending_open: None };
1655
1656                                let _ = outbound.close().await;
1657                                self.event_handle
1658                                    .report_notification_stream_open_failure(peer, NotificationError::Rejected)
1659                                    .await;
1660
1661                                // NOTE: this is used to work around an issue in Substrate where the protocol
1662                                // is not notified if an inbound substream is closed. That indicates that remote
1663                                // wishes the close the connection but `Notifications` still keeps the substream state
1664                                // as `Open` until the outbound substream is closed (even though the outbound substream
1665                                // is also closed at that point). This causes a further issue: inbound substreams
1666                                // are automatically opened when state is `Open`, even if the inbound substream belongs
1667                                // to a new "connection" (pair of substreams).
1668                                //
1669                                // basically what happens (from Substrate's PoV) is there are pair of substreams (`inbound1`, `outbound1`),
1670                                // litep2p closes both substreams so both `inbound1` and outbound1 become non-readable/writable.
1671                                // Substrate doesn't detect this an instead only marks `inbound1` is closed while still keeping
1672                                // the (now-closed) `outbound1` active and it will be detected closed only when Substrate tries to
1673                                // write something into that substream. If now litep2p tries to open a new connection to Substrate,
1674                                // the outbound substream from litep2p's PoV will be automatically accepted (https://github.com/paritytech/polkadot-sdk/blob/59b2661444de2a25f2125a831bd786035a9fac4b/substrate/client/network/src/protocol/notifications/handler.rs#L544-L556)
1675                                // but since Substrate thinks `outbound1` is still active, it won't open a new outbound substream
1676                                // and it ends up having (`inbound2`, `outbound1`) as its pair of substreams which doens't make sense.
1677                                //
1678                                // since litep2p is expecting to receive an inbound substream from Substrate and never receives it,
1679                                // it basically can't make progress with the substream open request because litep2p can't force Substrate
1680                                // to detect that `outbound1` is closed. Easiest (and very hacky at the same time) way to reset the substream
1681                                // state is to close the connection. This is not an appropriate way to fix the issue and causes issues with,
1682                                // e.g., smoldot which at the time of writing this doesn't support the transaction protocol. The only way to fix
1683                                // this cleanly is to make Substrate detect closed substreams correctly.
1684                                if let Err(error) = self.service.force_close(peer) {
1685                                    tracing::debug!(
1686                                        target: LOG_TARGET,
1687                                        ?peer,
1688                                        protocol = %self.protocol,
1689                                        ?error,
1690                                        "failed to force close connection",
1691                                    );
1692                                }
1693                            }
1694                            state => {
1695                                tracing::trace!(
1696                                    target: LOG_TARGET,
1697                                    ?peer,
1698                                    protocol = %self.protocol,
1699                                    ?state,
1700                                    "ignore expired timer for peer",
1701                                );
1702                                context.state = state;
1703                            }
1704                        }
1705                        None => tracing::debug!(
1706                            target: LOG_TARGET,
1707                            ?peer,
1708                            protocol = %self.protocol,
1709                            "peer doesn't exist anymore",
1710                        ),
1711                    }
1712                }
1713                None => (),
1714            },
1715            event = self.service.next() => match event {
1716                Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
1717                    if let Err(error) = self.on_connection_established(peer).await {
1718                        tracing::debug!(
1719                            target: LOG_TARGET,
1720                            ?peer,
1721                            ?error,
1722                            "failed to register peer",
1723                        );
1724                    }
1725                }
1726                Some(TransportEvent::ConnectionClosed { peer }) => {
1727                    if let Err(error) = self.on_connection_closed(peer).await {
1728                        tracing::debug!(
1729                            target: LOG_TARGET,
1730                            ?peer,
1731                            ?error,
1732                            "failed to disconnect peer",
1733                        );
1734                    }
1735                }
1736                Some(TransportEvent::SubstreamOpened {
1737                    peer,
1738                    substream,
1739                    direction,
1740                    protocol,
1741                    fallback,
1742                }) => match direction {
1743                    protocol::Direction::Inbound => {
1744                        if let Err(error) = self.on_inbound_substream(protocol, fallback, peer, substream).await {
1745                            tracing::debug!(
1746                                target: LOG_TARGET,
1747                                ?peer,
1748                                ?error,
1749                                "failed to handle inbound substream",
1750                            );
1751                        }
1752                    }
1753                    protocol::Direction::Outbound(substream_id) => {
1754                        if let Err(error) = self
1755                            .on_outbound_substream(protocol, fallback, peer, substream_id, substream)
1756                            .await
1757                        {
1758                            tracing::debug!(
1759                                target: LOG_TARGET,
1760                                ?peer,
1761                                ?error,
1762                                "failed to handle outbound substream",
1763                            );
1764                        }
1765                    }
1766                },
1767                Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
1768                    self.on_substream_open_failure(substream, error).await;
1769                }
1770                Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address).await,
1771                None => (),
1772            },
1773            result = self.pending_validations.select_next_some(), if !self.pending_validations.is_empty() => {
1774                if let Err(error) = self.on_validation_result(result.0, result.1).await {
1775                    tracing::debug!(
1776                        target: LOG_TARGET,
1777                        peer = ?result.0,
1778                        result = ?result.1,
1779                        ?error,
1780                        "failed to handle validation result",
1781                    );
1782                }
1783            }
1784            command = self.command_rx.recv() => match command {
1785                None => {
1786                    tracing::debug!(target: LOG_TARGET, "user protocol has exited, exiting");
1787                }
1788                Some(command) => match command {
1789                    NotificationCommand::OpenSubstream { peers } => {
1790                        for peer in peers {
1791                            if let Err(error) = self.on_open_substream(peer).await {
1792                                tracing::debug!(
1793                                    target: LOG_TARGET,
1794                                    ?peer,
1795                                    ?error,
1796                                    "failed to open substream",
1797                                );
1798                            }
1799                        }
1800                    }
1801                    NotificationCommand::CloseSubstream { peers } => {
1802                        for peer in peers {
1803                            self.on_close_substream(peer).await;
1804                        }
1805                    }
1806                    NotificationCommand::ForceClose { peer } => {
1807                        let _ = self.service.force_close(peer);
1808                    }
1809                }
1810            },
1811        }
1812    }
1813
1814    /// Start [`NotificationProtocol`] event loop.
1815    pub(crate) async fn run(mut self) {
1816        tracing::debug!(target: LOG_TARGET, "starting notification event loop");
1817
1818        loop {
1819            self.next_event().await;
1820        }
1821    }
1822}