1use crate::{
22 addresses::PublicAddresses,
23 codec::ProtocolCodec,
24 crypto::ed25519::Keypair,
25 error::{AddressError, DialError, Error},
26 executor::Executor,
27 protocol::{InnerTransportEvent, TransportService},
28 transport::{
29 manager::{
30 address::{AddressRecord, AddressStore},
31 handle::InnerTransportManagerCommand,
32 types::{PeerContext, PeerState},
33 },
34 Endpoint, Transport, TransportEvent,
35 },
36 types::{protocol::ProtocolName, ConnectionId},
37 BandwidthSink, PeerId,
38};
39
40use futures::{Stream, StreamExt};
41use indexmap::IndexMap;
42use multiaddr::{Multiaddr, Protocol};
43use multihash::Multihash;
44use parking_lot::RwLock;
45use tokio::sync::mpsc::{channel, Receiver, Sender};
46
47use std::{
48 collections::{hash_map::Entry, HashMap, HashSet},
49 pin::Pin,
50 sync::{
51 atomic::{AtomicUsize, Ordering},
52 Arc,
53 },
54 task::{Context, Poll},
55 time::Duration,
56};
57
58pub use handle::{TransportHandle, TransportManagerHandle};
59pub use types::SupportedTransport;
60
61mod address;
62pub mod limits;
63mod types;
64
65pub(crate) mod handle;
66
/// Logging target used by every `tracing` call in this file.
const LOG_TARGET: &str = "litep2p::transport-manager";

/// Score given to an address record when a connection over that address succeeded.
const SCORE_CONNECT_SUCCESS: i32 = 100i32;

/// Score applied to an address record when connecting over that address failed.
const SCORE_CONNECT_FAILURE: i32 = -100i32;
80
/// Verdict returned by `TransportManager::on_connection_established()` for a newly
/// established connection.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum ConnectionEstablishedResult {
    /// The connection was recorded in the peer's state.
    Accept,

    /// The connection was not recorded, e.g., because a connection limit was exceeded or
    /// the peer already has a secondary connection.
    Reject,
}
90
/// Events delivered to [`TransportManager`] over its internal event channel.
pub enum TransportManagerEvent {
    /// A connection to a remote peer was closed.
    ConnectionClosed {
        /// Peer whose connection was closed.
        peer: PeerId,

        /// ID of the connection that was closed.
        connection: ConnectionId,
    },
}
102
/// Per-protocol information stored by [`TransportManager`] for every registered protocol.
#[derive(Debug, Clone)]
pub struct ProtocolContext {
    /// Codec used by the protocol.
    pub codec: ProtocolCodec,

    /// Sender half of the channel over which the protocol receives [`InnerTransportEvent`]s.
    pub tx: Sender<InnerTransportEvent>,

    /// Fallback names of the protocol.
    pub fallback_names: Vec<ProtocolName>,
}
115
116impl ProtocolContext {
117 fn new(
119 codec: ProtocolCodec,
120 tx: Sender<InnerTransportEvent>,
121 fallback_names: Vec<ProtocolName>,
122 ) -> Self {
123 Self {
124 tx,
125 codec,
126 fallback_names,
127 }
128 }
129}
130
/// Collection of installed transports which can be polled as a single event stream.
struct TransportContext {
    /// Cursor deciding which transport is polled first on the next call to `poll_next()`.
    index: usize,

    /// Installed transports, keyed by transport type.
    transports: IndexMap<SupportedTransport, Box<dyn Transport<Item = TransportEvent>>>,
}
139
140impl TransportContext {
141 pub fn new() -> Self {
143 Self {
144 index: 0usize,
145 transports: IndexMap::new(),
146 }
147 }
148
149 pub fn keys(&self) -> impl Iterator<Item = &SupportedTransport> {
151 self.transports.keys()
152 }
153
154 pub fn get_mut(
156 &mut self,
157 key: &SupportedTransport,
158 ) -> Option<&mut Box<dyn Transport<Item = TransportEvent>>> {
159 self.transports.get_mut(key)
160 }
161
162 pub fn register_transport(
164 &mut self,
165 name: SupportedTransport,
166 transport: Box<dyn Transport<Item = TransportEvent>>,
167 ) {
168 assert!(self.transports.insert(name, transport).is_none());
169 }
170}
171
172impl Stream for TransportContext {
173 type Item = (SupportedTransport, TransportEvent);
174
175 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176 let len = match self.transports.len() {
177 0 => return Poll::Ready(None),
178 len => len,
179 };
180 let start_index = self.index;
181
182 loop {
183 let index = self.index % len;
184 self.index += 1;
185
186 let (key, stream) = self.transports.get_index_mut(index).expect("transport to exist");
187 match stream.poll_next_unpin(cx) {
188 Poll::Pending => {}
189 Poll::Ready(None) => return Poll::Ready(None),
190 Poll::Ready(Some(event)) => return Poll::Ready(Some((*key, event))),
191 }
192
193 if self.index == start_index + len {
194 break Poll::Pending;
195 }
196 }
197 }
198}
199
/// Manager of the installed transports, registered protocols and per-peer connection state.
pub struct TransportManager {
    /// Local peer ID, derived from `keypair`.
    local_peer_id: PeerId,

    /// Keypair of the local node, handed out to transports via `transport_handle()`.
    keypair: Keypair,

    /// Bandwidth sink, handed out to transports via `transport_handle()`.
    bandwidth_sink: BandwidthSink,

    /// Upper bound for the number of addresses taken from the address store for one `dial()`.
    max_parallel_dials: usize,

    /// Registered protocols, keyed by the main protocol name.
    protocols: HashMap<ProtocolName, ProtocolContext>,

    /// All registered protocol names, including fallback names. Used to detect duplicates.
    protocol_names: HashSet<ProtocolName>,

    /// Listen addresses, shared with [`TransportManagerHandle`].
    listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,

    /// Public addresses, shared with [`TransportManagerHandle`].
    public_addresses: PublicAddresses,

    /// Counter from which new connection IDs are allocated.
    next_connection_id: Arc<AtomicUsize>,

    /// Substream ID counter, shared with protocols and transports.
    next_substream_id: Arc<AtomicUsize>,

    /// Installed transports.
    transports: TransportContext,

    /// Per-peer state, shared with [`TransportManagerHandle`].
    peers: Arc<RwLock<HashMap<PeerId, PeerContext>>>,

    /// Handle to the manager, cloned for every registered protocol.
    transport_manager_handle: TransportManagerHandle,

    /// Receiving half of the [`TransportManagerEvent`] channel.
    event_rx: Receiver<TransportManagerEvent>,

    /// Receiving half of the command channel whose sending half is owned by
    /// [`TransportManagerHandle`].
    cmd_rx: Receiver<InnerTransportManagerCommand>,

    /// Sending half of the [`TransportManagerEvent`] channel, cloned into every
    /// [`TransportHandle`].
    event_tx: Sender<TransportManagerEvent>,

    /// Outbound connections that are in progress, mapped to the peer they were opened for.
    pending_connections: HashMap<ConnectionId, PeerId>,

    /// Connection limits.
    connection_limits: limits::ConnectionLimits,

    /// Dial errors keyed by connection ID.
    // NOTE(review): not accessed in the portion of the file reviewed here; presumably these
    // are errors accumulated while a connection is being opened — confirm against the users.
    opening_errors: HashMap<ConnectionId, Vec<(Multiaddr, DialError)>>,
}
259
260impl TransportManager {
261 pub fn new(
264 keypair: Keypair,
265 supported_transports: HashSet<SupportedTransport>,
266 bandwidth_sink: BandwidthSink,
267 max_parallel_dials: usize,
268 connection_limits_config: limits::ConnectionLimitsConfig,
269 ) -> (Self, TransportManagerHandle) {
270 let local_peer_id = PeerId::from_public_key(&keypair.public().into());
271 let peers = Arc::new(RwLock::new(HashMap::new()));
272 let (cmd_tx, cmd_rx) = channel(256);
273 let (event_tx, event_rx) = channel(256);
274 let listen_addresses = Arc::new(RwLock::new(HashSet::new()));
275 let public_addresses = PublicAddresses::new(local_peer_id);
276 let handle = TransportManagerHandle::new(
277 local_peer_id,
278 peers.clone(),
279 cmd_tx,
280 supported_transports,
281 listen_addresses.clone(),
282 public_addresses.clone(),
283 );
284
285 (
286 Self {
287 peers,
288 cmd_rx,
289 keypair,
290 event_tx,
291 event_rx,
292 local_peer_id,
293 bandwidth_sink,
294 listen_addresses,
295 public_addresses,
296 max_parallel_dials,
297 protocols: HashMap::new(),
298 transports: TransportContext::new(),
299 protocol_names: HashSet::new(),
300 transport_manager_handle: handle.clone(),
301 pending_connections: HashMap::new(),
302 next_substream_id: Arc::new(AtomicUsize::new(0usize)),
303 next_connection_id: Arc::new(AtomicUsize::new(0usize)),
304 connection_limits: limits::ConnectionLimits::new(connection_limits_config),
305 opening_errors: HashMap::new(),
306 },
307 handle,
308 )
309 }
310
311 pub fn protocols(&self) -> impl Iterator<Item = &ProtocolName> {
313 self.protocols.keys()
314 }
315
316 pub fn installed_transports(&self) -> impl Iterator<Item = &SupportedTransport> {
318 self.transports.keys()
319 }
320
321 fn next_connection_id(&mut self) -> ConnectionId {
323 let connection_id = self.next_connection_id.fetch_add(1usize, Ordering::Relaxed);
324
325 ConnectionId::from(connection_id)
326 }
327
328 pub fn register_protocol(
333 &mut self,
334 protocol: ProtocolName,
335 fallback_names: Vec<ProtocolName>,
336 codec: ProtocolCodec,
337 keep_alive_timeout: Duration,
338 ) -> TransportService {
339 assert!(!self.protocol_names.contains(&protocol));
340
341 for fallback in &fallback_names {
342 if self.protocol_names.contains(fallback) {
343 panic!("duplicate fallback protocol given: {fallback:?}");
344 }
345 }
346
347 let (service, sender) = TransportService::new(
348 self.local_peer_id,
349 protocol.clone(),
350 fallback_names.clone(),
351 self.next_substream_id.clone(),
352 self.transport_manager_handle.clone(),
353 keep_alive_timeout,
354 );
355
356 self.protocols.insert(
357 protocol.clone(),
358 ProtocolContext::new(codec, sender, fallback_names.clone()),
359 );
360 self.protocol_names.insert(protocol);
361 self.protocol_names.extend(fallback_names);
362
363 service
364 }
365
366 pub fn transport_handle(&self, executor: Arc<dyn Executor>) -> TransportHandle {
368 TransportHandle {
369 tx: self.event_tx.clone(),
370 executor,
371 keypair: self.keypair.clone(),
372 protocols: self.protocols.clone(),
373 bandwidth_sink: self.bandwidth_sink.clone(),
374 next_substream_id: self.next_substream_id.clone(),
375 next_connection_id: self.next_connection_id.clone(),
376 }
377 }
378
379 pub(crate) fn register_transport(
381 &mut self,
382 name: SupportedTransport,
383 transport: Box<dyn Transport<Item = TransportEvent>>,
384 ) {
385 tracing::debug!(target: LOG_TARGET, transport = ?name, "register transport");
386
387 self.transports.register_transport(name, transport);
388 self.transport_manager_handle.register_transport(name);
389 }
390
391 pub(crate) fn public_addresses(&self) -> PublicAddresses {
393 self.public_addresses.clone()
394 }
395
396 pub fn register_listen_address(&mut self, address: Multiaddr) {
398 assert!(!address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))));
399
400 let mut listen_addresses = self.listen_addresses.write();
401
402 listen_addresses.insert(address.clone());
403 listen_addresses.insert(address.with(Protocol::P2p(
404 Multihash::from_bytes(&self.local_peer_id.to_bytes()).unwrap(),
405 )));
406 }
407
408 pub fn add_known_address(
410 &mut self,
411 peer: PeerId,
412 address: impl Iterator<Item = Multiaddr>,
413 ) -> usize {
414 self.transport_manager_handle.add_known_address(&peer, address)
415 }
416
417 pub async fn dial(&mut self, peer: PeerId) -> crate::Result<()> {
421 let available_capacity = self.connection_limits.on_dial_address()?;
423 let limit = available_capacity.min(self.max_parallel_dials);
426
427 if peer == self.local_peer_id {
428 return Err(Error::TriedToDialSelf);
429 }
430 let mut peers = self.peers.write();
431
432 let PeerContext {
436 state,
437 secondary_connection,
438 mut addresses,
439 } = match peers.remove(&peer) {
440 None => return Err(Error::PeerDoesntExist(peer)),
441 Some(
442 context @ PeerContext {
443 state: PeerState::Connected { .. },
444 ..
445 },
446 ) => {
447 peers.insert(peer, context);
448 return Err(Error::AlreadyConnected);
449 }
450 Some(
451 context @ PeerContext {
452 state: PeerState::Dialing { .. } | PeerState::Opening { .. },
453 ..
454 },
455 ) => {
456 peers.insert(peer, context);
457 return Ok(());
458 }
459 Some(context) => context,
460 };
461
462 if let PeerState::Disconnected {
463 dial_record: Some(_),
464 } = &state
465 {
466 tracing::debug!(
467 target: LOG_TARGET,
468 ?peer,
469 "peer is already being dialed",
470 );
471
472 peers.insert(
473 peer,
474 PeerContext {
475 state,
476 secondary_connection,
477 addresses,
478 },
479 );
480
481 return Ok(());
482 }
483
484 let mut records: HashMap<_, _> = addresses
485 .take(limit)
486 .into_iter()
487 .map(|record| (record.address().clone(), record))
488 .collect();
489
490 if records.is_empty() {
491 return Err(Error::NoAddressAvailable(peer));
492 }
493
494 let locked_addresses = self.listen_addresses.read();
495 for record in records.values() {
496 if locked_addresses.contains(record.as_ref()) {
497 tracing::warn!(
498 target: LOG_TARGET,
499 ?peer,
500 ?record,
501 "tried to dial self",
502 );
503
504 debug_assert!(false);
505 return Err(Error::TriedToDialSelf);
506 }
507 }
508 drop(locked_addresses);
509
510 let connection_id =
512 ConnectionId::from(self.next_connection_id.fetch_add(1usize, Ordering::Relaxed));
513
514 tracing::debug!(
515 target: LOG_TARGET,
516 ?connection_id,
517 addresses = ?records,
518 "dial remote peer",
519 );
520
521 let mut transports = HashSet::new();
522 #[cfg(feature = "websocket")]
523 let mut websocket = Vec::new();
524 #[cfg(feature = "quic")]
525 let mut quic = Vec::new();
526 let mut tcp = Vec::new();
527
528 for (address, record) in &mut records {
529 record.set_connection_id(connection_id);
530
531 #[cfg(feature = "quic")]
532 if address.iter().any(|p| std::matches!(&p, Protocol::QuicV1)) {
533 quic.push(address.clone());
534 transports.insert(SupportedTransport::Quic);
535 continue;
536 }
537
538 #[cfg(feature = "websocket")]
539 if address.iter().any(|p| std::matches!(&p, Protocol::Ws(_) | Protocol::Wss(_))) {
540 websocket.push(address.clone());
541 transports.insert(SupportedTransport::WebSocket);
542 continue;
543 }
544
545 tcp.push(address.clone());
546 transports.insert(SupportedTransport::Tcp);
547 }
548
549 peers.insert(
550 peer,
551 PeerContext {
552 state: PeerState::Opening {
553 records,
554 connection_id,
555 transports,
556 },
557 secondary_connection,
558 addresses,
559 },
560 );
561
562 if !tcp.is_empty() {
563 self.transports
564 .get_mut(&SupportedTransport::Tcp)
565 .expect("transport to be supported")
566 .open(connection_id, tcp)?;
567 }
568
569 #[cfg(feature = "quic")]
570 if !quic.is_empty() {
571 self.transports
572 .get_mut(&SupportedTransport::Quic)
573 .expect("transport to be supported")
574 .open(connection_id, quic)?;
575 }
576
577 #[cfg(feature = "websocket")]
578 if !websocket.is_empty() {
579 self.transports
580 .get_mut(&SupportedTransport::WebSocket)
581 .expect("transport to be supported")
582 .open(connection_id, websocket)?;
583 }
584
585 self.pending_connections.insert(connection_id, peer);
586
587 Ok(())
588 }
589
590 pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
594 self.connection_limits.on_dial_address()?;
595
596 let mut record = AddressRecord::from_multiaddr(address)
597 .ok_or(Error::AddressError(AddressError::PeerIdMissing))?;
598
599 if self.listen_addresses.read().contains(record.as_ref()) {
600 return Err(Error::TriedToDialSelf);
601 }
602
603 tracing::debug!(target: LOG_TARGET, address = ?record.address(), "dial address");
604
605 let mut protocol_stack = record.as_ref().iter();
606 match protocol_stack
607 .next()
608 .ok_or_else(|| Error::TransportNotSupported(record.address().clone()))?
609 {
610 Protocol::Ip4(_) | Protocol::Ip6(_) => {}
611 Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => {}
612 transport => {
613 tracing::error!(
614 target: LOG_TARGET,
615 ?transport,
616 "invalid transport, expected `ip4`/`ip6`"
617 );
618 return Err(Error::TransportNotSupported(record.address().clone()));
619 }
620 };
621
622 let supported_transport = match protocol_stack
623 .next()
624 .ok_or_else(|| Error::TransportNotSupported(record.address().clone()))?
625 {
626 Protocol::Tcp(_) => match protocol_stack.next() {
627 #[cfg(feature = "websocket")]
628 Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) => SupportedTransport::WebSocket,
629 Some(Protocol::P2p(_)) => SupportedTransport::Tcp,
630 _ => return Err(Error::TransportNotSupported(record.address().clone())),
631 },
632 #[cfg(feature = "quic")]
633 Protocol::Udp(_) => match protocol_stack
634 .next()
635 .ok_or_else(|| Error::TransportNotSupported(record.address().clone()))?
636 {
637 Protocol::QuicV1 => SupportedTransport::Quic,
638 _ => {
639 tracing::debug!(target: LOG_TARGET, address = ?record.address(), "expected `quic-v1`");
640 return Err(Error::TransportNotSupported(record.address().clone()));
641 }
642 },
643 protocol => {
644 tracing::error!(
645 target: LOG_TARGET,
646 ?protocol,
647 "invalid protocol"
648 );
649
650 return Err(Error::TransportNotSupported(record.address().clone()));
651 }
652 };
653
654 let remote_peer_id =
656 PeerId::try_from_multiaddr(record.address()).expect("`PeerId` to exist");
657
658 let connection_id = self.next_connection_id();
660 record.set_connection_id(connection_id);
661
662 {
663 let mut peers = self.peers.write();
664
665 match peers.entry(remote_peer_id) {
666 Entry::Occupied(occupied) => {
667 let context = occupied.into_mut();
668
669 tracing::debug!(
675 target: LOG_TARGET,
676 peer = ?remote_peer_id,
677 state = ?context.state,
678 "peer state exists",
679 );
680
681 match context.state {
682 PeerState::Connected { .. } => {
683 return Err(Error::AlreadyConnected);
684 }
685 PeerState::Dialing { .. } | PeerState::Opening { .. } => {
686 return Ok(());
687 }
688 PeerState::Disconnected {
689 dial_record: Some(_),
690 } => {
691 tracing::debug!(
692 target: LOG_TARGET,
693 peer = ?remote_peer_id,
694 state = ?context.state,
695 "peer is already being dialed from a disconnected state"
696 );
697 return Ok(());
698 }
699 PeerState::Disconnected { dial_record: None } => {
700 context.state = PeerState::Dialing {
701 record: record.clone(),
702 };
703 }
704 }
705 }
706 Entry::Vacant(vacant) => {
707 vacant.insert(PeerContext {
708 state: PeerState::Dialing {
709 record: record.clone(),
710 },
711 addresses: AddressStore::new(),
712 secondary_connection: None,
713 });
714 }
715 };
716 }
717
718 self.transports
719 .get_mut(&supported_transport)
720 .ok_or(Error::TransportNotSupported(record.address().clone()))?
721 .dial(connection_id, record.address().clone())?;
722 self.pending_connections.insert(connection_id, remote_peer_id);
723
724 Ok(())
725 }
726
727 fn on_dial_failure(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
729 let peer = self.pending_connections.remove(&connection_id).ok_or_else(|| {
730 tracing::error!(
731 target: LOG_TARGET,
732 ?connection_id,
733 "dial failed for a connection that doesn't exist",
734 );
735 Error::InvalidState
736 })?;
737
738 let mut peers = self.peers.write();
739 let context = peers.get_mut(&peer).ok_or_else(|| {
740 tracing::error!(
741 target: LOG_TARGET,
742 ?peer,
743 ?connection_id,
744 "dial failed for a peer that doesn't exist",
745 );
746 debug_assert!(false);
747
748 Error::InvalidState
749 })?;
750
751 match std::mem::replace(
752 &mut context.state,
753 PeerState::Disconnected { dial_record: None },
754 ) {
755 PeerState::Dialing { ref mut record } => {
756 debug_assert_eq!(record.connection_id(), &Some(connection_id));
757 if record.connection_id() != &Some(connection_id) {
758 tracing::warn!(
759 target: LOG_TARGET,
760 ?peer,
761 ?connection_id,
762 ?record,
763 "unknown dial failure for a dialing peer",
764 );
765
766 context.state = PeerState::Dialing {
767 record: record.clone(),
768 };
769 debug_assert!(false);
770 return Ok(());
771 }
772
773 record.update_score(SCORE_CONNECT_FAILURE);
774 context.addresses.insert(record.clone());
775
776 context.state = PeerState::Disconnected { dial_record: None };
777 Ok(())
778 }
779 PeerState::Opening { .. } => {
780 todo!();
781 }
782 PeerState::Connected {
783 record,
784 dial_record: Some(mut dial_record),
785 } => {
786 if dial_record.connection_id() != &Some(connection_id) {
787 tracing::warn!(
788 target: LOG_TARGET,
789 ?peer,
790 ?connection_id,
791 ?record,
792 "unknown dial failure for a connected peer",
793 );
794
795 context.state = PeerState::Connected {
796 record,
797 dial_record: Some(dial_record),
798 };
799 debug_assert!(false);
800 return Ok(());
801 }
802
803 dial_record.update_score(SCORE_CONNECT_FAILURE);
804 context.addresses.insert(dial_record);
805
806 context.state = PeerState::Connected {
807 record,
808 dial_record: None,
809 };
810 Ok(())
811 }
812 PeerState::Disconnected {
813 dial_record: Some(mut dial_record),
814 } => {
815 tracing::debug!(
816 target: LOG_TARGET,
817 ?connection_id,
818 ?dial_record,
819 "dial failed for a disconnected peer",
820 );
821
822 if dial_record.connection_id() != &Some(connection_id) {
823 tracing::warn!(
824 target: LOG_TARGET,
825 ?peer,
826 ?connection_id,
827 ?dial_record,
828 "unknown dial failure for a disconnected peer",
829 );
830
831 context.state = PeerState::Disconnected {
832 dial_record: Some(dial_record),
833 };
834 debug_assert!(false);
835 return Ok(());
836 }
837
838 dial_record.update_score(SCORE_CONNECT_FAILURE);
839 context.addresses.insert(dial_record);
840
841 Ok(())
842 }
843 state => {
844 tracing::warn!(
845 target: LOG_TARGET,
846 ?peer,
847 ?connection_id,
848 ?state,
849 "invalid state for dial failure",
850 );
851 context.state = state;
852
853 debug_assert!(false);
854 Ok(())
855 }
856 }
857 }
858
859 fn on_pending_incoming_connection(&mut self) -> crate::Result<()> {
860 self.connection_limits.on_incoming()?;
861 Ok(())
862 }
863
864 fn on_connection_closed(
866 &mut self,
867 peer: PeerId,
868 connection_id: ConnectionId,
869 ) -> crate::Result<Option<TransportEvent>> {
870 self.connection_limits.on_connection_closed(connection_id);
871
872 let mut peers = self.peers.write();
873 let Some(context) = peers.get_mut(&peer) else {
874 tracing::warn!(
875 target: LOG_TARGET,
876 ?peer,
877 ?connection_id,
878 "cannot handle closed connection: peer doesn't exist",
879 );
880 debug_assert!(false);
881 return Err(Error::PeerDoesntExist(peer));
882 };
883
884 tracing::trace!(
885 target: LOG_TARGET,
886 ?peer,
887 ?connection_id,
888 "connection closed",
889 );
890
891 match std::mem::replace(
892 &mut context.state,
893 PeerState::Disconnected { dial_record: None },
894 ) {
895 PeerState::Connected {
896 record,
897 dial_record: actual_dial_record,
898 } => match record.connection_id() == &Some(connection_id) {
899 true => match context.secondary_connection.take() {
905 None => {
906 context.addresses.insert(record);
907 context.state = PeerState::Disconnected {
908 dial_record: actual_dial_record,
909 };
910
911 Ok(Some(TransportEvent::ConnectionClosed {
912 peer,
913 connection_id,
914 }))
915 }
916 Some(secondary_connection) => {
917 context.addresses.insert(record);
918 context.state = PeerState::Connected {
919 record: secondary_connection,
920 dial_record: actual_dial_record,
921 };
922
923 Ok(None)
924 }
925 },
926 false => match context.secondary_connection.take() {
928 Some(secondary_connection) => {
929 if secondary_connection.connection_id() != &Some(connection_id) {
930 tracing::debug!(
931 target: LOG_TARGET,
932 ?peer,
933 ?connection_id,
934 "unknown connection was closed, potentially ignored tertiary connection",
935 );
936
937 context.secondary_connection = Some(secondary_connection);
938 context.state = PeerState::Connected {
939 record,
940 dial_record: actual_dial_record,
941 };
942
943 return Ok(None);
944 }
945
946 tracing::trace!(
947 target: LOG_TARGET,
948 ?peer,
949 ?connection_id,
950 "secondary connection closed",
951 );
952
953 context.addresses.insert(secondary_connection);
954 context.state = PeerState::Connected {
955 record,
956 dial_record: actual_dial_record,
957 };
958 Ok(None)
959 }
960 None => {
961 tracing::warn!(
962 target: LOG_TARGET,
963 ?peer,
964 ?connection_id,
965 "non-primary connection was closed but secondary connection doesn't exist",
966 );
967
968 debug_assert!(false);
969 Err(Error::InvalidState)
970 }
971 },
972 },
973 PeerState::Disconnected { dial_record } => match context.secondary_connection.take() {
974 Some(record) => {
975 tracing::warn!(
976 target: LOG_TARGET,
977 ?peer,
978 ?connection_id,
979 ?record,
980 ?dial_record,
981 "peer is disconnected but secondary connection exists",
982 );
983
984 debug_assert!(false);
985 context.state = PeerState::Disconnected { dial_record };
986 Err(Error::InvalidState)
987 }
988 None => {
989 context.state = PeerState::Disconnected { dial_record };
990
991 Ok(Some(TransportEvent::ConnectionClosed {
992 peer,
993 connection_id,
994 }))
995 }
996 },
997 state => {
998 tracing::warn!(target: LOG_TARGET, ?peer, ?connection_id, ?state, "invalid state for a closed connection");
999 debug_assert!(false);
1000 Err(Error::InvalidState)
1001 }
1002 }
1003 }
1004
    /// Update the peer state after a connection to `peer` has been established.
    ///
    /// Returns [`ConnectionEstablishedResult::Accept`] if the connection was recorded in the
    /// peer state, either as the primary or as the secondary connection, and
    /// [`ConnectionEstablishedResult::Reject`] if it was not.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidState`] if the connection was pending for some other peer
    /// than `peer`.
    ///
    /// # Panics
    ///
    /// Panics if the peer is in `PeerState::Opening` and one of the transports recorded in
    /// the state is not installed.
    fn on_connection_established(
        &mut self,
        peer: PeerId,
        endpoint: &Endpoint,
    ) -> crate::Result<ConnectionEstablishedResult> {
        // the connection is no longer pending, regardless of what happens to it below
        if let Some(dialed_peer) = self.pending_connections.remove(&endpoint.connection_id()) {
            if dialed_peer != peer {
                tracing::warn!(
                    target: LOG_TARGET,
                    ?dialed_peer,
                    ?peer,
                    ?endpoint,
                    "peer ids do not match but transport was supposed to reject connection"
                );
                debug_assert!(false);
                return Err(Error::InvalidState);
            }
        };

        // NOTE(review): the connection limits are updated before the peer state is
        // inspected. Verify that the limits are also told about connections that are
        // rejected further below.
        if let Err(error) = self
            .connection_limits
            .on_connection_established(endpoint.connection_id(), endpoint.is_listener())
        {
            tracing::debug!(
                target: LOG_TARGET,
                ?peer,
                ?endpoint,
                ?error,
                "connection limit exceeded, rejecting connection",
            );
            return Ok(ConnectionEstablishedResult::Reject);
        }

        let mut peers = self.peers.write();
        match peers.get_mut(&peer) {
            Some(context) => match context.state {
                // peer is already connected, the new connection can only become the
                // secondary connection
                PeerState::Connected {
                    ref mut dial_record,
                    ..
                } => match context.secondary_connection {
                    Some(_) => {
                        tracing::debug!(
                            target: LOG_TARGET,
                            ?peer,
                            connection_id = ?endpoint.connection_id(),
                            ?endpoint,
                            "secondary connection already exists, ignoring connection",
                        );

                        // the address of the ignored connection is stored only if
                        // `endpoint.is_listener()` returns true
                        if endpoint.is_listener() {
                            context.addresses.insert(AddressRecord::new(
                                &peer,
                                endpoint.address().clone(),
                                SCORE_CONNECT_SUCCESS,
                                None,
                            ))
                        }

                        return Ok(ConnectionEstablishedResult::Reject);
                    }
                    None => match dial_record.take() {
                        // the pending dial of the connected peer succeeded
                        Some(record)
                            if record.connection_id() == &Some(endpoint.connection_id()) =>
                        {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                connection_id = ?endpoint.connection_id(),
                                address = ?endpoint.address(),
                                "dialed connection opened as secondary connection",
                            );

                            context.secondary_connection = Some(AddressRecord::new(
                                &peer,
                                endpoint.address().clone(),
                                SCORE_CONNECT_SUCCESS,
                                Some(endpoint.connection_id()),
                            ));
                        }
                        // no dial was pending
                        None => {
                            tracing::debug!(
                                target: LOG_TARGET,
                                ?peer,
                                connection_id = ?endpoint.connection_id(),
                                address = ?endpoint.address(),
                                "secondary connection",
                            );

                            context.secondary_connection = Some(AddressRecord::new(
                                &peer,
                                endpoint.address().clone(),
                                SCORE_CONNECT_SUCCESS,
                                Some(endpoint.connection_id()),
                            ));
                        }
                        // a dial is pending but this is not the dialed connection
                        Some(record) => {
                            tracing::warn!(
                                target: LOG_TARGET,
                                ?peer,
                                connection_id = ?endpoint.connection_id(),
                                address = ?endpoint.address(),
                                dial_record = ?record,
                                "unknown connection opened as secondary connection, discarding",
                            );

                            // put back the dial record that was taken for the comparison
                            *dial_record = Some(record);

                            return Ok(ConnectionEstablishedResult::Reject);
                        }
                    },
                },
                PeerState::Dialing { ref record, .. } => {
                    match record.connection_id() == &Some(endpoint.connection_id()) {
                        // the connection that was being dialed got established
                        true => {
                            tracing::trace!(
                                target: LOG_TARGET,
                                ?peer,
                                connection_id = ?endpoint.connection_id(),
                                ?endpoint,
                                ?record,
                                "connection opened to remote",
                            );

                            context.state = PeerState::Connected {
                                record: record.clone(),
                                dial_record: None,
                            };
                        }
                        // some other connection got established, keep the dial record so
                        // that the result of the pending dial can still be handled
                        false => {
                            tracing::trace!(
                                target: LOG_TARGET,
                                ?peer,
                                connection_id = ?endpoint.connection_id(),
                                ?endpoint,
                                "connection opened by remote while local node was dialing",
                            );

                            context.state = PeerState::Connected {
                                record: AddressRecord::new(
                                    &peer,
                                    endpoint.address().clone(),
                                    SCORE_CONNECT_SUCCESS,
                                    Some(endpoint.connection_id()),
                                ),
                                dial_record: Some(record.clone()),
                            };
                        }
                    }
                }
                PeerState::Opening {
                    ref mut records,
                    connection_id,
                    ref transports,
                } => {
                    debug_assert!(std::matches!(endpoint, &Endpoint::Listener { .. }));

                    tracing::trace!(
                        target: LOG_TARGET,
                        ?peer,
                        dial_connection_id = ?connection_id,
                        dial_records = ?records,
                        dial_transports = ?transports,
                        listener_endpoint = ?endpoint,
                        "inbound connection while opening an outbound connection",
                    );

                    // cancel the outbound connection on all transports it was opened on
                    transports.iter().for_each(|transport| {
                        self.transports
                            .get_mut(transport)
                            .expect("transport to exist")
                            .cancel(connection_id);
                    });

                    // the canceled connection is looked up using the connection ID stored
                    // in the first dial record
                    self.pending_connections.remove(
                        &records
                            .iter()
                            .next()
                            .expect("record to exist")
                            .1
                            .connection_id()
                            .expect("`ConnectionId` to exist"),
                    );

                    // reuse the dial record if the remote connected from an address that
                    // was being dialed
                    let record = match records.remove(endpoint.address()) {
                        Some(mut record) => {
                            record.update_score(SCORE_CONNECT_SUCCESS);
                            record.set_connection_id(endpoint.connection_id());
                            record
                        }
                        None => AddressRecord::new(
                            &peer,
                            endpoint.address().clone(),
                            SCORE_CONNECT_SUCCESS,
                            Some(endpoint.connection_id()),
                        ),
                    };
                    // pass the remaining dial records to the address store
                    context.addresses.extend(records.iter().map(|(_, record)| record));

                    context.state = PeerState::Connected {
                        record,
                        dial_record: None,
                    };
                }
                PeerState::Disconnected {
                    ref mut dial_record,
                } => {
                    tracing::trace!(
                        target: LOG_TARGET,
                        ?peer,
                        connection_id = ?endpoint.connection_id(),
                        ?endpoint,
                        ?dial_record,
                        "connection opened by remote or delayed dial succeeded",
                    );

                    // the dial record is consumed only if its address matches the address
                    // of the established connection, otherwise it's kept as the dial record
                    // of the connected peer
                    let (record, dial_record) = match dial_record.take() {
                        Some(mut dial_record) =>
                            if dial_record.address() == endpoint.address() {
                                dial_record.set_connection_id(endpoint.connection_id());
                                (dial_record, None)
                            } else {
                                (
                                    AddressRecord::new(
                                        &peer,
                                        endpoint.address().clone(),
                                        SCORE_CONNECT_SUCCESS,
                                        Some(endpoint.connection_id()),
                                    ),
                                    Some(dial_record),
                                )
                            },
                        None => (
                            AddressRecord::new(
                                &peer,
                                endpoint.address().clone(),
                                SCORE_CONNECT_SUCCESS,
                                Some(endpoint.connection_id()),
                            ),
                            None,
                        ),
                    };

                    context.state = PeerState::Connected {
                        record,
                        dial_record,
                    };
                }
            },
            // previously unknown peer, start tracking it as connected
            None => {
                peers.insert(
                    peer,
                    PeerContext {
                        state: PeerState::Connected {
                            record: AddressRecord::new(
                                &peer,
                                endpoint.address().clone(),
                                SCORE_CONNECT_SUCCESS,
                                Some(endpoint.connection_id()),
                            ),
                            dial_record: None,
                        },
                        addresses: AddressStore::new(),
                        secondary_connection: None,
                    },
                );
            }
        }

        Ok(ConnectionEstablishedResult::Accept)
    }
1287
1288 fn on_connection_opened(
1289 &mut self,
1290 transport: SupportedTransport,
1291 connection_id: ConnectionId,
1292 address: Multiaddr,
1293 ) -> crate::Result<()> {
1294 let Some(peer) = self.pending_connections.remove(&connection_id) else {
1295 tracing::warn!(
1296 target: LOG_TARGET,
1297 ?connection_id,
1298 ?transport,
1299 ?address,
1300 "connection opened but dial record doesn't exist",
1301 );
1302
1303 debug_assert!(false);
1304 return Err(Error::InvalidState);
1305 };
1306
1307 let mut peers = self.peers.write();
1308 let context = peers.get_mut(&peer).ok_or_else(|| {
1309 tracing::warn!(
1310 target: LOG_TARGET,
1311 ?peer,
1312 ?connection_id,
1313 "connection opened but peer doesn't exist",
1314 );
1315
1316 debug_assert!(false);
1317 Error::InvalidState
1318 })?;
1319
1320 match std::mem::replace(
1321 &mut context.state,
1322 PeerState::Disconnected { dial_record: None },
1323 ) {
1324 PeerState::Opening {
1325 mut records,
1326 connection_id,
1327 transports,
1328 } => {
1329 tracing::trace!(
1330 target: LOG_TARGET,
1331 ?peer,
1332 ?connection_id,
1333 ?address,
1334 ?transport,
1335 "connection opened to peer",
1336 );
1337
1338 for transport in transports.iter() {
1340 self.transports
1341 .get_mut(transport)
1342 .expect("transport to exist")
1343 .cancel(connection_id);
1344 }
1345
1346 let mut dial_record = records.remove(&address).expect("address to exist");
1352 dial_record.update_score(SCORE_CONNECT_SUCCESS);
1353
1354 match self
1356 .transports
1357 .get_mut(&transport)
1358 .expect("transport to exist")
1359 .negotiate(connection_id)
1360 {
1361 Ok(()) => {
1362 tracing::trace!(
1363 target: LOG_TARGET,
1364 ?peer,
1365 ?connection_id,
1366 ?dial_record,
1367 ?transport,
1368 "negotiation started"
1369 );
1370
1371 self.pending_connections.insert(connection_id, peer);
1372
1373 context.state = PeerState::Dialing {
1374 record: dial_record,
1375 };
1376
1377 for (_, record) in records {
1378 context.addresses.insert(record);
1379 }
1380
1381 Ok(())
1382 }
1383 Err(error) => {
1384 tracing::warn!(
1385 target: LOG_TARGET,
1386 ?peer,
1387 ?connection_id,
1388 ?error,
1389 "failed to negotiate connection",
1390 );
1391 context.state = PeerState::Disconnected { dial_record: None };
1392
1393 debug_assert!(false);
1394 Err(Error::InvalidState)
1395 }
1396 }
1397 }
1398 state => {
1399 tracing::warn!(
1400 target: LOG_TARGET,
1401 ?peer,
1402 ?connection_id,
1403 ?state,
1404 "connection opened but `PeerState` is not `Opening`",
1405 );
1406 context.state = state;
1407
1408 debug_assert!(false);
1409 Err(Error::InvalidState)
1410 }
1411 }
1412 }
1413
1414 fn on_open_failure(
1416 &mut self,
1417 transport: SupportedTransport,
1418 connection_id: ConnectionId,
1419 ) -> crate::Result<Option<PeerId>> {
1420 let Some(peer) = self.pending_connections.remove(&connection_id) else {
1421 tracing::warn!(
1422 target: LOG_TARGET,
1423 ?connection_id,
1424 "open failure but dial record doesn't exist",
1425 );
1426 return Err(Error::InvalidState);
1427 };
1428
1429 let mut peers = self.peers.write();
1430 let context = peers.get_mut(&peer).ok_or_else(|| {
1431 tracing::warn!(
1432 target: LOG_TARGET,
1433 ?peer,
1434 ?connection_id,
1435 "open failure but peer doesn't exist",
1436 );
1437
1438 debug_assert!(false);
1439 Error::InvalidState
1440 })?;
1441
1442 match std::mem::replace(
1443 &mut context.state,
1444 PeerState::Disconnected { dial_record: None },
1445 ) {
1446 PeerState::Opening {
1447 records,
1448 connection_id,
1449 mut transports,
1450 } => {
1451 tracing::trace!(
1452 target: LOG_TARGET,
1453 ?peer,
1454 ?connection_id,
1455 ?transport,
1456 "open failure for peer",
1457 );
1458 transports.remove(&transport);
1459
1460 if transports.is_empty() {
1461 for (_, mut record) in records {
1462 record.update_score(SCORE_CONNECT_FAILURE);
1463 context.addresses.insert(record);
1464 }
1465
1466 tracing::trace!(
1467 target: LOG_TARGET,
1468 ?peer,
1469 ?connection_id,
1470 "open failure for last transport",
1471 );
1472
1473 return Ok(Some(peer));
1474 }
1475
1476 self.pending_connections.insert(connection_id, peer);
1477 context.state = PeerState::Opening {
1478 records,
1479 connection_id,
1480 transports,
1481 };
1482
1483 Ok(None)
1484 }
1485 state => {
1486 tracing::warn!(
1487 target: LOG_TARGET,
1488 ?peer,
1489 ?connection_id,
1490 ?state,
1491 "open failure but `PeerState` is not `Opening`",
1492 );
1493 context.state = state;
1494
1495 debug_assert!(false);
1496 Err(Error::InvalidState)
1497 }
1498 }
1499 }
1500
1501 pub async fn next(&mut self) -> Option<TransportEvent> {
1503 loop {
1504 tokio::select! {
1505 event = self.event_rx.recv() => match event? {
1506 TransportManagerEvent::ConnectionClosed {
1507 peer,
1508 connection: connection_id,
1509 } => match self.on_connection_closed(peer, connection_id) {
1510 Ok(None) => {}
1511 Ok(Some(event)) => return Some(event),
1512 Err(error) => tracing::error!(
1513 target: LOG_TARGET,
1514 ?error,
1515 "failed to handle closed connection",
1516 ),
1517 }
1518 },
1519 command = self.cmd_rx.recv() => match command? {
1520 InnerTransportManagerCommand::DialPeer { peer } => {
1521 if let Err(error) = self.dial(peer).await {
1522 tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer")
1523 }
1524 }
1525 InnerTransportManagerCommand::DialAddress { address } => {
1526 if let Err(error) = self.dial_address(address).await {
1527 tracing::debug!(target: LOG_TARGET, ?error, "failed to dial peer")
1528 }
1529 }
1530 },
1531 event = self.transports.next() => {
1532 let (transport, event) = event?;
1533
1534 match event {
1535 TransportEvent::DialFailure { connection_id, address, error } => {
1536 tracing::debug!(
1537 target: LOG_TARGET,
1538 ?connection_id,
1539 ?address,
1540 ?error,
1541 "failed to dial peer",
1542 );
1543
1544 if let Ok(()) = self.on_dial_failure(connection_id) {
1545 match address.iter().last() {
1546 Some(Protocol::P2p(hash)) => match PeerId::from_multihash(hash) {
1547 Ok(peer) => {
1548 tracing::trace!(
1549 target: LOG_TARGET,
1550 ?connection_id,
1551 ?error,
1552 ?address,
1553 num_protocols = self.protocols.len(),
1554 "dial failure, notify protocols",
1555 );
1556
1557 for (protocol, context) in &self.protocols {
1558 tracing::trace!(
1559 target: LOG_TARGET,
1560 ?connection_id,
1561 ?error,
1562 ?address,
1563 ?protocol,
1564 "dial failure, notify protocol",
1565 );
1566 match context.tx.try_send(InnerTransportEvent::DialFailure {
1567 peer,
1568 address: address.clone(),
1569 }) {
1570 Ok(()) => {}
1571 Err(_) => {
1572 tracing::trace!(
1573 target: LOG_TARGET,
1574 ?connection_id,
1575 ?error,
1576 ?address,
1577 ?protocol,
1578 "dial failure, channel to protocol clogged, use await",
1579 );
1580 let _ = context
1581 .tx
1582 .send(InnerTransportEvent::DialFailure {
1583 peer,
1584 address: address.clone(),
1585 })
1586 .await;
1587 }
1588 }
1589 }
1590
1591 tracing::trace!(
1592 target: LOG_TARGET,
1593 ?connection_id,
1594 ?error,
1595 ?address,
1596 "all protocols notified",
1597 );
1598 }
1599 Err(error) => {
1600 tracing::warn!(
1601 target: LOG_TARGET,
1602 ?address,
1603 ?connection_id,
1604 ?error,
1605 "failed to parse `PeerId` from `Multiaddr`",
1606 );
1607 debug_assert!(false);
1608 }
1609 },
1610 _ => {
1611 tracing::warn!(target: LOG_TARGET, ?address, ?connection_id, "address doesn't contain `PeerId`");
1612 debug_assert!(false);
1613 }
1614 }
1615
1616 return Some(TransportEvent::DialFailure {
1617 connection_id,
1618 address,
1619 error,
1620 })
1621 }
1622 }
1623 TransportEvent::ConnectionEstablished { peer, endpoint } => {
1624 self.opening_errors.remove(&endpoint.connection_id());
1625 match self.on_connection_established(peer, &endpoint) {
1626 Err(error) => {
1627 tracing::debug!(
1628 target: LOG_TARGET,
1629 ?peer,
1630 ?endpoint,
1631 ?error,
1632 "failed to handle established connection",
1633 );
1634
1635 let _ = self
1636 .transports
1637 .get_mut(&transport)
1638 .expect("transport to exist")
1639 .reject(endpoint.connection_id());
1640 }
1641 Ok(ConnectionEstablishedResult::Accept) => {
1642 tracing::trace!(
1643 target: LOG_TARGET,
1644 ?peer,
1645 ?endpoint,
1646 "accept connection",
1647 );
1648
1649 let _ = self
1650 .transports
1651 .get_mut(&transport)
1652 .expect("transport to exist")
1653 .accept(endpoint.connection_id());
1654
1655 return Some(TransportEvent::ConnectionEstablished {
1656 peer,
1657 endpoint,
1658 });
1659 }
1660 Ok(ConnectionEstablishedResult::Reject) => {
1661 tracing::trace!(
1662 target: LOG_TARGET,
1663 ?peer,
1664 ?endpoint,
1665 "reject connection",
1666 );
1667
1668 let _ = self
1669 .transports
1670 .get_mut(&transport)
1671 .expect("transport to exist")
1672 .reject(endpoint.connection_id());
1673 }
1674 }
1675 }
1676 TransportEvent::ConnectionOpened { connection_id, address } => {
1677 self.opening_errors.remove(&connection_id);
1678
1679 if let Err(error) = self.on_connection_opened(transport, connection_id, address) {
1680 tracing::debug!(
1681 target: LOG_TARGET,
1682 ?connection_id,
1683 ?error,
1684 "failed to handle opened connection",
1685 );
1686 }
1687 }
1688 TransportEvent::OpenFailure { connection_id, errors } => {
1689 match self.on_open_failure(transport, connection_id) {
1690 Err(error) => tracing::debug!(
1691 target: LOG_TARGET,
1692 ?connection_id,
1693 ?error,
1694 "failed to handle opened connection",
1695 ),
1696 Ok(Some(peer)) => {
1697 tracing::trace!(
1698 target: LOG_TARGET,
1699 ?peer,
1700 ?connection_id,
1701 num_protocols = self.protocols.len(),
1702 "inform protocols about open failure",
1703 );
1704
1705 for (protocol, context) in &self.protocols {
1706 let _ = match context
1707 .tx
1708 .try_send(InnerTransportEvent::DialFailure {
1709 peer,
1710 address: Multiaddr::empty(),
1711 }) {
1712 Ok(_) => Ok(()),
1713 Err(_) => {
1714 tracing::trace!(
1715 target: LOG_TARGET,
1716 ?peer,
1717 %protocol,
1718 ?connection_id,
1719 "call to protocol would, block try sending in a blocking way",
1720 );
1721
1722 context
1723 .tx
1724 .send(InnerTransportEvent::DialFailure {
1725 peer,
1726 address: Multiaddr::empty(),
1727 })
1728 .await
1729 }
1730 };
1731 }
1732
1733 let mut grouped_errors = self.opening_errors.remove(&connection_id).unwrap_or_default();
1734 grouped_errors.extend(errors);
1735 return Some(TransportEvent::OpenFailure { connection_id, errors: grouped_errors });
1736 }
1737 Ok(None) => {
1738 tracing::trace!(
1739 target: LOG_TARGET,
1740 ?connection_id,
1741 "open failure, but not the last transport",
1742 );
1743
1744 self.opening_errors.entry(connection_id).or_default().extend(errors);
1745 }
1746 }
1747 },
1748 TransportEvent::PendingInboundConnection { connection_id } => {
1749 if self.on_pending_incoming_connection().is_ok() {
1750 tracing::trace!(
1751 target: LOG_TARGET,
1752 ?connection_id,
1753 "accept pending incoming connection",
1754 );
1755
1756 let _ = self
1757 .transports
1758 .get_mut(&transport)
1759 .expect("transport to exist")
1760 .accept_pending(connection_id);
1761 } else {
1762 tracing::debug!(
1763 target: LOG_TARGET,
1764 ?connection_id,
1765 "reject pending incoming connection",
1766 );
1767
1768 let _ = self
1769 .transports
1770 .get_mut(&transport)
1771 .expect("transport to exist")
1772 .reject_pending(connection_id);
1773 }
1774 },
1775 event => panic!("event not supported: {event:?}"),
1776 }
1777 },
1778 }
1779 }
1780 }
1781}
1782
1783#[cfg(test)]
1784mod tests {
1785 use limits::ConnectionLimitsConfig;
1786
1787 use multihash::Multihash;
1788
1789 use super::*;
1790 use crate::{
1791 crypto::ed25519::Keypair,
1792 executor::DefaultExecutor,
1793 transport::{dummy::DummyTransport, KEEP_ALIVE_TIMEOUT},
1794 };
1795 #[cfg(feature = "websocket")]
1796 use std::borrow::Cow;
1797 use std::{
1798 net::{Ipv4Addr, Ipv6Addr},
1799 sync::Arc,
1800 };
1801
1802 fn setup_dial_addr(peer: PeerId, connection_id: u16) -> (Multiaddr, ConnectionId) {
1804 let dial_address = Multiaddr::empty()
1805 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
1806 .with(Protocol::Tcp(8888 + connection_id))
1807 .with(Protocol::P2p(
1808 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
1809 ));
1810 let connection_id = ConnectionId::from(connection_id as usize);
1811
1812 (dial_address, connection_id)
1813 }
1814
/// Registering the same protocol name twice is expected to panic in debug builds.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn duplicate_protocol() {
    let (mut manager, _handle) = TransportManager::new(
        Keypair::generate(),
        HashSet::new(),
        BandwidthSink::new(),
        8usize,
        ConnectionLimitsConfig::default(),
    );

    // Two identical registrations of `/notif/1`; the test only passes if this panics.
    for _ in 0..2 {
        manager.register_protocol(
            ProtocolName::from("/notif/1"),
            Vec::new(),
            ProtocolCodec::UnsignedVarint(None),
            KEEP_ALIVE_TIMEOUT,
        );
    }
}
1841
/// Registering a protocol whose fallback name equals an already registered main
/// protocol name is expected to panic in debug builds.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn fallback_protocol_as_duplicate_main_protocol() {
    let (mut manager, _handle) = TransportManager::new(
        Keypair::generate(),
        HashSet::new(),
        BandwidthSink::new(),
        8usize,
        ConnectionLimitsConfig::default(),
    );

    // `/notif/1` is first registered as a main protocol without fallbacks...
    let first_protocol = ProtocolName::from("/notif/1");
    manager.register_protocol(
        first_protocol,
        Vec::new(),
        ProtocolCodec::UnsignedVarint(None),
        KEEP_ALIVE_TIMEOUT,
    );

    // ...and then reused as a fallback name of `/notif/2`.
    let clashing_fallbacks = vec![
        ProtocolName::from("/notif/2/new"),
        ProtocolName::from("/notif/1"),
    ];
    manager.register_protocol(
        ProtocolName::from("/notif/2"),
        clashing_fallbacks,
        ProtocolCodec::UnsignedVarint(None),
        KEEP_ALIVE_TIMEOUT,
    );
}
1871
/// Registering two protocols that share a fallback name is expected to panic in
/// debug builds.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn duplicate_fallback_protocol() {
    let (mut manager, _handle) = TransportManager::new(
        Keypair::generate(),
        HashSet::new(),
        BandwidthSink::new(),
        8usize,
        ConnectionLimitsConfig::default(),
    );

    let first_fallbacks = vec![
        ProtocolName::from("/notif/1/new"),
        ProtocolName::from("/notif/1"),
    ];
    manager.register_protocol(
        ProtocolName::from("/notif/1"),
        first_fallbacks,
        ProtocolCodec::UnsignedVarint(None),
        KEEP_ALIVE_TIMEOUT,
    );

    // `/notif/1/new` is already a fallback name of the first protocol.
    let second_fallbacks = vec![
        ProtocolName::from("/notif/2/new"),
        ProtocolName::from("/notif/1/new"),
    ];
    manager.register_protocol(
        ProtocolName::from("/notif/2"),
        second_fallbacks,
        ProtocolCodec::UnsignedVarint(None),
        KEEP_ALIVE_TIMEOUT,
    );
}
1904
/// Registering the TCP transport twice is expected to panic in debug builds.
#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn duplicate_transport() {
    let (mut manager, _handle) = TransportManager::new(
        Keypair::generate(),
        HashSet::new(),
        BandwidthSink::new(),
        8usize,
        ConnectionLimitsConfig::default(),
    );

    // Two registrations under the same `SupportedTransport::Tcp` key.
    for _ in 0..2 {
        manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
    }
}
1921
1922 #[tokio::test]
1923 async fn tried_to_self_using_peer_id() {
1924 let keypair = Keypair::generate();
1925 let local_peer_id = PeerId::from_public_key(&keypair.public().into());
1926 let sink = BandwidthSink::new();
1927 let (mut manager, _handle) = TransportManager::new(
1928 keypair,
1929 HashSet::new(),
1930 sink,
1931 8usize,
1932 ConnectionLimitsConfig::default(),
1933 );
1934
1935 assert!(manager.dial(local_peer_id).await.is_err());
1936 }
1937
1938 #[tokio::test]
1939 async fn try_to_dial_over_disabled_transport() {
1940 let (mut manager, _handle) = TransportManager::new(
1941 Keypair::generate(),
1942 HashSet::new(),
1943 BandwidthSink::new(),
1944 8usize,
1945 ConnectionLimitsConfig::default(),
1946 );
1947 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
1948 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
1949
1950 let address = Multiaddr::empty()
1951 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
1952 .with(Protocol::Udp(8888))
1953 .with(Protocol::QuicV1)
1954 .with(Protocol::P2p(
1955 Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
1956 ));
1957
1958 assert!(std::matches!(
1959 manager.dial_address(address).await,
1960 Err(Error::TransportNotSupported(_))
1961 ));
1962 }
1963
1964 #[tokio::test]
1965 async fn successful_dial_reported_to_transport_manager() {
1966 let _ = tracing_subscriber::fmt()
1967 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1968 .try_init();
1969
1970 let (mut manager, _handle) = TransportManager::new(
1971 Keypair::generate(),
1972 HashSet::new(),
1973 BandwidthSink::new(),
1974 8usize,
1975 ConnectionLimitsConfig::default(),
1976 );
1977 let peer = PeerId::random();
1978 let dial_address = Multiaddr::empty()
1979 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
1980 .with(Protocol::Tcp(8888))
1981 .with(Protocol::P2p(
1982 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
1983 ));
1984
1985 let transport = Box::new({
1986 let mut transport = DummyTransport::new();
1987 transport.inject_event(TransportEvent::ConnectionEstablished {
1988 peer,
1989 endpoint: Endpoint::dialer(dial_address.clone(), ConnectionId::from(0usize)),
1990 });
1991 transport
1992 });
1993 manager.register_transport(SupportedTransport::Tcp, transport);
1994
1995 assert!(manager.dial_address(dial_address.clone()).await.is_ok());
1996 assert!(!manager.pending_connections.is_empty());
1997
1998 {
1999 let peers = manager.peers.read();
2000
2001 match peers.get(&peer) {
2002 Some(PeerContext {
2003 state: PeerState::Dialing { .. },
2004 ..
2005 }) => {}
2006 state => panic!("invalid state for peer: {state:?}"),
2007 }
2008 }
2009
2010 match manager.next().await.unwrap() {
2011 TransportEvent::ConnectionEstablished {
2012 peer: event_peer,
2013 endpoint: event_endpoint,
2014 ..
2015 } => {
2016 assert_eq!(peer, event_peer);
2017 assert_eq!(
2018 event_endpoint,
2019 Endpoint::dialer(dial_address.clone(), ConnectionId::from(0usize))
2020 )
2021 }
2022 event => panic!("invalid event: {event:?}"),
2023 }
2024 }
2025
2026 #[tokio::test]
2027 async fn try_to_dial_same_peer_twice() {
2028 let _ = tracing_subscriber::fmt()
2029 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2030 .try_init();
2031
2032 let (mut manager, _handle) = TransportManager::new(
2033 Keypair::generate(),
2034 HashSet::new(),
2035 BandwidthSink::new(),
2036 8usize,
2037 ConnectionLimitsConfig::default(),
2038 );
2039 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2040 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2041
2042 let peer = PeerId::random();
2043 let dial_address = Multiaddr::empty()
2044 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2045 .with(Protocol::Tcp(8888))
2046 .with(Protocol::P2p(
2047 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2048 ));
2049
2050 assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2051 assert_eq!(manager.pending_connections.len(), 1);
2052
2053 assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2054 assert_eq!(manager.pending_connections.len(), 1);
2055 }
2056
2057 #[tokio::test]
2058 async fn try_to_dial_same_peer_twice_diffrent_address() {
2059 let _ = tracing_subscriber::fmt()
2060 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2061 .try_init();
2062
2063 let (mut manager, _handle) = TransportManager::new(
2064 Keypair::generate(),
2065 HashSet::new(),
2066 BandwidthSink::new(),
2067 8usize,
2068 ConnectionLimitsConfig::default(),
2069 );
2070 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2071 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2072
2073 let peer = PeerId::random();
2074
2075 assert!(manager
2076 .dial_address(
2077 Multiaddr::empty()
2078 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2079 .with(Protocol::Tcp(8888))
2080 .with(Protocol::P2p(
2081 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2082 ))
2083 )
2084 .await
2085 .is_ok());
2086 assert_eq!(manager.pending_connections.len(), 1);
2087
2088 assert!(manager
2089 .dial_address(
2090 Multiaddr::empty()
2091 .with(Protocol::Ip6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))
2092 .with(Protocol::Tcp(8888))
2093 .with(Protocol::P2p(
2094 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2095 ))
2096 )
2097 .await
2098 .is_ok());
2099 assert_eq!(manager.pending_connections.len(), 1);
2100 }
2101
2102 #[tokio::test]
2103 async fn dial_non_existent_peer() {
2104 let _ = tracing_subscriber::fmt()
2105 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2106 .try_init();
2107
2108 let (mut manager, _handle) = TransportManager::new(
2109 Keypair::generate(),
2110 HashSet::new(),
2111 BandwidthSink::new(),
2112 8usize,
2113 ConnectionLimitsConfig::default(),
2114 );
2115 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2116 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2117
2118 assert!(manager.dial(PeerId::random()).await.is_err());
2119 }
2120
2121 #[tokio::test]
2122 async fn dial_non_peer_with_no_known_addresses() {
2123 let _ = tracing_subscriber::fmt()
2124 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2125 .try_init();
2126
2127 let (mut manager, _handle) = TransportManager::new(
2128 Keypair::generate(),
2129 HashSet::new(),
2130 BandwidthSink::new(),
2131 8usize,
2132 ConnectionLimitsConfig::default(),
2133 );
2134 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2135 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2136
2137 let peer = PeerId::random();
2138 manager.peers.write().insert(
2139 peer,
2140 PeerContext {
2141 state: PeerState::Disconnected { dial_record: None },
2142 addresses: AddressStore::new(),
2143 secondary_connection: None,
2144 },
2145 );
2146
2147 assert!(manager.dial(peer).await.is_err());
2148 }
2149
2150 #[tokio::test]
2151 async fn check_supported_transport_when_adding_known_address() {
2152 let _ = tracing_subscriber::fmt()
2153 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2154 .try_init();
2155
2156 let mut transports = HashSet::new();
2157 transports.insert(SupportedTransport::Tcp);
2158 #[cfg(feature = "quic")]
2159 transports.insert(SupportedTransport::Quic);
2160
2161 let (_manager, handle) = TransportManager::new(
2162 Keypair::generate(),
2163 transports,
2164 BandwidthSink::new(),
2165 8usize,
2166 ConnectionLimitsConfig::default(),
2167 );
2168
2169 let address = Multiaddr::empty()
2171 .with(Protocol::Ip6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))
2172 .with(Protocol::Tcp(8888))
2173 .with(Protocol::P2p(
2174 Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
2175 ));
2176 assert!(handle.supported_transport(&address));
2177
2178 let address = Multiaddr::empty()
2180 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2181 .with(Protocol::Tcp(8888))
2182 .with(Protocol::P2p(
2183 Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
2184 ));
2185 assert!(handle.supported_transport(&address));
2186
2187 let address = Multiaddr::empty()
2189 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2190 .with(Protocol::Udp(8888))
2191 .with(Protocol::QuicV1)
2192 .with(Protocol::P2p(
2193 Multihash::from_bytes(&PeerId::random().to_bytes()).unwrap(),
2194 ));
2195 #[cfg(feature = "quic")]
2196 assert!(handle.supported_transport(&address));
2197 #[cfg(not(feature = "quic"))]
2198 assert!(!handle.supported_transport(&address));
2199
2200 let address = Multiaddr::empty()
2202 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2203 .with(Protocol::Tcp(8888))
2204 .with(Protocol::Ws(std::borrow::Cow::Owned("/".to_string())));
2205 assert!(!handle.supported_transport(&address));
2206
2207 let address = Multiaddr::empty()
2209 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2210 .with(Protocol::Tcp(8888))
2211 .with(Protocol::Wss(std::borrow::Cow::Owned("/".to_string())));
2212 assert!(!handle.supported_transport(&address));
2213 }
2214
2215 #[tokio::test]
2218 async fn on_dial_failure_already_connected() {
2219 let _ = tracing_subscriber::fmt()
2220 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2221 .try_init();
2222
2223 let (mut manager, _handle) = TransportManager::new(
2224 Keypair::generate(),
2225 HashSet::new(),
2226 BandwidthSink::new(),
2227 8usize,
2228 ConnectionLimitsConfig::default(),
2229 );
2230 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2231 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2232
2233 let peer = PeerId::random();
2234 let dial_address = Multiaddr::empty()
2235 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2236 .with(Protocol::Tcp(8888))
2237 .with(Protocol::P2p(
2238 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2239 ));
2240 let connect_address = Multiaddr::empty()
2241 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2242 .with(Protocol::Tcp(8888))
2243 .with(Protocol::P2p(
2244 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2245 ));
2246 assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2247 assert_eq!(manager.pending_connections.len(), 1);
2248
2249 match &manager.peers.read().get(&peer).unwrap().state {
2250 PeerState::Dialing { record } => {
2251 assert_eq!(record.address(), &dial_address);
2252 }
2253 state => panic!("invalid state for peer: {state:?}"),
2254 }
2255
2256 manager
2258 .on_connection_established(
2259 peer,
2260 &Endpoint::dialer(connect_address, ConnectionId::from(1usize)),
2261 )
2262 .unwrap();
2263
2264 manager.on_dial_failure(ConnectionId::from(0usize)).unwrap();
2266
2267 let peers = manager.peers.read();
2268 let peer = peers.get(&peer).unwrap();
2269
2270 match &peer.state {
2271 PeerState::Connected { dial_record, .. } => {
2272 assert!(dial_record.is_none());
2273 assert!(peer.addresses.contains(&dial_address));
2274 }
2275 state => panic!("invalid state: {state:?}"),
2276 }
2277 }
2278
2279 #[tokio::test]
2285 async fn on_dial_failure_already_connected_and_disconnected() {
2286 let _ = tracing_subscriber::fmt()
2287 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2288 .try_init();
2289
2290 let (mut manager, _handle) = TransportManager::new(
2291 Keypair::generate(),
2292 HashSet::new(),
2293 BandwidthSink::new(),
2294 8usize,
2295 ConnectionLimitsConfig::default(),
2296 );
2297 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2298 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2299
2300 let peer = PeerId::random();
2301 let dial_address = Multiaddr::empty()
2302 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2303 .with(Protocol::Tcp(8888))
2304 .with(Protocol::P2p(
2305 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2306 ));
2307 let connect_address = Multiaddr::empty()
2308 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2309 .with(Protocol::Tcp(8888))
2310 .with(Protocol::P2p(
2311 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2312 ));
2313 assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2314 assert_eq!(manager.pending_connections.len(), 1);
2315
2316 match &manager.peers.read().get(&peer).unwrap().state {
2317 PeerState::Dialing { record } => {
2318 assert_eq!(record.address(), &dial_address);
2319 }
2320 state => panic!("invalid state for peer: {state:?}"),
2321 }
2322
2323 manager
2325 .on_connection_established(
2326 peer,
2327 &Endpoint::listener(connect_address, ConnectionId::from(1usize)),
2328 )
2329 .unwrap();
2330
2331 manager.on_connection_closed(peer, ConnectionId::from(1usize)).unwrap();
2333
2334 {
2336 let peers = manager.peers.read();
2337 let peer = peers.get(&peer).unwrap();
2338
2339 match &peer.state {
2340 PeerState::Disconnected {
2341 dial_record: Some(dial_record),
2342 ..
2343 } => {
2344 assert_eq!(dial_record.address(), &dial_address);
2345 }
2346 state => panic!("invalid state: {state:?}"),
2347 }
2348 }
2349
2350 manager.on_dial_failure(ConnectionId::from(0usize)).unwrap();
2352
2353 let peers = manager.peers.read();
2354 let peer = peers.get(&peer).unwrap();
2355
2356 match &peer.state {
2357 PeerState::Disconnected {
2358 dial_record: None, ..
2359 } => {
2360 assert!(peer.addresses.contains(&dial_address));
2361 }
2362 state => panic!("invalid state: {state:?}"),
2363 }
2364 }
2365
2366 #[tokio::test]
2372 async fn on_dial_success_while_connected_and_disconnected() {
2373 let _ = tracing_subscriber::fmt()
2374 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2375 .try_init();
2376
2377 let (mut manager, _handle) = TransportManager::new(
2378 Keypair::generate(),
2379 HashSet::new(),
2380 BandwidthSink::new(),
2381 8usize,
2382 ConnectionLimitsConfig::default(),
2383 );
2384 let _handle = manager.transport_handle(Arc::new(DefaultExecutor {}));
2385 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2386
2387 let peer = PeerId::random();
2388 let dial_address = Multiaddr::empty()
2389 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2390 .with(Protocol::Tcp(8888))
2391 .with(Protocol::P2p(
2392 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2393 ));
2394 let connect_address = Multiaddr::empty()
2395 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2396 .with(Protocol::Tcp(8888))
2397 .with(Protocol::P2p(
2398 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2399 ));
2400 assert!(manager.dial_address(dial_address.clone()).await.is_ok());
2401 assert_eq!(manager.pending_connections.len(), 1);
2402
2403 match &manager.peers.read().get(&peer).unwrap().state {
2404 PeerState::Dialing { record } => {
2405 assert_eq!(record.address(), &dial_address);
2406 }
2407 state => panic!("invalid state for peer: {state:?}"),
2408 }
2409
2410 manager
2412 .on_connection_established(
2413 peer,
2414 &Endpoint::listener(connect_address, ConnectionId::from(1usize)),
2415 )
2416 .unwrap();
2417
2418 manager.on_connection_closed(peer, ConnectionId::from(1usize)).unwrap();
2420
2421 {
2423 let peers = manager.peers.read();
2424 let peer = peers.get(&peer).unwrap();
2425
2426 match &peer.state {
2427 PeerState::Disconnected {
2428 dial_record: Some(dial_record),
2429 ..
2430 } => {
2431 assert_eq!(dial_record.address(), &dial_address);
2432 }
2433 state => panic!("invalid state: {state:?}"),
2434 }
2435 }
2436
2437 manager
2439 .on_connection_established(
2440 peer,
2441 &Endpoint::dialer(dial_address, ConnectionId::from(0usize)),
2442 )
2443 .unwrap();
2444
2445 let peers = manager.peers.read();
2446 let peer = peers.get(&peer).unwrap();
2447
2448 match &peer.state {
2449 PeerState::Connected {
2450 dial_record: None, ..
2451 } => {}
2452 state => panic!("invalid state: {state:?}"),
2453 }
2454 }
2455
2456 #[tokio::test]
2457 async fn secondary_connection_is_tracked() {
2458 let _ = tracing_subscriber::fmt()
2459 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2460 .try_init();
2461
2462 let (mut manager, _handle) = TransportManager::new(
2463 Keypair::generate(),
2464 HashSet::new(),
2465 BandwidthSink::new(),
2466 8usize,
2467 ConnectionLimitsConfig::default(),
2468 );
2469 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2470
2471 let peer = PeerId::random();
2472 let address1 = Multiaddr::empty()
2473 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2474 .with(Protocol::Tcp(8888))
2475 .with(Protocol::P2p(
2476 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2477 ));
2478 let address2 = Multiaddr::empty()
2479 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2480 .with(Protocol::Tcp(8888))
2481 .with(Protocol::P2p(
2482 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2483 ));
2484 let address3 = Multiaddr::empty()
2485 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 10, 64)))
2486 .with(Protocol::Tcp(9999))
2487 .with(Protocol::P2p(
2488 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2489 ));
2490
2491 let established_result = manager
2493 .on_connection_established(
2494 peer,
2495 &Endpoint::listener(address1, ConnectionId::from(0usize)),
2496 )
2497 .unwrap();
2498 assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2499
2500 {
2502 let peers = manager.peers.read();
2503 let peer = peers.get(&peer).unwrap();
2504
2505 match &peer.state {
2506 PeerState::Connected {
2507 dial_record: None, ..
2508 } => {
2509 assert!(peer.secondary_connection.is_none());
2510 }
2511 state => panic!("invalid state: {state:?}"),
2512 }
2513 }
2514
2515 let established_result = manager
2517 .on_connection_established(
2518 peer,
2519 &Endpoint::listener(address2.clone(), ConnectionId::from(1usize)),
2520 )
2521 .unwrap();
2522 assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2523
2524 let peers = manager.peers.read();
2525 let context = peers.get(&peer).unwrap();
2526
2527 match &context.state {
2528 PeerState::Connected {
2529 dial_record: None, ..
2530 } => {
2531 let seconary_connection = context.secondary_connection.as_ref().unwrap();
2532 assert_eq!(seconary_connection.address(), &address2);
2533 assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2534 }
2535 state => panic!("invalid state: {state:?}"),
2536 }
2537 drop(peers);
2538
2539 let established_result = manager
2541 .on_connection_established(
2542 peer,
2543 &Endpoint::listener(address3.clone(), ConnectionId::from(2usize)),
2544 )
2545 .unwrap();
2546 assert_eq!(established_result, ConnectionEstablishedResult::Reject);
2547
2548 let peers = manager.peers.read();
2549 let peer = peers.get(&peer).unwrap();
2550
2551 match &peer.state {
2552 PeerState::Connected {
2553 dial_record: None, ..
2554 } => {
2555 let seconary_connection = peer.secondary_connection.as_ref().unwrap();
2556 assert_eq!(seconary_connection.address(), &address2);
2557 assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2558 assert!(peer.addresses.contains(&address3));
2559 }
2560 state => panic!("invalid state: {state:?}"),
2561 }
2562 }
2563
2564 #[tokio::test]
2565 async fn secondary_connection_with_different_dial_endpoint_is_rejected() {
2566 let _ = tracing_subscriber::fmt()
2567 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2568 .try_init();
2569
2570 let (mut manager, _handle) = TransportManager::new(
2571 Keypair::generate(),
2572 HashSet::new(),
2573 BandwidthSink::new(),
2574 8usize,
2575 ConnectionLimitsConfig::default(),
2576 );
2577 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2578
2579 let peer = PeerId::random();
2580 let address1 = Multiaddr::empty()
2581 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2582 .with(Protocol::Tcp(8888))
2583 .with(Protocol::P2p(
2584 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2585 ));
2586 let address2 = Multiaddr::empty()
2587 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2588 .with(Protocol::Tcp(8888))
2589 .with(Protocol::P2p(
2590 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2591 ));
2592
2593 let established_result = manager
2595 .on_connection_established(
2596 peer,
2597 &Endpoint::listener(address1, ConnectionId::from(0usize)),
2598 )
2599 .unwrap();
2600 assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2601
2602 {
2604 let peers = manager.peers.read();
2605 let peer = peers.get(&peer).unwrap();
2606
2607 match &peer.state {
2608 PeerState::Connected {
2609 dial_record: None, ..
2610 } => {
2611 assert!(peer.secondary_connection.is_none());
2612 }
2613 state => panic!("invalid state: {state:?}"),
2614 }
2615 }
2616
2617 {
2619 let mut peers = manager.peers.write();
2620 let peer_context = peers.get_mut(&peer).unwrap();
2621
2622 let record = match &peer_context.state {
2623 PeerState::Connected { record, .. } => record.clone(),
2624 state => panic!("invalid state: {state:?}"),
2625 };
2626
2627 let dial_record = Some(AddressRecord::new(
2628 &peer,
2629 address2.clone(),
2630 0,
2631 Some(ConnectionId::from(0usize)),
2632 ));
2633
2634 peer_context.state = PeerState::Connected {
2635 record,
2636 dial_record,
2637 };
2638 }
2639
2640 let established_result = manager
2642 .on_connection_established(
2643 peer,
2644 &Endpoint::listener(address2.clone(), ConnectionId::from(1usize)),
2645 )
2646 .unwrap();
2647 assert_eq!(established_result, ConnectionEstablishedResult::Reject);
2648
2649 let established_result = manager
2651 .on_connection_established(
2652 peer,
2653 &Endpoint::listener(address2.clone(), ConnectionId::from(1usize)),
2654 )
2655 .unwrap();
2656 assert_eq!(established_result, ConnectionEstablishedResult::Reject);
2657
2658 let established_result = manager
2660 .on_connection_established(
2661 peer,
2662 &Endpoint::listener(address2.clone(), ConnectionId::from(0usize)),
2663 )
2664 .unwrap();
2665 assert_eq!(established_result, ConnectionEstablishedResult::Accept);
2666 }
2667
2668 #[tokio::test]
2669 async fn secondary_connection_closed() {
2670 let _ = tracing_subscriber::fmt()
2671 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2672 .try_init();
2673
2674 let (mut manager, _handle) = TransportManager::new(
2675 Keypair::generate(),
2676 HashSet::new(),
2677 BandwidthSink::new(),
2678 8usize,
2679 ConnectionLimitsConfig::default(),
2680 );
2681 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2682
2683 let peer = PeerId::random();
2684 let address1 = Multiaddr::empty()
2685 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2686 .with(Protocol::Tcp(8888))
2687 .with(Protocol::P2p(
2688 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2689 ));
2690 let address2 = Multiaddr::empty()
2691 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2692 .with(Protocol::Tcp(8888))
2693 .with(Protocol::P2p(
2694 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2695 ));
2696
2697 let emit_event = manager
2699 .on_connection_established(
2700 peer,
2701 &Endpoint::listener(address1, ConnectionId::from(0usize)),
2702 )
2703 .unwrap();
2704 assert!(std::matches!(
2705 emit_event,
2706 ConnectionEstablishedResult::Accept
2707 ));
2708
2709 {
2711 let peers = manager.peers.read();
2712 let peer = peers.get(&peer).unwrap();
2713
2714 match &peer.state {
2715 PeerState::Connected {
2716 dial_record: None, ..
2717 } => {
2718 assert!(peer.secondary_connection.is_none());
2719 }
2720 state => panic!("invalid state: {state:?}"),
2721 }
2722 }
2723
2724 let emit_event = manager
2726 .on_connection_established(
2727 peer,
2728 &Endpoint::dialer(address2.clone(), ConnectionId::from(1usize)),
2729 )
2730 .unwrap();
2731 assert!(std::matches!(
2732 emit_event,
2733 ConnectionEstablishedResult::Accept
2734 ));
2735
2736 let peers = manager.peers.read();
2737 let context = peers.get(&peer).unwrap();
2738
2739 match &context.state {
2740 PeerState::Connected {
2741 dial_record: None, ..
2742 } => {
2743 let seconary_connection = context.secondary_connection.as_ref().unwrap();
2744 assert_eq!(seconary_connection.address(), &address2);
2745 assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2746 }
2747 state => panic!("invalid state: {state:?}"),
2748 }
2749 drop(peers);
2750
2751 let emit_event = manager.on_connection_closed(peer, ConnectionId::from(1usize)).unwrap();
2753 assert!(emit_event.is_none());
2754
2755 let peers = manager.peers.read();
2756 let context = peers.get(&peer).unwrap();
2757
2758 match &context.state {
2759 PeerState::Connected {
2760 dial_record: None,
2761 record,
2762 } => {
2763 assert!(context.secondary_connection.is_none());
2764 assert!(context.addresses.contains(&address2));
2765 assert_eq!(record.connection_id(), &Some(ConnectionId::from(0usize)));
2766 }
2767 state => panic!("invalid state: {state:?}"),
2768 }
2769 }
2770
2771 #[tokio::test]
2772 async fn switch_to_secondary_connection() {
2773 let _ = tracing_subscriber::fmt()
2774 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2775 .try_init();
2776
2777 let (mut manager, _handle) = TransportManager::new(
2778 Keypair::generate(),
2779 HashSet::new(),
2780 BandwidthSink::new(),
2781 8usize,
2782 ConnectionLimitsConfig::default(),
2783 );
2784 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2785
2786 let peer = PeerId::random();
2787 let address1 = Multiaddr::empty()
2788 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2789 .with(Protocol::Tcp(8888))
2790 .with(Protocol::P2p(
2791 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2792 ));
2793 let address2 = Multiaddr::empty()
2794 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2795 .with(Protocol::Tcp(8888))
2796 .with(Protocol::P2p(
2797 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2798 ));
2799
2800 let emit_event = manager
2802 .on_connection_established(
2803 peer,
2804 &Endpoint::listener(address1.clone(), ConnectionId::from(0usize)),
2805 )
2806 .unwrap();
2807 assert!(std::matches!(
2808 emit_event,
2809 ConnectionEstablishedResult::Accept
2810 ));
2811
2812 {
2814 let peers = manager.peers.read();
2815 let peer = peers.get(&peer).unwrap();
2816
2817 match &peer.state {
2818 PeerState::Connected {
2819 dial_record: None, ..
2820 } => {
2821 assert!(peer.secondary_connection.is_none());
2822 }
2823 state => panic!("invalid state: {state:?}"),
2824 }
2825 }
2826
2827 let emit_event = manager
2829 .on_connection_established(
2830 peer,
2831 &Endpoint::dialer(address2.clone(), ConnectionId::from(1usize)),
2832 )
2833 .unwrap();
2834 assert!(std::matches!(
2835 emit_event,
2836 ConnectionEstablishedResult::Accept
2837 ));
2838
2839 let peers = manager.peers.read();
2840 let context = peers.get(&peer).unwrap();
2841
2842 match &context.state {
2843 PeerState::Connected {
2844 dial_record: None, ..
2845 } => {
2846 let seconary_connection = context.secondary_connection.as_ref().unwrap();
2847 assert_eq!(seconary_connection.address(), &address2);
2848 assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2849 }
2850 state => panic!("invalid state: {state:?}"),
2851 }
2852 drop(peers);
2853
2854 let emit_event = manager.on_connection_closed(peer, ConnectionId::from(0usize)).unwrap();
2857 assert!(emit_event.is_none());
2858
2859 let peers = manager.peers.read();
2860 let context = peers.get(&peer).unwrap();
2861
2862 match &context.state {
2863 PeerState::Connected {
2864 dial_record: None,
2865 record,
2866 } => {
2867 assert!(context.secondary_connection.is_none());
2868 assert!(context.addresses.contains(&address1));
2869 assert_eq!(record.connection_id(), &Some(ConnectionId::from(1usize)));
2870 }
2871 state => panic!("invalid state: {state:?}"),
2872 }
2873 }
2874
2875 #[tokio::test]
2879 async fn tertiary_connection_closed() {
2880 let _ = tracing_subscriber::fmt()
2881 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2882 .try_init();
2883
2884 let (mut manager, _handle) = TransportManager::new(
2885 Keypair::generate(),
2886 HashSet::new(),
2887 BandwidthSink::new(),
2888 8usize,
2889 ConnectionLimitsConfig::default(),
2890 );
2891 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
2892
2893 let peer = PeerId::random();
2894 let address1 = Multiaddr::empty()
2895 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
2896 .with(Protocol::Tcp(8888))
2897 .with(Protocol::P2p(
2898 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2899 ));
2900 let address2 = Multiaddr::empty()
2901 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2902 .with(Protocol::Tcp(8888))
2903 .with(Protocol::P2p(
2904 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2905 ));
2906 let address3 = Multiaddr::empty()
2907 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 173)))
2908 .with(Protocol::Tcp(9999))
2909 .with(Protocol::P2p(
2910 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
2911 ));
2912
2913 let emit_event = manager
2915 .on_connection_established(
2916 peer,
2917 &Endpoint::listener(address1, ConnectionId::from(0usize)),
2918 )
2919 .unwrap();
2920 assert!(std::matches!(
2921 emit_event,
2922 ConnectionEstablishedResult::Accept
2923 ));
2924
2925 {
2927 let peers = manager.peers.read();
2928 let peer = peers.get(&peer).unwrap();
2929
2930 match &peer.state {
2931 PeerState::Connected {
2932 dial_record: None, ..
2933 } => {
2934 assert!(peer.secondary_connection.is_none());
2935 }
2936 state => panic!("invalid state: {state:?}"),
2937 }
2938 }
2939
2940 let emit_event = manager
2942 .on_connection_established(
2943 peer,
2944 &Endpoint::dialer(address2.clone(), ConnectionId::from(1usize)),
2945 )
2946 .unwrap();
2947 assert!(std::matches!(
2948 emit_event,
2949 ConnectionEstablishedResult::Accept
2950 ));
2951
2952 let peers = manager.peers.read();
2953 let context = peers.get(&peer).unwrap();
2954
2955 match &context.state {
2956 PeerState::Connected {
2957 dial_record: None, ..
2958 } => {
2959 let seconary_connection = context.secondary_connection.as_ref().unwrap();
2960 assert_eq!(seconary_connection.address(), &address2);
2961 assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2962 }
2963 state => panic!("invalid state: {state:?}"),
2964 }
2965 drop(peers);
2966
2967 let emit_event = manager
2969 .on_connection_established(
2970 peer,
2971 &Endpoint::listener(address3.clone(), ConnectionId::from(2usize)),
2972 )
2973 .unwrap();
2974 assert!(std::matches!(
2975 emit_event,
2976 ConnectionEstablishedResult::Reject
2977 ));
2978
2979 let peers = manager.peers.read();
2980 let context = peers.get(&peer).unwrap();
2981 assert!(context.addresses.contains(&address3));
2982 drop(peers);
2983
2984 let emit_event = manager.on_connection_closed(peer, ConnectionId::from(2usize)).unwrap();
2986 assert!(emit_event.is_none());
2987
2988 let peers = manager.peers.read();
2990 let context = peers.get(&peer).unwrap();
2991
2992 match &context.state {
2993 PeerState::Connected {
2994 dial_record: None, ..
2995 } => {
2996 let seconary_connection = context.secondary_connection.as_ref().unwrap();
2997 assert_eq!(seconary_connection.address(), &address2);
2998 assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
2999 }
3000 state => panic!("invalid state: {state:?}"),
3001 }
3002 drop(peers);
3003 }
3004
3005 #[tokio::test]
3006 #[cfg(debug_assertions)]
3007 #[should_panic]
3008 async fn dial_failure_for_unknow_connection() {
3009 let _ = tracing_subscriber::fmt()
3010 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3011 .try_init();
3012
3013 let (mut manager, _handle) = TransportManager::new(
3014 Keypair::generate(),
3015 HashSet::new(),
3016 BandwidthSink::new(),
3017 8usize,
3018 ConnectionLimitsConfig::default(),
3019 );
3020
3021 manager.on_dial_failure(ConnectionId::random()).unwrap();
3022 }
3023
3024 #[tokio::test]
3025 #[cfg(debug_assertions)]
3026 #[should_panic]
3027 async fn dial_failure_for_unknow_peer() {
3028 let _ = tracing_subscriber::fmt()
3029 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3030 .try_init();
3031
3032 let (mut manager, _handle) = TransportManager::new(
3033 Keypair::generate(),
3034 HashSet::new(),
3035 BandwidthSink::new(),
3036 8usize,
3037 ConnectionLimitsConfig::default(),
3038 );
3039 let connection_id = ConnectionId::random();
3040 let peer = PeerId::random();
3041 manager.pending_connections.insert(connection_id, peer);
3042 manager.on_dial_failure(connection_id).unwrap();
3043 }
3044
3045 #[tokio::test]
3046 #[cfg(debug_assertions)]
3047 #[should_panic]
3048 async fn connection_closed_for_unknown_peer() {
3049 let _ = tracing_subscriber::fmt()
3050 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3051 .try_init();
3052
3053 let (mut manager, _handle) = TransportManager::new(
3054 Keypair::generate(),
3055 HashSet::new(),
3056 BandwidthSink::new(),
3057 8usize,
3058 ConnectionLimitsConfig::default(),
3059 );
3060 manager.on_connection_closed(PeerId::random(), ConnectionId::random()).unwrap();
3061 }
3062
3063 #[tokio::test]
3064 #[cfg(debug_assertions)]
3065 #[should_panic]
3066 async fn unknown_connection_opened() {
3067 let _ = tracing_subscriber::fmt()
3068 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3069 .try_init();
3070
3071 let (mut manager, _handle) = TransportManager::new(
3072 Keypair::generate(),
3073 HashSet::new(),
3074 BandwidthSink::new(),
3075 8usize,
3076 ConnectionLimitsConfig::default(),
3077 );
3078 manager
3079 .on_connection_opened(
3080 SupportedTransport::Tcp,
3081 ConnectionId::random(),
3082 Multiaddr::empty(),
3083 )
3084 .unwrap();
3085 }
3086
3087 #[tokio::test]
3088 #[cfg(debug_assertions)]
3089 #[should_panic]
3090 async fn connection_opened_for_unknown_peer() {
3091 let _ = tracing_subscriber::fmt()
3092 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3093 .try_init();
3094
3095 let (mut manager, _handle) = TransportManager::new(
3096 Keypair::generate(),
3097 HashSet::new(),
3098 BandwidthSink::new(),
3099 8usize,
3100 ConnectionLimitsConfig::default(),
3101 );
3102 let connection_id = ConnectionId::random();
3103 let peer = PeerId::random();
3104
3105 manager.pending_connections.insert(connection_id, peer);
3106 manager
3107 .on_connection_opened(SupportedTransport::Tcp, connection_id, Multiaddr::empty())
3108 .unwrap();
3109 }
3110
3111 #[tokio::test]
3112 #[cfg(debug_assertions)]
3113 #[should_panic]
3114 async fn connection_established_for_wrong_peer() {
3115 let _ = tracing_subscriber::fmt()
3116 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3117 .try_init();
3118
3119 let (mut manager, _handle) = TransportManager::new(
3120 Keypair::generate(),
3121 HashSet::new(),
3122 BandwidthSink::new(),
3123 8usize,
3124 ConnectionLimitsConfig::default(),
3125 );
3126 let connection_id = ConnectionId::random();
3127 let peer = PeerId::random();
3128
3129 manager.pending_connections.insert(connection_id, peer);
3130 manager
3131 .on_connection_established(
3132 PeerId::random(),
3133 &Endpoint::dialer(Multiaddr::empty(), connection_id),
3134 )
3135 .unwrap();
3136 }
3137
3138 #[tokio::test]
3139 #[cfg(debug_assertions)]
3140 #[should_panic]
3141 async fn open_failure_unknown_connection() {
3142 let _ = tracing_subscriber::fmt()
3143 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3144 .try_init();
3145
3146 let (mut manager, _handle) = TransportManager::new(
3147 Keypair::generate(),
3148 HashSet::new(),
3149 BandwidthSink::new(),
3150 8usize,
3151 ConnectionLimitsConfig::default(),
3152 );
3153
3154 manager
3155 .on_open_failure(SupportedTransport::Tcp, ConnectionId::random())
3156 .unwrap();
3157 }
3158
3159 #[tokio::test]
3160 #[cfg(debug_assertions)]
3161 #[should_panic]
3162 async fn open_failure_unknown_peer() {
3163 let _ = tracing_subscriber::fmt()
3164 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3165 .try_init();
3166
3167 let (mut manager, _handle) = TransportManager::new(
3168 Keypair::generate(),
3169 HashSet::new(),
3170 BandwidthSink::new(),
3171 8usize,
3172 ConnectionLimitsConfig::default(),
3173 );
3174 let connection_id = ConnectionId::random();
3175 let peer = PeerId::random();
3176
3177 manager.pending_connections.insert(connection_id, peer);
3178 manager.on_open_failure(SupportedTransport::Tcp, connection_id).unwrap();
3179 }
3180
3181 #[tokio::test]
3182 async fn no_transports() {
3183 let _ = tracing_subscriber::fmt()
3184 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3185 .try_init();
3186
3187 let (mut manager, _handle) = TransportManager::new(
3188 Keypair::generate(),
3189 HashSet::new(),
3190 BandwidthSink::new(),
3191 8usize,
3192 ConnectionLimitsConfig::default(),
3193 );
3194
3195 assert!(manager.next().await.is_none());
3196 }
3197
3198 #[tokio::test]
3199 async fn dial_already_connected_peer() {
3200 let (mut manager, _handle) = TransportManager::new(
3201 Keypair::generate(),
3202 HashSet::new(),
3203 BandwidthSink::new(),
3204 8usize,
3205 ConnectionLimitsConfig::default(),
3206 );
3207
3208 let peer = {
3209 let peer = PeerId::random();
3210 let mut peers = manager.peers.write();
3211
3212 peers.insert(
3213 peer,
3214 PeerContext {
3215 state: PeerState::Connected {
3216 record: AddressRecord::from_multiaddr(
3217 Multiaddr::empty()
3218 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3219 .with(Protocol::Tcp(8888))
3220 .with(Protocol::P2p(Multihash::from(peer))),
3221 )
3222 .unwrap(),
3223 dial_record: None,
3224 },
3225 secondary_connection: None,
3226 addresses: AddressStore::from_iter(
3227 vec![Multiaddr::empty()
3228 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3229 .with(Protocol::Tcp(8888))
3230 .with(Protocol::P2p(Multihash::from(peer)))]
3231 .into_iter(),
3232 ),
3233 },
3234 );
3235 drop(peers);
3236
3237 peer
3238 };
3239
3240 match manager.dial(peer).await {
3241 Err(Error::AlreadyConnected) => {}
3242 _ => panic!("invalid return value"),
3243 }
3244 }
3245
3246 #[tokio::test]
3247 async fn peer_already_being_dialed() {
3248 let (mut manager, _handle) = TransportManager::new(
3249 Keypair::generate(),
3250 HashSet::new(),
3251 BandwidthSink::new(),
3252 8usize,
3253 ConnectionLimitsConfig::default(),
3254 );
3255
3256 let peer = {
3257 let peer = PeerId::random();
3258 let mut peers = manager.peers.write();
3259
3260 peers.insert(
3261 peer,
3262 PeerContext {
3263 state: PeerState::Dialing {
3264 record: AddressRecord::from_multiaddr(
3265 Multiaddr::empty()
3266 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3267 .with(Protocol::Tcp(8888))
3268 .with(Protocol::P2p(Multihash::from(peer))),
3269 )
3270 .unwrap(),
3271 },
3272 secondary_connection: None,
3273 addresses: AddressStore::from_iter(
3274 vec![Multiaddr::empty()
3275 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3276 .with(Protocol::Tcp(8888))
3277 .with(Protocol::P2p(Multihash::from(peer)))]
3278 .into_iter(),
3279 ),
3280 },
3281 );
3282 drop(peers);
3283
3284 peer
3285 };
3286
3287 manager.dial(peer).await.unwrap();
3288
3289 {
3291 let peers = manager.peers.read();
3292 let peer_context = peers.get(&peer).unwrap();
3293
3294 match &peer_context.state {
3295 PeerState::Dialing { record } => {
3296 assert_eq!(
3297 record.address(),
3298 &Multiaddr::empty()
3299 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3300 .with(Protocol::Tcp(8888))
3301 .with(Protocol::P2p(Multihash::from(peer)))
3302 );
3303 }
3304 state => panic!("invalid state: {state:?}"),
3305 }
3306 }
3307 }
3308
3309 #[tokio::test]
3310 async fn pending_connection_for_disconnected_peer() {
3311 let (mut manager, _handle) = TransportManager::new(
3312 Keypair::generate(),
3313 HashSet::new(),
3314 BandwidthSink::new(),
3315 8usize,
3316 ConnectionLimitsConfig::default(),
3317 );
3318
3319 let peer = {
3320 let peer = PeerId::random();
3321 let mut peers = manager.peers.write();
3322
3323 peers.insert(
3324 peer,
3325 PeerContext {
3326 state: PeerState::Disconnected {
3327 dial_record: Some(
3328 AddressRecord::from_multiaddr(
3329 Multiaddr::empty()
3330 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3331 .with(Protocol::Tcp(8888))
3332 .with(Protocol::P2p(Multihash::from(peer))),
3333 )
3334 .unwrap(),
3335 ),
3336 },
3337 secondary_connection: None,
3338 addresses: AddressStore::new(),
3339 },
3340 );
3341 drop(peers);
3342
3343 peer
3344 };
3345
3346 manager.dial(peer).await.unwrap();
3347 }
3348
3349 #[tokio::test]
3350 async fn dial_address_invalid_transport() {
3351 let _ = tracing_subscriber::fmt()
3352 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3353 .try_init();
3354
3355 let (mut manager, _handle) = TransportManager::new(
3356 Keypair::generate(),
3357 HashSet::new(),
3358 BandwidthSink::new(),
3359 8usize,
3360 ConnectionLimitsConfig::default(),
3361 );
3362
3363 {
3365 let address = Multiaddr::empty().with(Protocol::P2p(Multihash::from(PeerId::random())));
3366 match manager.dial_address(address.clone()).await {
3367 Err(Error::TransportNotSupported(dial_address)) => {
3368 assert_eq!(dial_address, address);
3369 }
3370 _ => panic!("invalid return value"),
3371 }
3372 }
3373
3374 {
3375 let address = Multiaddr::empty()
3377 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3378 .with(Protocol::Udp(8888))
3379 .with(Protocol::Utp)
3380 .with(Protocol::P2p(Multihash::from(PeerId::random())));
3381 match manager.dial_address(address.clone()).await {
3382 Err(Error::TransportNotSupported(dial_address)) => {
3383 assert_eq!(dial_address, address);
3384 }
3385 res => panic!("invalid return value: {res:?}"),
3386 }
3387 }
3388
3389 {
3391 let address = Multiaddr::empty()
3392 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3393 .with(Protocol::Sctp(8888))
3394 .with(Protocol::P2p(Multihash::from(PeerId::random())));
3395 match manager.dial_address(address.clone()).await {
3396 Err(Error::TransportNotSupported(dial_address)) => {
3397 assert_eq!(dial_address, address);
3398 }
3399 _ => panic!("invalid return value"),
3400 }
3401 }
3402
3403 {
3405 let address = Multiaddr::empty()
3406 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3407 .with(Protocol::Tcp(8888))
3408 .with(Protocol::Utp)
3409 .with(Protocol::P2p(Multihash::from(PeerId::random())));
3410 match manager.dial_address(address.clone()).await {
3411 Err(Error::TransportNotSupported(dial_address)) => {
3412 assert_eq!(dial_address, address);
3413 }
3414 _ => panic!("invalid return value"),
3415 }
3416 }
3417 }
3418
3419 #[tokio::test]
3420 async fn dial_address_peer_id_missing() {
3421 let (mut manager, _handle) = TransportManager::new(
3422 Keypair::generate(),
3423 HashSet::new(),
3424 BandwidthSink::new(),
3425 8usize,
3426 ConnectionLimitsConfig::default(),
3427 );
3428
3429 async fn call_manager(manager: &mut TransportManager, address: Multiaddr) {
3430 match manager.dial_address(address).await {
3431 Err(Error::AddressError(AddressError::PeerIdMissing)) => {}
3432 _ => panic!("invalid return value"),
3433 }
3434 }
3435
3436 {
3437 call_manager(
3438 &mut manager,
3439 Multiaddr::empty()
3440 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3441 .with(Protocol::Tcp(8888)),
3442 )
3443 .await;
3444 }
3445
3446 {
3447 call_manager(
3448 &mut manager,
3449 Multiaddr::empty()
3450 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3451 .with(Protocol::Tcp(8888))
3452 .with(Protocol::Wss(std::borrow::Cow::Owned("".to_string()))),
3453 )
3454 .await;
3455 }
3456
3457 {
3458 call_manager(
3459 &mut manager,
3460 Multiaddr::empty()
3461 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
3462 .with(Protocol::Udp(8888))
3463 .with(Protocol::QuicV1),
3464 )
3465 .await;
3466 }
3467 }
3468
3469 #[tokio::test]
3470 async fn inbound_connection_while_dialing() {
3471 let _ = tracing_subscriber::fmt()
3472 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3473 .try_init();
3474
3475 let (mut manager, _handle) = TransportManager::new(
3476 Keypair::generate(),
3477 HashSet::new(),
3478 BandwidthSink::new(),
3479 8usize,
3480 ConnectionLimitsConfig::default(),
3481 );
3482 let peer = PeerId::random();
3483 let dial_address = Multiaddr::empty()
3484 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3485 .with(Protocol::Tcp(8888))
3486 .with(Protocol::P2p(
3487 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
3488 ));
3489
3490 let connection_id = ConnectionId::random();
3491 let transport = Box::new({
3492 let mut transport = DummyTransport::new();
3493 transport.inject_event(TransportEvent::ConnectionEstablished {
3494 peer,
3495 endpoint: Endpoint::listener(dial_address.clone(), connection_id),
3496 });
3497 transport
3498 });
3499 manager.register_transport(SupportedTransport::Tcp, transport);
3500 manager.add_known_address(
3501 peer,
3502 vec![Multiaddr::empty()
3503 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5)))
3504 .with(Protocol::Tcp(8888))
3505 .with(Protocol::P2p(Multihash::from(peer)))]
3506 .into_iter(),
3507 );
3508
3509 assert!(manager.dial(peer).await.is_ok());
3510 assert!(!manager.pending_connections.is_empty());
3511
3512 {
3513 let peers = manager.peers.read();
3514
3515 match peers.get(&peer) {
3516 Some(PeerContext {
3517 state: PeerState::Opening { .. },
3518 ..
3519 }) => {}
3520 state => panic!("invalid state for peer: {state:?}"),
3521 }
3522 }
3523
3524 match manager.next().await.unwrap() {
3525 TransportEvent::ConnectionEstablished {
3526 peer: event_peer,
3527 endpoint: event_endpoint,
3528 ..
3529 } => {
3530 assert_eq!(peer, event_peer);
3531 assert_eq!(
3532 event_endpoint,
3533 Endpoint::listener(dial_address.clone(), connection_id),
3534 );
3535 }
3536 event => panic!("invalid event: {event:?}"),
3537 }
3538 assert!(manager.pending_connections.is_empty());
3539
3540 let peers = manager.peers.read();
3541 match peers.get(&peer).unwrap() {
3542 PeerContext {
3543 state:
3544 PeerState::Connected {
3545 record,
3546 dial_record,
3547 },
3548 secondary_connection,
3549 addresses,
3550 } => {
3551 assert!(!addresses.contains(record.address()));
3552 assert!(dial_record.is_none());
3553 assert!(secondary_connection.is_none());
3554 assert_eq!(record.address(), &dial_address);
3555 assert_eq!(record.connection_id(), &Some(connection_id));
3556 }
3557 state => panic!("invalid peer state: {state:?}"),
3558 }
3559 }
3560
3561 #[tokio::test]
3562 async fn inbound_connection_for_same_address_while_dialing() {
3563 let _ = tracing_subscriber::fmt()
3564 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3565 .try_init();
3566
3567 let (mut manager, _handle) = TransportManager::new(
3568 Keypair::generate(),
3569 HashSet::new(),
3570 BandwidthSink::new(),
3571 8usize,
3572 ConnectionLimitsConfig::default(),
3573 );
3574 let peer = PeerId::random();
3575 let dial_address = Multiaddr::empty()
3576 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3577 .with(Protocol::Tcp(8888))
3578 .with(Protocol::P2p(
3579 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
3580 ));
3581
3582 let connection_id = ConnectionId::random();
3583 let transport = Box::new({
3584 let mut transport = DummyTransport::new();
3585 transport.inject_event(TransportEvent::ConnectionEstablished {
3586 peer,
3587 endpoint: Endpoint::listener(dial_address.clone(), connection_id),
3588 });
3589 transport
3590 });
3591 manager.register_transport(SupportedTransport::Tcp, transport);
3592 manager.add_known_address(
3593 peer,
3594 vec![Multiaddr::empty()
3595 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3596 .with(Protocol::Tcp(8888))
3597 .with(Protocol::P2p(Multihash::from(peer)))]
3598 .into_iter(),
3599 );
3600
3601 assert!(manager.dial(peer).await.is_ok());
3602 assert!(!manager.pending_connections.is_empty());
3603
3604 {
3605 let peers = manager.peers.read();
3606
3607 match peers.get(&peer) {
3608 Some(PeerContext {
3609 state: PeerState::Opening { .. },
3610 ..
3611 }) => {}
3612 state => panic!("invalid state for peer: {state:?}"),
3613 }
3614 }
3615
3616 match manager.next().await.unwrap() {
3617 TransportEvent::ConnectionEstablished {
3618 peer: event_peer,
3619 endpoint: event_endpoint,
3620 ..
3621 } => {
3622 assert_eq!(peer, event_peer);
3623 assert_eq!(
3624 event_endpoint,
3625 Endpoint::listener(dial_address.clone(), connection_id),
3626 );
3627 }
3628 event => panic!("invalid event: {event:?}"),
3629 }
3630 assert!(manager.pending_connections.is_empty());
3631
3632 let peers = manager.peers.read();
3633 match peers.get(&peer).unwrap() {
3634 PeerContext {
3635 state:
3636 PeerState::Connected {
3637 record,
3638 dial_record,
3639 },
3640 secondary_connection,
3641 addresses,
3642 } => {
3643 assert!(addresses.is_empty());
3644 assert!(dial_record.is_none());
3645 assert!(secondary_connection.is_none());
3646 assert_eq!(record.address(), &dial_address);
3647 assert_eq!(record.connection_id(), &Some(connection_id));
3648 }
3649 state => panic!("invalid peer state: {state:?}"),
3650 }
3651 }
3652
3653 #[tokio::test]
3654 async fn manager_limits_incoming_connections() {
3655 let _ = tracing_subscriber::fmt()
3656 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3657 .try_init();
3658
3659 let (mut manager, _handle) = TransportManager::new(
3660 Keypair::generate(),
3661 HashSet::new(),
3662 BandwidthSink::new(),
3663 8usize,
3664 ConnectionLimitsConfig::default()
3665 .max_incoming_connections(Some(3))
3666 .max_outgoing_connections(Some(2)),
3667 );
3668 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3670
3671 let peer = PeerId::random();
3672 let second_peer = PeerId::random();
3673
3674 let (first_addr, first_connection_id) = setup_dial_addr(peer, 0);
3676 let (second_addr, second_connection_id) = setup_dial_addr(second_peer, 1);
3677 let (_, third_connection_id) = setup_dial_addr(peer, 2);
3678 let (_, remote_connection_id) = setup_dial_addr(peer, 3);
3679
3680 let result = manager
3682 .on_connection_established(
3683 peer,
3684 &Endpoint::listener(first_addr.clone(), first_connection_id),
3685 )
3686 .unwrap();
3687 assert_eq!(result, ConnectionEstablishedResult::Accept);
3688
3689 let result = manager
3691 .on_connection_established(
3692 peer,
3693 &Endpoint::listener(first_addr.clone(), second_connection_id),
3694 )
3695 .unwrap();
3696 assert_eq!(result, ConnectionEstablishedResult::Accept);
3697
3698 let result = manager
3700 .on_connection_established(
3701 second_peer,
3702 &Endpoint::listener(second_addr.clone(), third_connection_id),
3703 )
3704 .unwrap();
3705 assert_eq!(result, ConnectionEstablishedResult::Accept);
3706
3707 let result = manager
3709 .on_connection_established(
3710 second_peer,
3711 &Endpoint::listener(second_addr.clone(), remote_connection_id),
3712 )
3713 .unwrap();
3714 assert_eq!(result, ConnectionEstablishedResult::Reject);
3715
3716 let _ = manager.on_connection_closed(peer, first_connection_id).unwrap();
3718
3719 let result = manager
3721 .on_connection_established(
3722 second_peer,
3723 &Endpoint::listener(second_addr.clone(), remote_connection_id),
3724 )
3725 .unwrap();
3726 assert_eq!(result, ConnectionEstablishedResult::Accept);
3727 }
3728
3729 #[tokio::test]
3730 async fn manager_limits_outbound_connections() {
3731 let _ = tracing_subscriber::fmt()
3732 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3733 .try_init();
3734
3735 let (mut manager, _handle) = TransportManager::new(
3736 Keypair::generate(),
3737 HashSet::new(),
3738 BandwidthSink::new(),
3739 8usize,
3740 ConnectionLimitsConfig::default()
3741 .max_incoming_connections(Some(3))
3742 .max_outgoing_connections(Some(2)),
3743 );
3744 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3746
3747 let peer = PeerId::random();
3748 let second_peer = PeerId::random();
3749 let third_peer = PeerId::random();
3750
3751 let (first_addr, first_connection_id) = setup_dial_addr(peer, 0);
3753 let (second_addr, second_connection_id) = setup_dial_addr(second_peer, 1);
3754 let (third_addr, third_connection_id) = setup_dial_addr(third_peer, 2);
3755
3756 manager.dial_address(first_addr.clone()).await.unwrap();
3758
3759 manager.dial_address(second_addr.clone()).await.unwrap();
3761
3762 manager.dial_address(third_addr.clone()).await.unwrap();
3764
3765 let result = manager
3766 .on_connection_established(
3767 peer,
3768 &Endpoint::dialer(first_addr.clone(), first_connection_id),
3769 )
3770 .unwrap();
3771
3772 assert_eq!(result, ConnectionEstablishedResult::Accept);
3773
3774 let result = manager
3775 .on_connection_established(
3776 second_peer,
3777 &Endpoint::dialer(second_addr.clone(), second_connection_id),
3778 )
3779 .unwrap();
3780 assert_eq!(result, ConnectionEstablishedResult::Accept);
3781
3782 let result = manager
3784 .on_connection_established(
3785 third_peer,
3786 &Endpoint::dialer(third_addr.clone(), third_connection_id),
3787 )
3788 .unwrap();
3789 assert_eq!(result, ConnectionEstablishedResult::Reject);
3790
3791 let result = manager.dial(peer).await.unwrap_err();
3797 assert!(std::matches!(
3798 result,
3799 Error::ConnectionLimit(limits::ConnectionLimitsError::MaxOutgoingConnectionsExceeded)
3800 ));
3801 let result = manager.dial_address(first_addr.clone()).await.unwrap_err();
3802 assert!(std::matches!(
3803 result,
3804 Error::ConnectionLimit(limits::ConnectionLimitsError::MaxOutgoingConnectionsExceeded)
3805 ));
3806
3807 let _ = manager.on_connection_closed(peer, first_connection_id).unwrap();
3809 manager.dial_address(first_addr.clone()).await.unwrap();
3811
3812 let result = manager
3813 .on_connection_established(peer, &Endpoint::dialer(first_addr, first_connection_id))
3814 .unwrap();
3815 assert_eq!(result, ConnectionEstablishedResult::Accept);
3816 }
3817
3818 #[tokio::test]
3819 async fn reject_unknown_secondary_connections_with_different_connection_ids() {
3820 let _ = tracing_subscriber::fmt()
3821 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3822 .try_init();
3823
3824 let (mut manager, _handle) = TransportManager::new(
3825 Keypair::generate(),
3826 HashSet::new(),
3827 BandwidthSink::new(),
3828 8usize,
3829 ConnectionLimitsConfig::default(),
3830 );
3831 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3832
3833 let peer = PeerId::random();
3835 let (first_addr, first_connection_id) = setup_dial_addr(peer, 0);
3836 let second_connection_id = ConnectionId::from(1);
3837 let different_connection_id = ConnectionId::from(2);
3838
3839 {
3841 let mut peers = manager.peers.write();
3842
3843 let state = PeerState::Connected {
3844 record: AddressRecord::new(&peer, first_addr.clone(), 0, Some(first_connection_id)),
3845 dial_record: Some(AddressRecord::new(
3846 &peer,
3847 first_addr.clone(),
3848 0,
3849 Some(second_connection_id),
3850 )),
3851 };
3852
3853 let peer_context = PeerContext {
3854 state,
3855 secondary_connection: None,
3856 addresses: AddressStore::from_iter(vec![first_addr.clone()].into_iter()),
3857 };
3858
3859 peers.insert(peer, peer_context);
3860 }
3861
3862 let result = manager
3864 .on_connection_established(
3865 peer,
3866 &Endpoint::dialer(first_addr.clone(), different_connection_id),
3867 )
3868 .unwrap();
3869 assert_eq!(result, ConnectionEstablishedResult::Reject);
3870 }
3871
3872 #[tokio::test]
3873 async fn guard_against_secondary_connections_with_different_connection_ids() {
3874 let _ = tracing_subscriber::fmt()
3876 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
3877 .try_init();
3878
3879 let (mut manager, _handle) = TransportManager::new(
3880 Keypair::generate(),
3881 HashSet::new(),
3882 BandwidthSink::new(),
3883 8usize,
3884 ConnectionLimitsConfig::default(),
3885 );
3886 manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new()));
3887
3888 let peer = PeerId::random();
3890
3891 let setup_dial_addr = |connection_id: u16| {
3892 let dial_address = Multiaddr::empty()
3893 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
3894 .with(Protocol::Tcp(8888 + connection_id))
3895 .with(Protocol::P2p(
3896 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
3897 ));
3898 let connection_id = ConnectionId::from(connection_id as usize);
3899
3900 (dial_address, connection_id)
3901 };
3902
3903 let (first_addr, first_connection_id) = setup_dial_addr(0);
3905 let (second_addr, _second_connection_id) = setup_dial_addr(1);
3906 let (remote_addr, remote_connection_id) = setup_dial_addr(2);
3907
3908 manager.dial_address(first_addr.clone()).await.unwrap();
3910 {
3911 let peers = manager.peers.read();
3912 let peer_context = peers.get(&peer).unwrap();
3913 match &peer_context.state {
3914 PeerState::Dialing { record } => {
3915 assert_eq!(record.address(), &first_addr);
3916 }
3917 state => panic!("invalid state: {state:?}"),
3918 }
3919 }
3920
3921 let result = manager
3923 .on_connection_established(
3924 peer,
3925 &Endpoint::listener(remote_addr.clone(), remote_connection_id),
3926 )
3927 .unwrap();
3928 assert_eq!(result, ConnectionEstablishedResult::Accept);
3929 {
3930 let peers = manager.peers.read();
3931 let peer_context = peers.get(&peer).unwrap();
3932 match &peer_context.state {
3933 PeerState::Connected {
3934 record,
3935 dial_record,
3936 } => {
3937 assert_eq!(record.address(), &remote_addr);
3938 assert_eq!(record.connection_id(), &Some(remote_connection_id));
3939
3940 let dial_record = dial_record.as_ref().unwrap();
3941 assert_eq!(dial_record.address(), &first_addr);
3942 assert_eq!(dial_record.connection_id(), &Some(first_connection_id))
3943 }
3944 state => panic!("invalid state: {state:?}"),
3945 }
3946 }
3947
3948 let event = manager.on_connection_closed(peer, remote_connection_id).unwrap().unwrap();
3950 match event {
3951 TransportEvent::ConnectionClosed {
3952 peer: event_peer,
3953 connection_id: event_connection_id,
3954 } => {
3955 assert_eq!(peer, event_peer);
3956 assert_eq!(event_connection_id, remote_connection_id);
3957 }
3958 event => panic!("invalid event: {event:?}"),
3959 }
3960 {
3961 let peers = manager.peers.read();
3962 let peer_context = peers.get(&peer).unwrap();
3963 match &peer_context.state {
3964 PeerState::Disconnected { dial_record } => {
3965 let dial_record = dial_record.as_ref().unwrap();
3966 assert_eq!(dial_record.address(), &first_addr);
3967 assert_eq!(dial_record.connection_id(), &Some(first_connection_id));
3968 }
3969 state => panic!("invalid state: {state:?}"),
3970 }
3971 }
3972
3973 manager.dial_address(second_addr.clone()).await.unwrap();
3975 {
3977 let peers = manager.peers.read();
3978 let peer_context = peers.get(&peer).unwrap();
3979 match &peer_context.state {
3980 PeerState::Disconnected { dial_record } => {
3981 let dial_record = dial_record.as_ref().unwrap();
3982 assert_eq!(dial_record.address(), &first_addr);
3983 assert_eq!(dial_record.connection_id(), &Some(first_connection_id));
3984 }
3985 state => panic!("invalid state: {state:?}"),
3986 }
3987 }
3988
3989 let result = manager
3991 .on_connection_established(
3992 peer,
3993 &Endpoint::listener(remote_addr.clone(), remote_connection_id),
3994 )
3995 .unwrap();
3996 assert_eq!(result, ConnectionEstablishedResult::Accept);
3997 {
3998 let peers = manager.peers.read();
3999 let peer_context = peers.get(&peer).unwrap();
4000 match &peer_context.state {
4001 PeerState::Connected {
4002 record,
4003 dial_record,
4004 } => {
4005 assert_eq!(record.address(), &remote_addr);
4006 assert_eq!(record.connection_id(), &Some(remote_connection_id));
4007
4008 let dial_record = dial_record.as_ref().unwrap();
4010 assert_eq!(dial_record.address(), &first_addr);
4011 assert_eq!(dial_record.connection_id(), &Some(first_connection_id));
4012 }
4013 state => panic!("invalid state: {state:?}"),
4014 }
4015 }
4016
4017 let result = manager
4019 .on_connection_established(
4020 peer,
4021 &Endpoint::dialer(first_addr.clone(), first_connection_id),
4022 )
4023 .unwrap();
4024 assert_eq!(result, ConnectionEstablishedResult::Accept);
4025 }
4026
4027 #[tokio::test]
4028 async fn do_not_overwrite_dial_addresses() {
4029 let _ = tracing_subscriber::fmt()
4030 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
4031 .try_init();
4032
4033 let (mut manager, _handle) = TransportManager::new(
4034 Keypair::generate(),
4035 HashSet::new(),
4036 BandwidthSink::new(),
4037 8usize,
4038 ConnectionLimitsConfig::default(),
4039 );
4040 let peer = PeerId::random();
4041 let dial_address = Multiaddr::empty()
4042 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4043 .with(Protocol::Tcp(8888))
4044 .with(Protocol::P2p(
4045 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4046 ));
4047
4048 let connection_id = ConnectionId::from(0);
4049 let transport = Box::new({
4050 let mut transport = DummyTransport::new();
4051 transport.inject_event(TransportEvent::ConnectionEstablished {
4052 peer,
4053 endpoint: Endpoint::listener(dial_address.clone(), connection_id),
4054 });
4055 transport
4056 });
4057 manager.register_transport(SupportedTransport::Tcp, transport);
4058
4059 manager.dial_address(dial_address.clone()).await.unwrap();
4061 {
4063 let peers = manager.peers.read();
4064 let peer_context = peers.get(&peer).unwrap();
4065 match &peer_context.state {
4066 PeerState::Dialing { record } => {
4067 assert_eq!(record.address(), &dial_address);
4068 }
4069 state => panic!("invalid state: {state:?}"),
4070 }
4071
4072 assert!(!peer_context.addresses.contains(&dial_address));
4074 }
4075
4076 let second_address = Multiaddr::empty()
4077 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4078 .with(Protocol::Tcp(8889))
4079 .with(Protocol::P2p(
4080 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4081 ));
4082
4083 manager.dial_address(second_address.clone()).await.unwrap();
4085 {
4087 let peers = manager.peers.read();
4088 let peer_context = peers.get(&peer).unwrap();
4089 match &peer_context.state {
4090 PeerState::Dialing { record } => {
4092 assert_eq!(record.address(), &dial_address);
4093 }
4094 state => panic!("invalid state: {state:?}"),
4095 }
4096
4097 assert!(!peer_context.addresses.contains(&dial_address));
4098 assert!(!peer_context.addresses.contains(&second_address));
4099 }
4100 }
4101
4102 #[cfg(feature = "websocket")]
4103 #[tokio::test]
4104 async fn opening_errors_are_reported() {
4105 let _ = tracing_subscriber::fmt()
4106 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
4107 .try_init();
4108
4109 let (mut manager, _handle) = TransportManager::new(
4110 Keypair::generate(),
4111 HashSet::new(),
4112 BandwidthSink::new(),
4113 8usize,
4114 ConnectionLimitsConfig::default(),
4115 );
4116 let peer = PeerId::random();
4117 let connection_id = ConnectionId::from(0);
4118
4119 let dial_address_tcp = Multiaddr::empty()
4121 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4122 .with(Protocol::Tcp(8888))
4123 .with(Protocol::P2p(
4124 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4125 ));
4126 let transport = Box::new({
4127 let mut transport = DummyTransport::new();
4128 transport.inject_event(TransportEvent::OpenFailure {
4129 connection_id,
4130 errors: vec![(dial_address_tcp.clone(), DialError::Timeout)],
4131 });
4132 transport
4133 });
4134 manager.register_transport(SupportedTransport::Tcp, transport);
4135 manager.add_known_address(
4136 peer,
4137 vec![Multiaddr::empty()
4138 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5)))
4139 .with(Protocol::Tcp(8888))
4140 .with(Protocol::P2p(Multihash::from(peer)))]
4141 .into_iter(),
4142 );
4143
4144 let dial_address_ws = Multiaddr::empty()
4146 .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
4147 .with(Protocol::Tcp(8889))
4148 .with(Protocol::Ws(Cow::Borrowed("/")))
4149 .with(Protocol::P2p(
4150 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4151 ));
4152
4153 let transport = Box::new({
4154 let mut transport = DummyTransport::new();
4155 transport.inject_event(TransportEvent::OpenFailure {
4156 connection_id,
4157 errors: vec![(dial_address_ws.clone(), DialError::Timeout)],
4158 });
4159 transport
4160 });
4161 manager.register_transport(SupportedTransport::WebSocket, transport);
4162 manager.add_known_address(
4163 peer,
4164 vec![Multiaddr::empty()
4165 .with(Protocol::Ip4(Ipv4Addr::new(192, 168, 1, 5)))
4166 .with(Protocol::Tcp(8889))
4167 .with(Protocol::Ws(Cow::Borrowed("/")))
4168 .with(Protocol::P2p(
4169 Multihash::from_bytes(&peer.to_bytes()).unwrap(),
4170 ))]
4171 .into_iter(),
4172 );
4173
4174 assert!(manager.dial(peer).await.is_ok());
4176 assert!(!manager.pending_connections.is_empty());
4177
4178 {
4179 let peers = manager.peers.read();
4180
4181 match peers.get(&peer) {
4182 Some(PeerContext {
4183 state: PeerState::Opening { .. },
4184 ..
4185 }) => {}
4186 state => panic!("invalid state for peer: {state:?}"),
4187 }
4188 }
4189
4190 match manager.next().await.unwrap() {
4191 TransportEvent::OpenFailure {
4192 connection_id,
4193 errors,
4194 } => {
4195 assert_eq!(connection_id, ConnectionId::from(0));
4196 assert_eq!(errors.len(), 2);
4197 let tcp = errors.iter().find(|(addr, _)| addr == &dial_address_tcp).unwrap();
4198 assert!(std::matches!(tcp.1, DialError::Timeout));
4199
4200 let ws = errors.iter().find(|(addr, _)| addr == &dial_address_ws).unwrap();
4201 assert!(std::matches!(ws.1, DialError::Timeout));
4202 }
4203 event => panic!("invalid event: {event:?}"),
4204 }
4205 assert!(manager.pending_connections.is_empty());
4206 assert!(manager.opening_errors.is_empty());
4207 }
4208}