litep2p/protocol/notification/mod.rs
1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Notification protocol implementation.
22
23use crate::{
24 error::{Error, SubstreamError},
25 executor::Executor,
26 protocol::{
27 self,
28 notification::{
29 connection::Connection,
30 handle::NotificationEventHandle,
31 negotiation::{HandshakeEvent, HandshakeService},
32 types::NotificationCommand,
33 },
34 TransportEvent, TransportService,
35 },
36 substream::Substream,
37 types::{protocol::ProtocolName, SubstreamId},
38 PeerId, DEFAULT_CHANNEL_SIZE,
39};
40
41use bytes::BytesMut;
42use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
43use multiaddr::Multiaddr;
44use tokio::sync::{
45 mpsc::{channel, Receiver, Sender},
46 oneshot,
47};
48
49use std::{collections::HashMap, sync::Arc, time::Duration};
50
51pub use config::{Config, ConfigBuilder};
52pub use handle::{NotificationHandle, NotificationSink};
53pub use types::{Direction, NotificationError, NotificationEvent, ValidationResult};
54
55mod config;
56mod connection;
57mod handle;
58mod negotiation;
59mod types;
60
61#[cfg(test)]
62mod tests;
63
/// Logging target passed as `target:` to the `tracing` events emitted from this file.
const LOG_TARGET: &str = "litep2p::notification";
66
/// Connection state.
///
/// Used to track transport-level connectivity state when there is a pending validation.
/// See [`PeerState::ValidationPending`] for more details.
#[derive(Debug, PartialEq, Eq)]
enum ConnectionState {
    /// There is an active, transport-level connection open to the peer.
    Open,

    /// There is no transport-level connection open to the peer.
    Closed,
}
79
/// Inbound substream state.
///
/// Tracks the progress of the substream opened by the remote node while the peer is in
/// [`PeerState::Validating`].
#[derive(Debug)]
enum InboundState {
    /// Substream is closed.
    Closed,

    /// Handshake is being read from the remote node.
    ReadingHandshake,

    /// Substream and its handshake are being validated by the user protocol.
    Validating {
        /// Inbound substream.
        inbound: Substream,
    },

    /// Handshake is being sent to the remote node.
    SendingHandshake,

    /// Substream is open.
    Open {
        /// Inbound substream.
        inbound: Substream,
    },
}
104
/// Outbound substream state.
///
/// Tracks the progress of the substream opened by the local node while the peer is in
/// [`PeerState::Validating`].
#[derive(Debug)]
enum OutboundState {
    /// Substream is closed.
    Closed,

    /// Outbound substream initiated.
    OutboundInitiated {
        /// Substream ID.
        substream: SubstreamId,
    },

    /// Substream is in the state of being negotiated.
    ///
    /// This process entails sending the local node's handshake and reading back the remote
    /// node's handshake if they've accepted the substream, or detecting that the substream was
    /// closed in case the substream was rejected.
    Negotiating,

    /// Substream is open.
    Open {
        /// Received handshake.
        handshake: Vec<u8>,

        /// Outbound substream.
        outbound: Substream,
    },
}
133
134impl OutboundState {
135 /// Get pending outboud substream ID, if it exists.
136 fn pending_open(&self) -> Option<SubstreamId> {
137 match &self {
138 OutboundState::OutboundInitiated { substream } => Some(*substream),
139 _ => None,
140 }
141 }
142}
143
/// State of a peer as tracked by [`NotificationProtocol`].
#[derive(Debug)]
enum PeerState {
    /// Peer state is poisoned due to invalid state transition.
    ///
    /// Also used as the temporary placeholder while the previous state is taken out of
    /// [`PeerContext`] with `std::mem::replace()`.
    Poisoned,

    /// Validation for an inbound substream is still pending.
    ///
    /// In order to enforce valid state transitions, `NotificationProtocol` keeps track of pending
    /// validations across connectivity events (open/closed) and enforces that no activity happens
    /// for any peer that is still awaiting validation for their inbound substream.
    ///
    /// If the connection closes while the substream is being validated, instead of removing the
    /// peer from `peers`, the peer state is set as `ValidationPending` which indicates to the
    /// state machine that a response for an inbound substream is pending validation. The
    /// substream itself will be dead by the time the validation is received if the peer state is
    /// `ValidationPending` since the substream was part of a previous, now-closed connection, but
    /// this state allows `NotificationProtocol` to enforce correct state transitions by, e.g.,
    /// rejecting new inbound substreams while a previous substream is still being validated or
    /// rejecting outbound substreams on new connections if that same condition holds.
    ValidationPending {
        /// What is the current connectivity state of the peer.
        ///
        /// If `state` is `ConnectionState::Closed` when the validation is finally received, the
        /// peer is removed from `peers` and if the `state` is `ConnectionState::Open`, the peer
        /// is moved to state `PeerState::Closed` and the user is allowed to retry opening an
        /// outbound substream.
        state: ConnectionState,
    },

    /// Connection to peer is closed.
    Closed {
        /// Connection might have been closed while there was an outbound substream still pending.
        ///
        /// To handle this state transition correctly in case the substream opens after the
        /// connection is considered closed, store the `SubstreamId` so that it can be verified
        /// in case the substream ever opens.
        pending_open: Option<SubstreamId>,
    },

    /// Peer is being dialed in order to open an outbound substream to them.
    Dialing,

    /// Outbound substream initiated.
    OutboundInitiated {
        /// Substream ID.
        substream: SubstreamId,
    },

    /// Substream is being validated.
    Validating {
        /// Protocol.
        protocol: ProtocolName,

        /// Fallback protocol, if the substream was negotiated using a fallback name.
        fallback: Option<ProtocolName>,

        /// Outbound protocol state.
        outbound: OutboundState,

        /// Inbound protocol state.
        inbound: InboundState,

        /// Direction.
        direction: Direction,
    },

    /// Notification stream has been opened.
    Open {
        /// `oneshot::Sender` for shutting down the connection.
        shutdown: oneshot::Sender<()>,
    },
}
215
/// Peer context.
///
/// Value type of the `peers` map of [`NotificationProtocol`].
#[derive(Debug)]
struct PeerContext {
    /// Peer state.
    state: PeerState,
}
222
223impl PeerContext {
224 /// Create new [`PeerContext`].
225 fn new() -> Self {
226 Self {
227 state: PeerState::Closed { pending_open: None },
228 }
229 }
230}
231
/// Notification protocol handler.
///
/// Tracks the notification substream state of each peer in `peers` and reacts to connectivity
/// events, substream events and user commands.
pub(crate) struct NotificationProtocol {
    /// Transport service.
    service: TransportService,

    /// Protocol.
    protocol: ProtocolName,

    /// Auto accept inbound substream if the outbound substream was initiated by the local node.
    auto_accept: bool,

    /// TX channel passed to the protocol used for sending events.
    event_handle: NotificationEventHandle,

    /// TX channel for sending shut down notifications from connection handlers to
    /// [`NotificationProtocol`].
    shutdown_tx: Sender<PeerId>,

    /// RX channel for receiving shutdown notifications from the connection handlers.
    shutdown_rx: Receiver<PeerId>,

    /// RX channel passed to the protocol used for receiving commands.
    command_rx: Receiver<NotificationCommand>,

    /// TX channel given to connection handlers for sending notifications.
    notif_tx: Sender<(PeerId, BytesMut)>,

    /// Connected peers.
    ///
    /// May also contain peers without an open connection, e.g., peers that are being dialed or
    /// peers whose inbound substream validation is still pending.
    peers: HashMap<PeerId, PeerContext>,

    /// Pending outbound substreams.
    pending_outbound: HashMap<SubstreamId, PeerId>,

    /// Handshaking service which reads and writes the handshakes to inbound
    /// and outbound substreams asynchronously.
    negotiation: HandshakeService,

    /// Synchronous channel size.
    sync_channel_size: usize,

    /// Asynchronous channel size.
    async_channel_size: usize,

    /// Executor for connection handlers.
    executor: Arc<dyn Executor>,

    /// Pending substream validations.
    pending_validations: FuturesUnordered<BoxFuture<'static, (PeerId, ValidationResult)>>,

    /// Timers for pending outbound substreams.
    timers: FuturesUnordered<BoxFuture<'static, PeerId>>,

    /// Should `NotificationProtocol` attempt to dial the peer.
    should_dial: bool,
}
286
287impl NotificationProtocol {
288 pub(crate) fn new(
289 service: TransportService,
290 config: Config,
291 executor: Arc<dyn Executor>,
292 ) -> Self {
293 let (shutdown_tx, shutdown_rx) = channel(DEFAULT_CHANNEL_SIZE);
294
295 Self {
296 service,
297 shutdown_tx,
298 shutdown_rx,
299 executor,
300 peers: HashMap::new(),
301 protocol: config.protocol_name,
302 auto_accept: config.auto_accept,
303 pending_validations: FuturesUnordered::new(),
304 timers: FuturesUnordered::new(),
305 event_handle: NotificationEventHandle::new(config.event_tx),
306 notif_tx: config.notif_tx,
307 command_rx: config.command_rx,
308 pending_outbound: HashMap::new(),
309 negotiation: HandshakeService::new(config.handshake),
310 sync_channel_size: config.sync_channel_size,
311 async_channel_size: config.async_channel_size,
312 should_dial: config.should_dial,
313 }
314 }
315
316 /// Connection established to remote node.
317 ///
318 /// If the peer already exists, the only valid state for it is `Dialing` as it indicates that
319 /// the user tried to open a substream to a peer who was not connected to local node.
320 ///
321 /// Any other state indicates that there's an error in the state transition logic.
322 async fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
323 tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection established");
324
325 let Some(context) = self.peers.get_mut(&peer) else {
326 self.peers.insert(peer, PeerContext::new());
327 return Ok(());
328 };
329
330 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
331 PeerState::Dialing => {
332 tracing::trace!(
333 target: LOG_TARGET,
334 ?peer,
335 protocol = %self.protocol,
336 "dial succeeded, open substream to peer",
337 );
338
339 context.state = PeerState::Closed { pending_open: None };
340 self.on_open_substream(peer).await
341 }
342 // connection established but validation is still pending
343 //
344 // update the connection state so that `NotificationProtocol` can proceed
345 // to correct state after the validation result has beern received
346 PeerState::ValidationPending { state } => {
347 debug_assert_eq!(state, ConnectionState::Closed);
348
349 tracing::debug!(
350 target: LOG_TARGET,
351 ?peer,
352 protocol = %self.protocol,
353 "new connection established while validation still pending",
354 );
355
356 context.state = PeerState::ValidationPending {
357 state: ConnectionState::Open,
358 };
359
360 Ok(())
361 }
362 state => {
363 tracing::error!(
364 target: LOG_TARGET,
365 ?peer,
366 protocol = %self.protocol,
367 ?state,
368 "state mismatch: peer already exists",
369 );
370 debug_assert!(false);
371 Err(Error::PeerAlreadyExists(peer))
372 }
373 }
374 }
375
/// Connection closed to remote node.
///
/// The peer is removed from `peers` and all handshake state tracked for it by `negotiation` is
/// discarded. What happens next depends on the state the peer was in:
/// - [`PeerState::Open`]: the shutdown signal is sent through the stored `oneshot::Sender`
/// - [`PeerState::OutboundInitiated`], or [`PeerState::Validating`] with an outbound
///   substream that was initiated, under negotiation or open: the user is notified that
///   opening the notification stream failed
/// - [`PeerState::Validating`] with only an inbound substream under validation, or
///   [`PeerState::ValidationPending`]: the peer is re-inserted into `peers` as
///   [`PeerState::ValidationPending`] with [`ConnectionState::Closed`]
///
/// Returns [`Error::PeerDoesntExist`] if the peer was not tracked.
async fn on_connection_closed(&mut self, peer: PeerId) -> crate::Result<()> {
    tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "connection closed");

    let Some(context) = self.peers.remove(&peer) else {
        tracing::error!(
            target: LOG_TARGET,
            ?peer,
            protocol = %self.protocol,
            "state mismatch: peer doesn't exist",
        );
        debug_assert!(false);
        return Err(Error::PeerDoesntExist(peer));
    };

    // clean up all pending state for the peer
    self.negotiation.remove_outbound(&peer);
    self.negotiation.remove_inbound(&peer);

    match context.state {
        // outbound initiated, report open failure to the user
        PeerState::OutboundInitiated { .. } => {
            self.event_handle
                .report_notification_stream_open_failure(peer, NotificationError::Rejected)
                .await;
        }
        // substream fully open, send the shutdown signal
        //
        // the send result is ignored on purpose, the receiving end may already be gone
        PeerState::Open { shutdown } => {
            let _ = shutdown.send(());
        }
        // if the substream was being validated, user must be notified that the substream is
        // now considered rejected if they had been made aware of the existence of the pending
        // connection
        PeerState::Validating {
            outbound, inbound, ..
        } => {
            match (outbound, inbound) {
                // substream was being validated by the protocol when the connection was closed
                //
                // keep the peer in `peers` so that the pending validation stays tracked
                (OutboundState::Closed, InboundState::Validating { .. }) => {
                    tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        protocol = %self.protocol,
                        "connection closed while validation pending",
                    );

                    self.peers.insert(
                        peer,
                        PeerContext {
                            state: PeerState::ValidationPending {
                                state: ConnectionState::Closed,
                            },
                        },
                    );
                }
                // user either initiated an outbound substream or an outbound substream was
                // opened/being opened as a result of an accepted inbound substream but was not
                // yet fully open
                //
                // to have consistent state tracking in the user protocol, substream rejection
                // must be reported to the user
                (
                    OutboundState::OutboundInitiated { .. }
                    | OutboundState::Negotiating
                    | OutboundState::Open { .. },
                    _,
                ) => {
                    tracing::debug!(
                        target: LOG_TARGET,
                        ?peer,
                        protocol = %self.protocol,
                        "connection closed outbound substream under negotiation",
                    );

                    self.event_handle
                        .report_notification_stream_open_failure(
                            peer,
                            NotificationError::Rejected,
                        )
                        .await;
                }
                // no outbound substream and nothing under validation, nothing to report
                (_, _) => {}
            }
        }
        // pending validations must be tracked across connection open/close events
        PeerState::ValidationPending { .. } => {
            tracing::debug!(
                target: LOG_TARGET,
                ?peer,
                protocol = %self.protocol,
                "validation pending while connection closed",
            );

            self.peers.insert(
                peer,
                PeerContext {
                    state: PeerState::ValidationPending {
                        state: ConnectionState::Closed,
                    },
                },
            );
        }
        // remaining states need no further action, the peer stays removed from `peers`
        _ => {}
    }

    Ok(())
}
490
/// Local node opened a substream to remote node.
///
/// The connection can be in three different states:
/// - this is the first substream that was opened and thus the connection was initiated by the
///   local node
/// - this is a response to a previously received inbound substream which the local node
///   accepted and as a result, opened its own substream
/// - local and remote nodes opened substreams at the same time
///
/// In the first case, the local node's handshake is sent to remote node and the substream is
/// polled in the background until they either send their handshake or close the substream.
///
/// For the second case, the connection was initiated by the remote node and the substream was
/// accepted by the local node which initiated an outbound substream to the remote node.
/// The only valid states for this case are [`InboundState::Open`]
/// and [`InboundState::SendingHandshake`] as they imply
/// that the inbound substream has been accepted by the local node and this opened outbound
/// substream is a result of a valid state transition.
///
/// For the third case, if the nodes have opened substreams at the same time, the outbound state
/// must be [`OutboundState::OutboundInitiated`] to ascertain that an outbound substream was
/// actually opened. Any other state would be a state mismatch and would mean that the
/// connection is opening substreams without the permission of the protocol handler.
///
/// If the peer is [`PeerState::Closed`] and `substream_id` matches its `pending_open`, the
/// substream is closed and `pending_open` is reset.
///
/// On the state mismatch paths the substream is closed and the peer state is left as
/// [`PeerState::Poisoned`].
///
/// Returns [`Error::PeerDoesntExist`] if the peer is not tracked.
async fn on_outbound_substream(
    &mut self,
    protocol: ProtocolName,
    fallback: Option<ProtocolName>,
    peer: PeerId,
    substream_id: SubstreamId,
    outbound: Substream,
) -> crate::Result<()> {
    tracing::debug!(
        target: LOG_TARGET,
        ?peer,
        ?protocol,
        ?substream_id,
        "handle outbound substream",
    );

    // peer must exist since an outbound substream was received from them
    let Some(context) = self.peers.get_mut(&peer) else {
        tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for outbound substream");
        debug_assert!(false);
        return Err(Error::PeerDoesntExist(peer));
    };

    // the substream is no longer pending, regardless of how the event is handled below
    let pending_peer = self.pending_outbound.remove(&substream_id);

    match std::mem::replace(&mut context.state, PeerState::Poisoned) {
        // the connection was initiated by the local node, send handshake to remote and wait to
        // receive their handshake back
        PeerState::OutboundInitiated { substream } => {
            debug_assert!(substream == substream_id);
            debug_assert!(pending_peer == Some(peer));

            tracing::trace!(
                target: LOG_TARGET,
                ?peer,
                protocol = %self.protocol,
                ?fallback,
                ?substream_id,
                "negotiate outbound protocol",
            );

            self.negotiation.negotiate_outbound(peer, outbound);
            context.state = PeerState::Validating {
                protocol,
                fallback,
                inbound: InboundState::Closed,
                outbound: OutboundState::Negotiating,
                direction: Direction::Outbound,
            };
        }
        // NOTE: the `protocol` and `fallback` bound here come from the stored peer state and
        // shadow the function parameters of the same name
        PeerState::Validating {
            protocol,
            fallback,
            inbound,
            direction,
            outbound: outbound_state,
        } => {
            // the inbound substream has been accepted by the local node since the handshake has
            // been read and the local handshake has either already been sent or
            // it's in the process of being sent.
            match inbound {
                InboundState::SendingHandshake | InboundState::Open { .. } => {
                    context.state = PeerState::Validating {
                        protocol,
                        fallback,
                        inbound,
                        direction,
                        outbound: OutboundState::Negotiating,
                    };
                    self.negotiation.negotiate_outbound(peer, outbound);
                }
                // nodes have opened substreams at the same time
                inbound_state => match outbound_state {
                    OutboundState::OutboundInitiated { substream } => {
                        debug_assert!(substream == substream_id);

                        context.state = PeerState::Validating {
                            protocol,
                            fallback,
                            direction,
                            inbound: inbound_state,
                            outbound: OutboundState::Negotiating,
                        };
                        self.negotiation.negotiate_outbound(peer, outbound);
                    }
                    // invalid state: more than one outbound substream has been opened
                    //
                    // the peer state is not restored and stays `PeerState::Poisoned`
                    inner_state => {
                        tracing::error!(
                            target: LOG_TARGET,
                            ?peer,
                            %protocol,
                            ?substream_id,
                            ?inbound_state,
                            ?inner_state,
                            "invalid state, expected `OutboundInitiated`",
                        );

                        let _ = outbound.close().await;
                        debug_assert!(false);
                    }
                },
            }
        }
        // the connection may have been closed while an outbound substream was pending
        // if the outbound substream was initiated successfully, close it and reset
        // `pending_open`
        PeerState::Closed { pending_open } if pending_open == Some(substream_id) => {
            let _ = outbound.close().await;

            context.state = PeerState::Closed { pending_open: None };
        }
        // any other state is a state mismatch, close the substream
        //
        // the peer state is not restored and stays `PeerState::Poisoned`
        state => {
            tracing::error!(
                target: LOG_TARGET,
                ?peer,
                %protocol,
                ?substream_id,
                ?state,
                "invalid state: more than one outbound substream opened",
            );

            let _ = outbound.close().await;
            debug_assert!(false);
        }
    }

    Ok(())
}
642
/// Remote opened a substream to local node.
///
/// The inbound substream is accepted for handshake reading if the peer is in one of these
/// states:
/// - the peer state is closed and there is no outbound substream pending from a previous
///   connection
/// - outbound substream has been initiated but has not opened yet
/// - an outbound substream is pending or under negotiation and no inbound substream has been
///   received yet
///
/// The inbound substream is closed if validation of a previous substream is still pending or
/// if an outbound substream of a previous connection is still pending.
///
/// If remote opened more than one substream, the new substream is simply discarded.
///
/// Returns [`Error::PeerDoesntExist`] if the peer is not tracked.
async fn on_inbound_substream(
    &mut self,
    protocol: ProtocolName,
    fallback: Option<ProtocolName>,
    peer: PeerId,
    substream: Substream,
) -> crate::Result<()> {
    // peer must exist since an inbound substream was received from them
    let Some(context) = self.peers.get_mut(&peer) else {
        tracing::error!(target: LOG_TARGET, ?peer, "peer doesn't exist for inbound substream");
        debug_assert!(false);
        return Err(Error::PeerDoesntExist(peer));
    };

    tracing::debug!(
        target: LOG_TARGET,
        ?peer,
        %protocol,
        ?fallback,
        state = ?context.state,
        "handle inbound substream",
    );

    match std::mem::replace(&mut context.state, PeerState::Poisoned) {
        // inbound substream of a previous connection is still pending validation,
        // reject any new inbound substreams until an answer is heard from the user
        state @ PeerState::ValidationPending { .. } => {
            tracing::debug!(
                target: LOG_TARGET,
                ?peer,
                %protocol,
                ?fallback,
                ?state,
                "validation for previous substream still pending",
            );

            let _ = substream.close().await;
            context.state = state;
        }
        // outbound substream for previous connection still pending, reject inbound substream
        // and wait for the outbound substream state to conclude as either succeeded or failed
        // before accepting any inbound substreams.
        PeerState::Closed {
            pending_open: Some(substream_id),
        } => {
            tracing::debug!(
                target: LOG_TARGET,
                ?peer,
                protocol = %self.protocol,
                "received inbound substream while outbound substream opening, rejecting",
            );
            let _ = substream.close().await;

            context.state = PeerState::Closed {
                pending_open: Some(substream_id),
            };
        }
        // the peer state is closed so this is a fresh inbound substream.
        PeerState::Closed { pending_open: None } => {
            self.negotiation.read_handshake(peer, substream);

            context.state = PeerState::Validating {
                protocol,
                fallback,
                direction: Direction::Inbound,
                inbound: InboundState::ReadingHandshake,
                outbound: OutboundState::Closed,
            };
        }
        // if the connection is under validation (so an outbound substream has been opened and
        // it's still pending or under negotiation), the only valid state for the
        // inbound state is closed as it indicates that there isn't an inbound substream yet for
        // the remote node. Duplicate substreams are prohibited.
        //
        // NOTE: the `protocol` and `fallback` bound here come from the stored peer state and
        // shadow the function parameters of the same name
        PeerState::Validating {
            protocol,
            fallback,
            outbound,
            direction,
            inbound: InboundState::Closed,
        } => {
            self.negotiation.read_handshake(peer, substream);

            context.state = PeerState::Validating {
                protocol,
                fallback,
                outbound,
                direction,
                inbound: InboundState::ReadingHandshake,
            };
        }
        // outbound substream may have been initiated by the local node while a remote node also
        // opened a substream roughly at the same time
        PeerState::OutboundInitiated {
            substream: outbound,
        } => {
            self.negotiation.read_handshake(peer, substream);

            context.state = PeerState::Validating {
                protocol,
                fallback,
                direction: Direction::Outbound,
                outbound: OutboundState::OutboundInitiated {
                    substream: outbound,
                },
                inbound: InboundState::ReadingHandshake,
            };
        }
        // new inbound substream opened while validation for the previous substream was still
        // pending
        //
        // the old substream can be considered dead because remote wouldn't open a new substream
        // to us unless they had discarded the previous substream.
        //
        // set peer state to `ValidationPending` to indicate that the peer is "blocked" until a
        // validation for the substream is heard, blocking any further activity for
        // the connection and once the validation is received and in case the
        // substream is accepted, it will be reported as open failure to the peer
        // because the states have gone out of sync.
        PeerState::Validating {
            outbound: OutboundState::Closed,
            inbound:
                InboundState::Validating {
                    inbound: pending_substream,
                },
            ..
        } => {
            tracing::debug!(
                target: LOG_TARGET,
                ?peer,
                protocol = %self.protocol,
                "remote opened substream while previous was still pending, connection failed",
            );
            let _ = substream.close().await;
            let _ = pending_substream.close().await;

            context.state = PeerState::ValidationPending {
                state: ConnectionState::Open,
            };
        }
        // remote opened another inbound substream, close it and otherwise ignore the event
        // as this is a non-serious protocol violation.
        //
        // the previous peer state is restored unchanged
        state => {
            tracing::debug!(
                target: LOG_TARGET,
                ?peer,
                %protocol,
                ?fallback,
                ?state,
                "remote opened more than one inbound substreams, discarding",
            );

            let _ = substream.close().await;
            context.state = state;
        }
    }

    Ok(())
}
811
/// Failed to open substream to remote node.
///
/// The peer is looked up through `pending_outbound` using `substream_id`. If the entry or the
/// peer doesn't exist, the event is logged and otherwise ignored.
///
/// If the substream was initiated by the local node, it must be reported that the substream
/// failed to open. Otherwise the peer state can silently be converted to `Closed`.
async fn on_substream_open_failure(
    &mut self,
    substream_id: SubstreamId,
    error: SubstreamError,
) {
    tracing::debug!(
        target: LOG_TARGET,
        protocol = %self.protocol,
        ?substream_id,
        ?error,
        "failed to open substream"
    );

    let Some(peer) = self.pending_outbound.remove(&substream_id) else {
        tracing::warn!(
            target: LOG_TARGET,
            protocol = %self.protocol,
            ?substream_id,
            "pending outbound substream doesn't exist",
        );
        debug_assert!(false);
        return;
    };

    // peer must exist since an outbound substream failure was received from them
    let Some(context) = self.peers.get_mut(&peer) else {
        tracing::warn!(target: LOG_TARGET, ?peer, "peer doesn't exist");
        debug_assert!(false);
        return;
    };

    match &mut context.state {
        // the outbound substream was requested by the user, report the failure to them
        PeerState::OutboundInitiated { .. } => {
            context.state = PeerState::Closed { pending_open: None };

            self.event_handle
                .report_notification_stream_open_failure(peer, NotificationError::Rejected)
                .await;
        }
        // peer was under validation: discard the handshake state tracked by `negotiation` and
        // report the failure to the user unless the outbound state is `Closed`
        PeerState::Validating { outbound, .. } => {
            self.negotiation.remove_inbound(&peer);
            self.negotiation.remove_outbound(&peer);

            let pending_open = match outbound {
                OutboundState::Closed => None,
                OutboundState::OutboundInitiated { substream } => {
                    self.event_handle
                        .report_notification_stream_open_failure(
                            peer,
                            NotificationError::Rejected,
                        )
                        .await;

                    // NOTE(review): the ID stored in `OutboundInitiated` is carried over as
                    // `pending_open`; presumably it is the substream that just failed to
                    // open, confirm that keeping it as pending is intended
                    Some(*substream)
                }
                OutboundState::Negotiating | OutboundState::Open { .. } => {
                    self.event_handle
                        .report_notification_stream_open_failure(
                            peer,
                            NotificationError::Rejected,
                        )
                        .await;

                    None
                }
            };

            context.state = PeerState::Closed { pending_open };
        }
        // the peer was already considered closed, the pending substream is now resolved
        PeerState::Closed { pending_open } => {
            tracing::debug!(
                target: LOG_TARGET,
                protocol = %self.protocol,
                ?substream_id,
                "substream open failure for a closed connection",
            );
            debug_assert_eq!(pending_open, &Some(substream_id));
            context.state = PeerState::Closed { pending_open: None };
        }
        // unexpected state, reset the peer to `Closed`
        state => {
            tracing::warn!(
                target: LOG_TARGET,
                protocol = %self.protocol,
                ?substream_id,
                ?state,
                "invalid state for outbound substream open failure",
            );
            context.state = PeerState::Closed { pending_open: None };
            debug_assert!(false);
        }
    }
}
910
911 /// Open substream to remote `peer`.
912 ///
913 /// Outbound substream can opened only if the `PeerState` is `Closed`.
914 /// By forcing the substream to be opened only if the state is currently closed,
915 /// `NotificationProtocol` can enfore more predictable state transitions.
916 ///
917 /// Other states either imply an invalid state transition ([`PeerState::Open`]) or that an
918 /// inbound substream has already been received and its currently being validated by the user.
919 async fn on_open_substream(&mut self, peer: PeerId) -> crate::Result<()> {
920 tracing::trace!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "open substream");
921
922 let Some(context) = self.peers.get_mut(&peer) else {
923 if !self.should_dial {
924 tracing::debug!(
925 target: LOG_TARGET,
926 ?peer,
927 protocol = %self.protocol,
928 "connection to peer not open and dialing disabled",
929 );
930
931 self.event_handle
932 .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
933 .await;
934 return Ok(());
935 }
936
937 match self.service.dial(&peer) {
938 Err(error) => {
939 tracing::debug!(
940 target: LOG_TARGET,
941 ?peer,
942 protocol = %self.protocol,
943 ?error,
944 "failed to dial peer",
945 );
946
947 self.event_handle
948 .report_notification_stream_open_failure(
949 peer,
950 NotificationError::DialFailure,
951 )
952 .await;
953
954 return Err(error.into());
955 }
956 Ok(()) => {
957 tracing::trace!(
958 target: LOG_TARGET,
959 ?peer,
960 protocol = %self.protocol,
961 "started to dial peer",
962 );
963
964 self.peers.insert(
965 peer,
966 PeerContext {
967 state: PeerState::Dialing,
968 },
969 );
970 return Ok(());
971 }
972 }
973 };
974
975 match context.state {
976 // protocol can only request a new outbound substream to be opened if the state is
977 // `Closed` other states imply that it's already open
978 PeerState::Closed {
979 pending_open: Some(substream_id),
980 } => {
981 tracing::trace!(
982 target: LOG_TARGET,
983 ?peer,
984 protocol = %self.protocol,
985 ?substream_id,
986 "outbound substream opening, reusing pending open substream",
987 );
988
989 self.pending_outbound.insert(substream_id, peer);
990 context.state = PeerState::OutboundInitiated {
991 substream: substream_id,
992 };
993 }
994 PeerState::Closed { .. } => match self.service.open_substream(peer) {
995 Ok(substream_id) => {
996 tracing::trace!(
997 target: LOG_TARGET,
998 ?peer,
999 protocol = %self.protocol,
1000 ?substream_id,
1001 "outbound substream opening",
1002 );
1003
1004 self.pending_outbound.insert(substream_id, peer);
1005 context.state = PeerState::OutboundInitiated {
1006 substream: substream_id,
1007 };
1008 }
1009 Err(error) => {
1010 tracing::debug!(
1011 target: LOG_TARGET,
1012 ?peer,
1013 protocol = %self.protocol,
1014 ?error,
1015 "failed to open substream",
1016 );
1017
1018 self.event_handle
1019 .report_notification_stream_open_failure(
1020 peer,
1021 NotificationError::NoConnection,
1022 )
1023 .await;
1024 context.state = PeerState::Closed { pending_open: None };
1025 }
1026 },
1027 // while a validation is pending for an inbound substream, user is not allowed to open
1028 // any outbound substreams until the old inbond substream is either accepted or rejected
1029 PeerState::ValidationPending { .. } => {
1030 tracing::trace!(
1031 target: LOG_TARGET,
1032 ?peer,
1033 protocol = %self.protocol,
1034 "validation still pending, rejecting outbound substream request",
1035 );
1036
1037 self.event_handle
1038 .report_notification_stream_open_failure(
1039 peer,
1040 NotificationError::ValidationPending,
1041 )
1042 .await;
1043 }
1044 _ => {}
1045 }
1046
1047 Ok(())
1048 }
1049
1050 /// Close substream to remote `peer`.
1051 ///
1052 /// This function can only be called if the substream was actually open, any other state is
1053 /// unreachable as the user is unable to emit this command to [`NotificationProtocol`] unless
1054 /// the connection has been fully opened.
1055 async fn on_close_substream(&mut self, peer: PeerId) {
1056 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, "close substream");
1057
1058 let Some(context) = self.peers.get_mut(&peer) else {
1059 tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1060 return;
1061 };
1062
1063 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1064 PeerState::Open { shutdown } => {
1065 let _ = shutdown.send(());
1066
1067 context.state = PeerState::Closed { pending_open: None };
1068 }
1069 state => {
1070 tracing::debug!(
1071 target: LOG_TARGET,
1072 ?peer,
1073 protocol = %self.protocol,
1074 ?state,
1075 "substream already closed",
1076 );
1077 context.state = state;
1078 }
1079 }
1080 }
1081
1082 /// Handle validation result.
1083 ///
1084 /// The validation result binary (accept/reject). If the node is rejected, the substreams are
1085 /// discarded and state is set to `PeerState::Closed`. If there was an outbound substream in
1086 /// progress while the connection was rejected by the user, the oubound state is discarded,
1087 /// except for the substream ID of the substream which is kept for later use, in case the
1088 /// substream happens to open.
1089 ///
1090 /// If the node is accepted and there is no outbound substream to them open yet, a new substream
1091 /// is opened and once it opens, the local handshake will be sent to the remote peer and if
1092 /// they also accept the substream the connection is considered fully open.
1093 async fn on_validation_result(
1094 &mut self,
1095 peer: PeerId,
1096 result: ValidationResult,
1097 ) -> crate::Result<()> {
1098 tracing::trace!(
1099 target: LOG_TARGET,
1100 ?peer,
1101 protocol = %self.protocol,
1102 ?result,
1103 "handle validation result",
1104 );
1105
1106 let Some(context) = self.peers.get_mut(&peer) else {
1107 tracing::debug!(target: LOG_TARGET, ?peer, "peer doesn't exist");
1108 return Err(Error::PeerDoesntExist(peer));
1109 };
1110
1111 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1112 PeerState::Validating {
1113 protocol,
1114 fallback,
1115 outbound,
1116 direction,
1117 inbound: InboundState::Validating { inbound },
1118 } => match result {
1119 // substream was rejected by the local node, if an outbound substream was under
1120 // negotation, discard that data and if an outbound substream was
1121 // initiated, save the `SubstreamId` of that substream and later if the substream
1122 // is opened, the state can be corrected to `pending_open: None`.
1123 ValidationResult::Reject => {
1124 let _ = inbound.close().await;
1125 self.negotiation.remove_outbound(&peer);
1126 self.negotiation.remove_inbound(&peer);
1127 context.state = PeerState::Closed {
1128 pending_open: outbound.pending_open(),
1129 };
1130
1131 Ok(())
1132 }
1133 ValidationResult::Accept => match outbound {
1134 // no outbound substream exists so initiate a new substream open and send the
1135 // local handshake to remote node, indicating that the
1136 // connection was accepted by the local node
1137 OutboundState::Closed => match self.service.open_substream(peer) {
1138 Ok(substream) => {
1139 self.negotiation.send_handshake(peer, inbound);
1140 self.pending_outbound.insert(substream, peer);
1141
1142 context.state = PeerState::Validating {
1143 protocol,
1144 fallback,
1145 direction,
1146 inbound: InboundState::SendingHandshake,
1147 outbound: OutboundState::OutboundInitiated { substream },
1148 };
1149 Ok(())
1150 }
1151 // failed to open outbound substream after accepting an inbound substream
1152 //
1153 // since the user was notified of this substream and they accepted it,
1154 // they expecting some kind of answer (open success/failure).
1155 //
1156 // report to user that the substream failed to open so they can track the
1157 // state transitions of the peer correctly
1158 Err(error) => {
1159 tracing::trace!(
1160 target: LOG_TARGET,
1161 ?peer,
1162 protocol = %self.protocol,
1163 ?result,
1164 ?error,
1165 "failed to open outbound substream for accepted substream",
1166 );
1167
1168 let _ = inbound.close().await;
1169 context.state = PeerState::Closed { pending_open: None };
1170
1171 self.event_handle
1172 .report_notification_stream_open_failure(
1173 peer,
1174 NotificationError::Rejected,
1175 )
1176 .await;
1177
1178 Err(error.into())
1179 }
1180 },
1181 // here the state is one of `OutboundState::{OutboundInitiated, Negotiating,
1182 // Open}` so that state can be safely ignored and all that
1183 // has to be done is to send the local handshake to remote
1184 // node to indicate that the connection was accepted.
1185 _ => {
1186 self.negotiation.send_handshake(peer, inbound);
1187
1188 context.state = PeerState::Validating {
1189 protocol,
1190 fallback,
1191 direction,
1192 inbound: InboundState::SendingHandshake,
1193 outbound,
1194 };
1195 Ok(())
1196 }
1197 },
1198 },
1199 // validation result received for an inbound substream which is now considered dead
1200 // because while the substream was being validated, the connection had closed.
1201 //
1202 // if the substream was rejected and there is no active connection to the peer,
1203 // just remove the peer from `peers` without informing user
1204 //
1205 // if the substream was accepted, the user must be informed that the substream failed to
1206 // open. Depending on whether there is currently a connection open to the peer, either
1207 // report `Rejected`/`NoConnection` and let the user try again.
1208 PeerState::ValidationPending { state } => {
1209 if let Some(error) = match state {
1210 ConnectionState::Open => {
1211 context.state = PeerState::Closed { pending_open: None };
1212
1213 std::matches!(result, ValidationResult::Accept)
1214 .then_some(NotificationError::Rejected)
1215 }
1216 ConnectionState::Closed => {
1217 self.peers.remove(&peer);
1218
1219 std::matches!(result, ValidationResult::Accept)
1220 .then_some(NotificationError::NoConnection)
1221 }
1222 } {
1223 self.event_handle.report_notification_stream_open_failure(peer, error).await;
1224 }
1225
1226 Ok(())
1227 }
1228 // if the user incorrectly send a validation result for a peer that doesn't require
1229 // validation, set state back to what it was and ignore the event
1230 //
1231 // the user protocol may send a stale validation result not because of a programming
1232 // error but because it has a backlock of unhandled events, with one event potentially
1233 // nullifying the need for substream validation, and is just temporarily out of sync
1234 // with `NotificationProtocol`
1235 state => {
1236 tracing::debug!(
1237 target: LOG_TARGET,
1238 ?peer,
1239 protocol = %self.protocol,
1240 ?state,
1241 "validation result received for peer that doesn't require validation",
1242 );
1243
1244 context.state = state;
1245 Ok(())
1246 }
1247 }
1248 }
1249
1250 /// Handle handshake event.
1251 ///
1252 /// There are three different handshake event types:
1253 /// - outbound substream negotiated
1254 /// - inbound substream negotiated
1255 /// - substream negotiation error
1256 ///
1257 /// Neither outbound nor inbound substream negotiated automatically means that the connection is
1258 /// considered open as both substreams must be fully negotiated for that to be the case. That is
1259 /// why the peer state for inbound and outbound are set separately and at the end of the
1260 /// function is the collective state of the substreams checked and if both substreams are
1261 /// negotiated, the user informed that the connection is open.
1262 ///
1263 /// If the negotiation fails, the user may have to be informed of that. Outbound substream
1264 /// failure always results in user getting notified since the existence of an outbound substream
1265 /// means that the user has either initiated an outbound substreams or has accepted an inbound
1266 /// substreams, resulting in an outbound substreams.
1267 ///
1268 /// Negotiation failure for inbound substreams which are in the state
1269 /// [`InboundState::ReadingHandshake`] don't result in any notification because while the
1270 /// handshake is being read from the substream, the user is oblivious to the fact that an
1271 /// inbound substream has even been received.
1272 async fn on_handshake_event(&mut self, peer: PeerId, event: HandshakeEvent) {
1273 let Some(context) = self.peers.get_mut(&peer) else {
1274 tracing::error!(target: LOG_TARGET, "invalid state: negotiation event received but peer doesn't exist");
1275 debug_assert!(false);
1276 return;
1277 };
1278
1279 tracing::trace!(
1280 target: LOG_TARGET,
1281 ?peer,
1282 protocol = %self.protocol,
1283 ?event,
1284 "handle handshake event",
1285 );
1286
1287 match event {
1288 // either an inbound or outbound substream has been negotiated successfully
1289 HandshakeEvent::Negotiated {
1290 peer,
1291 handshake,
1292 substream,
1293 direction,
1294 } => match direction {
1295 // outbound substream was negotiated, the only valid state for peer is `Validating`
1296 // and only valid state for `OutboundState` is `Negotiating`
1297 negotiation::Direction::Outbound => {
1298 self.negotiation.remove_outbound(&peer);
1299
1300 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1301 PeerState::Validating {
1302 protocol,
1303 fallback,
1304 direction,
1305 outbound: OutboundState::Negotiating,
1306 inbound,
1307 } => {
1308 context.state = PeerState::Validating {
1309 protocol,
1310 fallback,
1311 direction,
1312 outbound: OutboundState::Open {
1313 handshake,
1314 outbound: substream,
1315 },
1316 inbound,
1317 };
1318 }
1319 state => {
1320 tracing::warn!(
1321 target: LOG_TARGET,
1322 ?peer,
1323 ?state,
1324 "outbound substream negotiated but peer has invalid state",
1325 );
1326 debug_assert!(false);
1327 }
1328 }
1329 }
1330 // inbound negotiation event completed
1331 //
1332 // the negotiation event can be on of two different types:
1333 // - remote handshake was read from the substream
1334 // - local handshake has been sent to remote node
1335 //
1336 // For the first case, the substream has to be validated by the local node.
1337 // This means reporting the protocol name, potential negotiated fallback and the
1338 // handshake. Local node will then either accept or reject the substream which is
1339 // handled by [`NotificationProtocol::on_validation_result()`]. Compared to
1340 // Substrate, litep2p requires both peers to validate the inbound handshake to allow
1341 // more complex connection validation. If this is not necessary and the protocol
1342 // wishes to auto-accept the inbound substreams that are a result of
1343 // an outbound substream already accepted by the remote node, the
1344 // substream validation is skipped and the local handshake is sent
1345 // right away.
1346 //
1347 // For the second case, the local handshake was sent to remote node successfully and
1348 // the inbound substream is considered open and if the outbound
1349 // substream is open as well, the connection is fully open.
1350 //
1351 // Only valid states for [`InboundState`] are [`InboundState::ReadingHandshake`] and
1352 // [`InboundState::SendingHandshake`] because otherwise the inbound
1353 // substream cannot be in [`HandshakeService`](super::negotiation::HandshakeService)
1354 // unless there is a logic bug in the state machine.
1355 negotiation::Direction::Inbound => {
1356 self.negotiation.remove_inbound(&peer);
1357
1358 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1359 PeerState::Validating {
1360 protocol,
1361 fallback,
1362 direction,
1363 outbound,
1364 inbound: InboundState::ReadingHandshake,
1365 } => {
1366 if !std::matches!(outbound, OutboundState::Closed) && self.auto_accept {
1367 tracing::trace!(
1368 target: LOG_TARGET,
1369 ?peer,
1370 %protocol,
1371 ?fallback,
1372 ?direction,
1373 ?outbound,
1374 "auto-accept inbound substream",
1375 );
1376
1377 self.negotiation.send_handshake(peer, substream);
1378 context.state = PeerState::Validating {
1379 protocol,
1380 fallback,
1381 direction,
1382 inbound: InboundState::SendingHandshake,
1383 outbound,
1384 };
1385
1386 return;
1387 }
1388
1389 tracing::trace!(
1390 target: LOG_TARGET,
1391 ?peer,
1392 %protocol,
1393 ?fallback,
1394 ?outbound,
1395 "send inbound protocol for validation",
1396 );
1397
1398 context.state = PeerState::Validating {
1399 protocol: protocol.clone(),
1400 fallback: fallback.clone(),
1401 inbound: InboundState::Validating { inbound: substream },
1402 outbound,
1403 direction,
1404 };
1405
1406 let (tx, rx) = oneshot::channel();
1407 self.pending_validations.push(Box::pin(async move {
1408 match rx.await {
1409 Ok(ValidationResult::Accept) =>
1410 (peer, ValidationResult::Accept),
1411 _ => (peer, ValidationResult::Reject),
1412 }
1413 }));
1414
1415 self.event_handle
1416 .report_inbound_substream(protocol, fallback, peer, handshake, tx)
1417 .await;
1418 }
1419 PeerState::Validating {
1420 protocol,
1421 fallback,
1422 direction,
1423 inbound: InboundState::SendingHandshake,
1424 outbound,
1425 } => {
1426 tracing::trace!(
1427 target: LOG_TARGET,
1428 ?peer,
1429 %protocol,
1430 ?fallback,
1431 "inbound substream negotiated, waiting for outbound substream to complete",
1432 );
1433
1434 context.state = PeerState::Validating {
1435 protocol: protocol.clone(),
1436 fallback: fallback.clone(),
1437 inbound: InboundState::Open { inbound: substream },
1438 outbound,
1439 direction,
1440 };
1441 }
1442 _state => debug_assert!(false),
1443 }
1444 }
1445 },
1446 // error occurred during negotiation, eitehr for inbound or outbound substream
1447 // user is notified of the error only if they've either initiated an outbound substream
1448 // or if they accepted an inbound substream and as a result initiated an outbound
1449 // substream.
1450 HandshakeEvent::NegotiationError { peer, direction } => {
1451 tracing::debug!(
1452 target: LOG_TARGET,
1453 ?peer,
1454 protocol = %self.protocol,
1455 ?direction,
1456 state = ?context.state,
1457 "failed to negotiate substream",
1458 );
1459 let _ = self.negotiation.remove_outbound(&peer);
1460 let _ = self.negotiation.remove_inbound(&peer);
1461
1462 // if an outbound substream had been initiated (whatever its state is), it means
1463 // that the user knows about the connection and must be notified that it failed to
1464 // negotiate.
1465 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1466 PeerState::Validating { outbound, .. } => {
1467 context.state = PeerState::Closed {
1468 pending_open: outbound.pending_open(),
1469 };
1470
1471 // notify user if the outbound substream is not considered closed
1472 if !std::matches!(outbound, OutboundState::Closed) {
1473 return self
1474 .event_handle
1475 .report_notification_stream_open_failure(
1476 peer,
1477 NotificationError::Rejected,
1478 )
1479 .await;
1480 }
1481 }
1482 _state => debug_assert!(false),
1483 }
1484 }
1485 }
1486
1487 // if both inbound and outbound substreams are considered open, notify the user that
1488 // a notification stream has been opened and set up for sending and receiving
1489 // notifications to and from remote node
1490 match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1491 PeerState::Validating {
1492 protocol,
1493 fallback,
1494 direction,
1495 outbound:
1496 OutboundState::Open {
1497 handshake,
1498 outbound,
1499 },
1500 inbound: InboundState::Open { inbound },
1501 } => {
1502 tracing::debug!(
1503 target: LOG_TARGET,
1504 ?peer,
1505 %protocol,
1506 ?fallback,
1507 "notification stream opened",
1508 );
1509
1510 let (async_tx, async_rx) = channel(self.async_channel_size);
1511 let (sync_tx, sync_rx) = channel(self.sync_channel_size);
1512 let sink = NotificationSink::new(peer, sync_tx, async_tx);
1513
1514 // start connection handler for the peer which only deals with sending/receiving
1515 // notifications
1516 //
1517 // the connection handler must be started only after the newly opened notification
1518 // substream is reported to user because the connection handler
1519 // might exit immediately after being started if remote closed the connection.
1520 //
1521 // if the order of events (open & close) is not ensured to be correct, the code
1522 // handling the connectivity logic on the `NotificationHandle` side
1523 // might get confused about the current state of the connection.
1524 let shutdown_tx = self.shutdown_tx.clone();
1525 let (connection, shutdown) = Connection::new(
1526 peer,
1527 inbound,
1528 outbound,
1529 self.event_handle.clone(),
1530 shutdown_tx.clone(),
1531 self.notif_tx.clone(),
1532 async_rx,
1533 sync_rx,
1534 );
1535
1536 context.state = PeerState::Open { shutdown };
1537 self.event_handle
1538 .report_notification_stream_opened(
1539 protocol, fallback, direction, peer, handshake, sink,
1540 )
1541 .await;
1542
1543 self.executor.run(Box::pin(async move {
1544 connection.start().await;
1545 }));
1546 }
1547 state => {
1548 tracing::trace!(
1549 target: LOG_TARGET,
1550 ?peer,
1551 protocol = %self.protocol,
1552 ?state,
1553 "validation for substream still pending",
1554 );
1555 self.timers.push(Box::pin(async move {
1556 futures_timer::Delay::new(Duration::from_secs(5)).await;
1557 peer
1558 }));
1559
1560 context.state = state;
1561 }
1562 }
1563 }
1564
1565 /// Handle dial failure.
1566 async fn on_dial_failure(&mut self, peer: PeerId, address: Multiaddr) {
1567 tracing::trace!(
1568 target: LOG_TARGET,
1569 ?peer,
1570 protocol = %self.protocol,
1571 ?address,
1572 "handle dial failure",
1573 );
1574
1575 let Some(context) = self.peers.remove(&peer) else {
1576 tracing::trace!(
1577 target: LOG_TARGET,
1578 ?peer,
1579 protocol = %self.protocol,
1580 ?address,
1581 "dial failure for an unknown peer",
1582 );
1583 return;
1584 };
1585
1586 match context.state {
1587 PeerState::Dialing => {
1588 tracing::debug!(target: LOG_TARGET, ?peer, protocol = %self.protocol, ?address, "failed to dial peer");
1589 self.event_handle
1590 .report_notification_stream_open_failure(peer, NotificationError::DialFailure)
1591 .await;
1592 }
1593 state => {
1594 tracing::trace!(
1595 target: LOG_TARGET,
1596 ?peer,
1597 protocol = %self.protocol,
1598 ?state,
1599 "dial failure for peer that's not being dialed",
1600 );
1601 self.peers.insert(peer, PeerContext { state });
1602 }
1603 }
1604 }
1605
1606 /// Handle next notification event.
1607 async fn next_event(&mut self) {
1608 // biased select is used because the substream events must be prioritized above other events
1609 // that is because a closed substream is detected by either `substreams` or `negotiation`
1610 // and if that event is not handled with priority but, e.g., inbound substream is
1611 // handled before, it can create a situation where the state machine gets confused
1612 // about the peer's state.
1613 tokio::select! {
1614 biased;
1615
1616 event = self.negotiation.next(), if !self.negotiation.is_empty() => {
1617 if let Some((peer, event)) = event {
1618 self.on_handshake_event(peer, event).await;
1619 } else {
1620 tracing::error!(target: LOG_TARGET, "`HandshakeService` expected to return `Some(..)`");
1621 debug_assert!(false);
1622 };
1623 }
1624 event = self.shutdown_rx.recv() => match event {
1625 None => (),
1626 Some(peer) => {
1627 if let Some(context) = self.peers.get_mut(&peer) {
1628 tracing::trace!(
1629 target: LOG_TARGET,
1630 ?peer,
1631 protocol = %self.protocol,
1632 "notification stream to peer closed",
1633 );
1634 context.state = PeerState::Closed { pending_open: None };
1635 }
1636 }
1637 },
1638 // TODO: this could be combined with `Negotiation`
1639 peer = self.timers.next(), if !self.timers.is_empty() => match peer {
1640 Some(peer) => {
1641 match self.peers.get_mut(&peer) {
1642 Some(context) => match std::mem::replace(&mut context.state, PeerState::Poisoned) {
1643 PeerState::Validating {
1644 outbound: OutboundState::Open { outbound, .. },
1645 inbound: InboundState::Closed,
1646 ..
1647 } => {
1648 tracing::debug!(
1649 target: LOG_TARGET,
1650 ?peer,
1651 protocol = %self.protocol,
1652 "peer didn't answer in 10 seconds, canceling substream and closing connection",
1653 );
1654 context.state = PeerState::Closed { pending_open: None };
1655
1656 let _ = outbound.close().await;
1657 self.event_handle
1658 .report_notification_stream_open_failure(peer, NotificationError::Rejected)
1659 .await;
1660
1661 // NOTE: this is used to work around an issue in Substrate where the protocol
1662 // is not notified if an inbound substream is closed. That indicates that remote
1663 // wishes the close the connection but `Notifications` still keeps the substream state
1664 // as `Open` until the outbound substream is closed (even though the outbound substream
1665 // is also closed at that point). This causes a further issue: inbound substreams
1666 // are automatically opened when state is `Open`, even if the inbound substream belongs
1667 // to a new "connection" (pair of substreams).
1668 //
1669 // basically what happens (from Substrate's PoV) is there are pair of substreams (`inbound1`, `outbound1`),
1670 // litep2p closes both substreams so both `inbound1` and outbound1 become non-readable/writable.
1671 // Substrate doesn't detect this an instead only marks `inbound1` is closed while still keeping
1672 // the (now-closed) `outbound1` active and it will be detected closed only when Substrate tries to
1673 // write something into that substream. If now litep2p tries to open a new connection to Substrate,
1674 // the outbound substream from litep2p's PoV will be automatically accepted (https://github.com/paritytech/polkadot-sdk/blob/59b2661444de2a25f2125a831bd786035a9fac4b/substrate/client/network/src/protocol/notifications/handler.rs#L544-L556)
1675 // but since Substrate thinks `outbound1` is still active, it won't open a new outbound substream
1676 // and it ends up having (`inbound2`, `outbound1`) as its pair of substreams which doens't make sense.
1677 //
1678 // since litep2p is expecting to receive an inbound substream from Substrate and never receives it,
1679 // it basically can't make progress with the substream open request because litep2p can't force Substrate
1680 // to detect that `outbound1` is closed. Easiest (and very hacky at the same time) way to reset the substream
1681 // state is to close the connection. This is not an appropriate way to fix the issue and causes issues with,
1682 // e.g., smoldot which at the time of writing this doesn't support the transaction protocol. The only way to fix
1683 // this cleanly is to make Substrate detect closed substreams correctly.
1684 if let Err(error) = self.service.force_close(peer) {
1685 tracing::debug!(
1686 target: LOG_TARGET,
1687 ?peer,
1688 protocol = %self.protocol,
1689 ?error,
1690 "failed to force close connection",
1691 );
1692 }
1693 }
1694 state => {
1695 tracing::trace!(
1696 target: LOG_TARGET,
1697 ?peer,
1698 protocol = %self.protocol,
1699 ?state,
1700 "ignore expired timer for peer",
1701 );
1702 context.state = state;
1703 }
1704 }
1705 None => tracing::debug!(
1706 target: LOG_TARGET,
1707 ?peer,
1708 protocol = %self.protocol,
1709 "peer doesn't exist anymore",
1710 ),
1711 }
1712 }
1713 None => (),
1714 },
1715 event = self.service.next() => match event {
1716 Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
1717 if let Err(error) = self.on_connection_established(peer).await {
1718 tracing::debug!(
1719 target: LOG_TARGET,
1720 ?peer,
1721 ?error,
1722 "failed to register peer",
1723 );
1724 }
1725 }
1726 Some(TransportEvent::ConnectionClosed { peer }) => {
1727 if let Err(error) = self.on_connection_closed(peer).await {
1728 tracing::debug!(
1729 target: LOG_TARGET,
1730 ?peer,
1731 ?error,
1732 "failed to disconnect peer",
1733 );
1734 }
1735 }
1736 Some(TransportEvent::SubstreamOpened {
1737 peer,
1738 substream,
1739 direction,
1740 protocol,
1741 fallback,
1742 }) => match direction {
1743 protocol::Direction::Inbound => {
1744 if let Err(error) = self.on_inbound_substream(protocol, fallback, peer, substream).await {
1745 tracing::debug!(
1746 target: LOG_TARGET,
1747 ?peer,
1748 ?error,
1749 "failed to handle inbound substream",
1750 );
1751 }
1752 }
1753 protocol::Direction::Outbound(substream_id) => {
1754 if let Err(error) = self
1755 .on_outbound_substream(protocol, fallback, peer, substream_id, substream)
1756 .await
1757 {
1758 tracing::debug!(
1759 target: LOG_TARGET,
1760 ?peer,
1761 ?error,
1762 "failed to handle outbound substream",
1763 );
1764 }
1765 }
1766 },
1767 Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
1768 self.on_substream_open_failure(substream, error).await;
1769 }
1770 Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address).await,
1771 None => (),
1772 },
1773 result = self.pending_validations.select_next_some(), if !self.pending_validations.is_empty() => {
1774 if let Err(error) = self.on_validation_result(result.0, result.1).await {
1775 tracing::debug!(
1776 target: LOG_TARGET,
1777 peer = ?result.0,
1778 result = ?result.1,
1779 ?error,
1780 "failed to handle validation result",
1781 );
1782 }
1783 }
1784 command = self.command_rx.recv() => match command {
1785 None => {
1786 tracing::debug!(target: LOG_TARGET, "user protocol has exited, exiting");
1787 }
1788 Some(command) => match command {
1789 NotificationCommand::OpenSubstream { peers } => {
1790 for peer in peers {
1791 if let Err(error) = self.on_open_substream(peer).await {
1792 tracing::debug!(
1793 target: LOG_TARGET,
1794 ?peer,
1795 ?error,
1796 "failed to open substream",
1797 );
1798 }
1799 }
1800 }
1801 NotificationCommand::CloseSubstream { peers } => {
1802 for peer in peers {
1803 self.on_close_substream(peer).await;
1804 }
1805 }
1806 NotificationCommand::ForceClose { peer } => {
1807 let _ = self.service.force_close(peer);
1808 }
1809 }
1810 },
1811 }
1812 }
1813
1814 /// Start [`NotificationProtocol`] event loop.
1815 pub(crate) async fn run(mut self) {
1816 tracing::debug!(target: LOG_TARGET, "starting notification event loop");
1817
1818 loop {
1819 self.next_event().await;
1820 }
1821 }
1822}