1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
57
58mod connection;
59mod executor;
60mod stream;
61mod stream_protocol;
62#[cfg(test)]
63mod test;
64mod upgrade;
65
66pub mod behaviour;
67pub mod dial_opts;
68pub mod dummy;
69pub mod handler;
70mod listen_opts;
71mod translation;
72
73#[doc(hidden)]
75pub mod derive_prelude {
76 pub use crate::behaviour::AddressChange;
77 pub use crate::behaviour::ConnectionClosed;
78 pub use crate::behaviour::ConnectionEstablished;
79 pub use crate::behaviour::DialFailure;
80 pub use crate::behaviour::ExpiredListenAddr;
81 pub use crate::behaviour::ExternalAddrConfirmed;
82 pub use crate::behaviour::ExternalAddrExpired;
83 pub use crate::behaviour::FromSwarm;
84 pub use crate::behaviour::ListenFailure;
85 pub use crate::behaviour::ListenerClosed;
86 pub use crate::behaviour::ListenerError;
87 pub use crate::behaviour::NewExternalAddrCandidate;
88 pub use crate::behaviour::NewExternalAddrOfPeer;
89 pub use crate::behaviour::NewListenAddr;
90 pub use crate::behaviour::NewListener;
91 pub use crate::connection::ConnectionId;
92 pub use crate::ConnectionDenied;
93 pub use crate::ConnectionHandler;
94 pub use crate::ConnectionHandlerSelect;
95 pub use crate::DialError;
96 pub use crate::NetworkBehaviour;
97 pub use crate::THandler;
98 pub use crate::THandlerInEvent;
99 pub use crate::THandlerOutEvent;
100 pub use crate::ToSwarm;
101 pub use either::Either;
102 pub use futures::prelude as futures;
103 pub use libp2p_core::transport::{ListenerId, PortUse};
104 pub use libp2p_core::ConnectedPoint;
105 pub use libp2p_core::Endpoint;
106 pub use libp2p_core::Multiaddr;
107 pub use libp2p_identity::PeerId;
108}
109
110pub use behaviour::{
111 AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
112 ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
113 ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
114 NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
115};
116pub use connection::pool::ConnectionCounters;
117pub use connection::{ConnectionError, ConnectionId, SupportedProtocols};
118pub use executor::Executor;
119pub use handler::{
120 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
121 OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
122};
123#[cfg(feature = "macros")]
124pub use libp2p_swarm_derive::NetworkBehaviour;
125pub use listen_opts::ListenOpts;
126pub use stream::Stream;
127pub use stream_protocol::{InvalidProtocol, StreamProtocol};
128
129use crate::behaviour::ExternalAddrConfirmed;
130use crate::handler::UpgradeInfoSend;
131use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
132use connection::IncomingInfo;
133use connection::{
134 PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
135};
136use dial_opts::{DialOpts, PeerCondition};
137use futures::{prelude::*, stream::FusedStream};
138
139use libp2p_core::{
140 connection::ConnectedPoint,
141 muxing::StreamMuxerBox,
142 transport::{self, ListenerId, TransportError, TransportEvent},
143 Multiaddr, Transport,
144};
145use libp2p_identity::PeerId;
146
147use smallvec::SmallVec;
148use std::collections::{HashMap, HashSet, VecDeque};
149use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
150use std::time::Duration;
151use std::{
152 error, fmt, io,
153 pin::Pin,
154 task::{Context, Poll},
155};
156use tracing::Instrument;
157#[doc(hidden)]
158pub use translation::_address_translation;
159
160type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
162
163pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
166
167pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
170
171pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
173
174#[derive(Debug)]
176#[non_exhaustive]
177pub enum SwarmEvent<TBehaviourOutEvent> {
178 Behaviour(TBehaviourOutEvent),
180 ConnectionEstablished {
182 peer_id: PeerId,
184 connection_id: ConnectionId,
186 endpoint: ConnectedPoint,
188 num_established: NonZeroU32,
191 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
195 established_in: std::time::Duration,
197 },
198 ConnectionClosed {
201 peer_id: PeerId,
203 connection_id: ConnectionId,
205 endpoint: ConnectedPoint,
207 num_established: u32,
209 cause: Option<ConnectionError>,
212 },
213 IncomingConnection {
219 connection_id: ConnectionId,
221 local_addr: Multiaddr,
225 send_back_addr: Multiaddr,
227 },
228 IncomingConnectionError {
233 connection_id: ConnectionId,
235 local_addr: Multiaddr,
239 send_back_addr: Multiaddr,
241 error: ListenError,
243 },
244 OutgoingConnectionError {
246 connection_id: ConnectionId,
248 peer_id: Option<PeerId>,
250 error: DialError,
252 },
253 NewListenAddr {
255 listener_id: ListenerId,
257 address: Multiaddr,
259 },
260 ExpiredListenAddr {
262 listener_id: ListenerId,
264 address: Multiaddr,
266 },
267 ListenerClosed {
269 listener_id: ListenerId,
271 addresses: Vec<Multiaddr>,
275 reason: Result<(), io::Error>,
278 },
279 ListenerError {
281 listener_id: ListenerId,
283 error: io::Error,
285 },
286 Dialing {
294 peer_id: Option<PeerId>,
296
297 connection_id: ConnectionId,
299 },
300 NewExternalAddrCandidate { address: Multiaddr },
302 ExternalAddrConfirmed { address: Multiaddr },
304 ExternalAddrExpired { address: Multiaddr },
306 NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
308}
309
310impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
311 #[allow(clippy::result_large_err)]
313 pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
314 match self {
315 SwarmEvent::Behaviour(inner) => Ok(inner),
316 other => Err(other),
317 }
318 }
319}
320
321pub struct Swarm<TBehaviour>
326where
327 TBehaviour: NetworkBehaviour,
328{
329 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
331
332 pool: Pool<THandler<TBehaviour>>,
334
335 local_peer_id: PeerId,
337
338 behaviour: TBehaviour,
341
342 supported_protocols: SmallVec<[Vec<u8>; 16]>,
344
345 confirmed_external_addr: HashSet<Multiaddr>,
346
347 listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
349
350 pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
354
355 pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
356}
357
358impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
359
360impl<TBehaviour> Swarm<TBehaviour>
361where
362 TBehaviour: NetworkBehaviour,
363{
364 pub fn new(
367 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
368 behaviour: TBehaviour,
369 local_peer_id: PeerId,
370 config: Config,
371 ) -> Self {
372 tracing::info!(%local_peer_id);
373
374 Swarm {
375 local_peer_id,
376 transport,
377 pool: Pool::new(local_peer_id, config.pool_config),
378 behaviour,
379 supported_protocols: Default::default(),
380 confirmed_external_addr: Default::default(),
381 listened_addrs: HashMap::new(),
382 pending_handler_event: None,
383 pending_swarm_events: VecDeque::default(),
384 }
385 }
386
387 pub fn network_info(&self) -> NetworkInfo {
389 let num_peers = self.pool.num_peers();
390 let connection_counters = self.pool.counters().clone();
391 NetworkInfo {
392 num_peers,
393 connection_counters,
394 }
395 }
396
397 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
403 let opts = ListenOpts::new(addr);
404 let id = opts.listener_id();
405 self.add_listener(opts)?;
406 Ok(id)
407 }
408
409 pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
414 self.transport.remove_listener(listener_id)
415 }
416
417 pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
445 let dial_opts = opts.into();
446
447 let peer_id = dial_opts.get_peer_id();
448 let condition = dial_opts.peer_condition();
449 let connection_id = dial_opts.connection_id();
450
451 let should_dial = match (condition, peer_id) {
452 (_, None) => true,
453 (PeerCondition::Always, _) => true,
454 (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
455 (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
456 (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
457 !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
458 }
459 };
460
461 if !should_dial {
462 let e = DialError::DialPeerConditionFalse(condition);
463
464 self.behaviour
465 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
466 peer_id,
467 error: &e,
468 connection_id,
469 }));
470
471 return Err(e);
472 }
473
474 let addresses = {
475 let mut addresses_from_opts = dial_opts.get_addresses();
476
477 match self.behaviour.handle_pending_outbound_connection(
478 connection_id,
479 peer_id,
480 addresses_from_opts.as_slice(),
481 dial_opts.role_override(),
482 ) {
483 Ok(addresses) => {
484 if dial_opts.extend_addresses_through_behaviour() {
485 addresses_from_opts.extend(addresses)
486 } else {
487 let num_addresses = addresses.len();
488
489 if num_addresses > 0 {
490 tracing::debug!(
491 connection=%connection_id,
492 discarded_addresses_count=%num_addresses,
493 "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
494 )
495 }
496 }
497 }
498 Err(cause) => {
499 let error = DialError::Denied { cause };
500
501 self.behaviour
502 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
503 peer_id,
504 error: &error,
505 connection_id,
506 }));
507
508 return Err(error);
509 }
510 }
511
512 let mut unique_addresses = HashSet::new();
513 addresses_from_opts.retain(|addr| {
514 !self.listened_addrs.values().flatten().any(|a| a == addr)
515 && unique_addresses.insert(addr.clone())
516 });
517
518 if addresses_from_opts.is_empty() {
519 let error = DialError::NoAddresses;
520 self.behaviour
521 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
522 peer_id,
523 error: &error,
524 connection_id,
525 }));
526 return Err(error);
527 };
528
529 addresses_from_opts
530 };
531
532 let dials = addresses
533 .into_iter()
534 .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
535 Ok(address) => {
536 let dial = self.transport.dial(
537 address.clone(),
538 transport::DialOpts {
539 role: dial_opts.role_override(),
540 port_use: dial_opts.port_use(),
541 },
542 );
543 let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
544 span.follows_from(tracing::Span::current());
545 match dial {
546 Ok(fut) => fut
547 .map(|r| (address, r.map_err(TransportError::Other)))
548 .instrument(span)
549 .boxed(),
550 Err(err) => futures::future::ready((address, Err(err))).boxed(),
551 }
552 }
553 Err(address) => futures::future::ready((
554 address.clone(),
555 Err(TransportError::MultiaddrNotSupported(address)),
556 ))
557 .boxed(),
558 })
559 .collect();
560
561 self.pool.add_outgoing(
562 dials,
563 peer_id,
564 dial_opts.role_override(),
565 dial_opts.port_use(),
566 dial_opts.dial_concurrency_override(),
567 connection_id,
568 );
569
570 Ok(())
571 }
572
573 pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
575 self.listened_addrs.values().flatten()
576 }
577
578 pub fn local_peer_id(&self) -> &PeerId {
580 &self.local_peer_id
581 }
582
583 pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
585 self.confirmed_external_addr.iter()
586 }
587
588 fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
589 let addr = opts.address();
590 let listener_id = opts.listener_id();
591
592 if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
593 self.behaviour
594 .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
595 listener_id,
596 err: &e,
597 }));
598
599 return Err(e);
600 }
601
602 self.behaviour
603 .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
604 listener_id,
605 }));
606
607 Ok(())
608 }
609
610 pub fn add_external_address(&mut self, a: Multiaddr) {
615 self.behaviour
616 .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
617 addr: &a,
618 }));
619 self.confirmed_external_addr.insert(a);
620 }
621
622 pub fn remove_external_address(&mut self, addr: &Multiaddr) {
626 self.behaviour
627 .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
628 self.confirmed_external_addr.remove(addr);
629 }
630
631 pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
635 self.behaviour
636 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
637 peer_id,
638 addr: &addr,
639 }))
640 }
641
642 #[allow(clippy::result_unit_err)]
649 pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
650 let was_connected = self.pool.is_connected(peer_id);
651 self.pool.disconnect(peer_id);
652
653 if was_connected {
654 Ok(())
655 } else {
656 Err(())
657 }
658 }
659
660 pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
670 if let Some(established) = self.pool.get_established(connection_id) {
671 established.start_close();
672 return true;
673 }
674
675 false
676 }
677
678 pub fn is_connected(&self, peer_id: &PeerId) -> bool {
680 self.pool.is_connected(*peer_id)
681 }
682
683 pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
685 self.pool.iter_connected()
686 }
687
688 pub fn behaviour(&self) -> &TBehaviour {
690 &self.behaviour
691 }
692
693 pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
695 &mut self.behaviour
696 }
697
698 fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
699 match event {
700 PoolEvent::ConnectionEstablished {
701 peer_id,
702 id,
703 endpoint,
704 connection,
705 concurrent_dial_errors,
706 established_in,
707 } => {
708 let handler = match endpoint.clone() {
709 ConnectedPoint::Dialer {
710 address,
711 role_override,
712 port_use,
713 } => {
714 match self.behaviour.handle_established_outbound_connection(
715 id,
716 peer_id,
717 &address,
718 role_override,
719 port_use,
720 ) {
721 Ok(handler) => handler,
722 Err(cause) => {
723 let dial_error = DialError::Denied { cause };
724 self.behaviour.on_swarm_event(FromSwarm::DialFailure(
725 DialFailure {
726 connection_id: id,
727 error: &dial_error,
728 peer_id: Some(peer_id),
729 },
730 ));
731
732 self.pending_swarm_events.push_back(
733 SwarmEvent::OutgoingConnectionError {
734 peer_id: Some(peer_id),
735 connection_id: id,
736 error: dial_error,
737 },
738 );
739 return;
740 }
741 }
742 }
743 ConnectedPoint::Listener {
744 local_addr,
745 send_back_addr,
746 } => {
747 match self.behaviour.handle_established_inbound_connection(
748 id,
749 peer_id,
750 &local_addr,
751 &send_back_addr,
752 ) {
753 Ok(handler) => handler,
754 Err(cause) => {
755 let listen_error = ListenError::Denied { cause };
756 self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
757 ListenFailure {
758 local_addr: &local_addr,
759 send_back_addr: &send_back_addr,
760 error: &listen_error,
761 connection_id: id,
762 peer_id: Some(peer_id),
763 },
764 ));
765
766 self.pending_swarm_events.push_back(
767 SwarmEvent::IncomingConnectionError {
768 connection_id: id,
769 send_back_addr,
770 local_addr,
771 error: listen_error,
772 },
773 );
774 return;
775 }
776 }
777 }
778 };
779
780 let supported_protocols = handler
781 .listen_protocol()
782 .upgrade()
783 .protocol_info()
784 .map(|p| p.as_ref().as_bytes().to_vec())
785 .collect();
786 let other_established_connection_ids = self
787 .pool
788 .iter_established_connections_of_peer(&peer_id)
789 .collect::<Vec<_>>();
790 let num_established = NonZeroU32::new(
791 u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
792 )
793 .expect("n + 1 is always non-zero; qed");
794
795 self.pool
796 .spawn_connection(id, peer_id, &endpoint, connection, handler);
797
798 tracing::debug!(
799 peer=%peer_id,
800 ?endpoint,
801 total_peers=%num_established,
802 "Connection established"
803 );
804 let failed_addresses = concurrent_dial_errors
805 .as_ref()
806 .map(|es| {
807 es.iter()
808 .map(|(a, _)| a)
809 .cloned()
810 .collect::<Vec<Multiaddr>>()
811 })
812 .unwrap_or_default();
813 self.behaviour
814 .on_swarm_event(FromSwarm::ConnectionEstablished(
815 behaviour::ConnectionEstablished {
816 peer_id,
817 connection_id: id,
818 endpoint: &endpoint,
819 failed_addresses: &failed_addresses,
820 other_established: other_established_connection_ids.len(),
821 },
822 ));
823 self.supported_protocols = supported_protocols;
824 self.pending_swarm_events
825 .push_back(SwarmEvent::ConnectionEstablished {
826 peer_id,
827 connection_id: id,
828 num_established,
829 endpoint,
830 concurrent_dial_errors,
831 established_in,
832 });
833 }
834 PoolEvent::PendingOutboundConnectionError {
835 id: connection_id,
836 error,
837 peer,
838 } => {
839 let error = error.into();
840
841 self.behaviour
842 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
843 peer_id: peer,
844 error: &error,
845 connection_id,
846 }));
847
848 if let Some(peer) = peer {
849 tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
850 } else {
851 tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
852 }
853
854 self.pending_swarm_events
855 .push_back(SwarmEvent::OutgoingConnectionError {
856 peer_id: peer,
857 connection_id,
858 error,
859 });
860 }
861 PoolEvent::PendingInboundConnectionError {
862 id,
863 send_back_addr,
864 local_addr,
865 error,
866 } => {
867 let error = error.into();
868
869 tracing::debug!("Incoming connection failed: {:?}", error);
870 self.behaviour
871 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
872 local_addr: &local_addr,
873 send_back_addr: &send_back_addr,
874 error: &error,
875 connection_id: id,
876 peer_id: None,
877 }));
878 self.pending_swarm_events
879 .push_back(SwarmEvent::IncomingConnectionError {
880 connection_id: id,
881 local_addr,
882 send_back_addr,
883 error,
884 });
885 }
886 PoolEvent::ConnectionClosed {
887 id,
888 connected,
889 error,
890 remaining_established_connection_ids,
891 ..
892 } => {
893 if let Some(error) = error.as_ref() {
894 tracing::debug!(
895 total_peers=%remaining_established_connection_ids.len(),
896 "Connection closed with error {:?}: {:?}",
897 error,
898 connected,
899 );
900 } else {
901 tracing::debug!(
902 total_peers=%remaining_established_connection_ids.len(),
903 "Connection closed: {:?}",
904 connected
905 );
906 }
907 let peer_id = connected.peer_id;
908 let endpoint = connected.endpoint;
909 let num_established =
910 u32::try_from(remaining_established_connection_ids.len()).unwrap();
911
912 self.behaviour
913 .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
914 peer_id,
915 connection_id: id,
916 endpoint: &endpoint,
917 cause: error.as_ref(),
918 remaining_established: num_established as usize,
919 }));
920 self.pending_swarm_events
921 .push_back(SwarmEvent::ConnectionClosed {
922 peer_id,
923 connection_id: id,
924 endpoint,
925 cause: error,
926 num_established,
927 });
928 }
929 PoolEvent::ConnectionEvent { peer_id, id, event } => {
930 self.behaviour
931 .on_connection_handler_event(peer_id, id, event);
932 }
933 PoolEvent::AddressChange {
934 peer_id,
935 id,
936 new_endpoint,
937 old_endpoint,
938 } => {
939 self.behaviour
940 .on_swarm_event(FromSwarm::AddressChange(AddressChange {
941 peer_id,
942 connection_id: id,
943 old: &old_endpoint,
944 new: &new_endpoint,
945 }));
946 }
947 }
948 }
949
950 fn handle_transport_event(
951 &mut self,
952 event: TransportEvent<
953 <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
954 io::Error,
955 >,
956 ) {
957 match event {
958 TransportEvent::Incoming {
959 listener_id: _,
960 upgrade,
961 local_addr,
962 send_back_addr,
963 } => {
964 let connection_id = ConnectionId::next();
965
966 match self.behaviour.handle_pending_inbound_connection(
967 connection_id,
968 &local_addr,
969 &send_back_addr,
970 ) {
971 Ok(()) => {}
972 Err(cause) => {
973 let listen_error = ListenError::Denied { cause };
974
975 self.behaviour
976 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
977 local_addr: &local_addr,
978 send_back_addr: &send_back_addr,
979 error: &listen_error,
980 connection_id,
981 peer_id: None,
982 }));
983
984 self.pending_swarm_events
985 .push_back(SwarmEvent::IncomingConnectionError {
986 connection_id,
987 local_addr,
988 send_back_addr,
989 error: listen_error,
990 });
991 return;
992 }
993 }
994
995 self.pool.add_incoming(
996 upgrade,
997 IncomingInfo {
998 local_addr: &local_addr,
999 send_back_addr: &send_back_addr,
1000 },
1001 connection_id,
1002 );
1003
1004 self.pending_swarm_events
1005 .push_back(SwarmEvent::IncomingConnection {
1006 connection_id,
1007 local_addr,
1008 send_back_addr,
1009 })
1010 }
1011 TransportEvent::NewAddress {
1012 listener_id,
1013 listen_addr,
1014 } => {
1015 tracing::debug!(
1016 listener=?listener_id,
1017 address=%listen_addr,
1018 "New listener address"
1019 );
1020 let addrs = self.listened_addrs.entry(listener_id).or_default();
1021 if !addrs.contains(&listen_addr) {
1022 addrs.push(listen_addr.clone())
1023 }
1024 self.behaviour
1025 .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1026 listener_id,
1027 addr: &listen_addr,
1028 }));
1029 self.pending_swarm_events
1030 .push_back(SwarmEvent::NewListenAddr {
1031 listener_id,
1032 address: listen_addr,
1033 })
1034 }
1035 TransportEvent::AddressExpired {
1036 listener_id,
1037 listen_addr,
1038 } => {
1039 tracing::debug!(
1040 listener=?listener_id,
1041 address=%listen_addr,
1042 "Expired listener address"
1043 );
1044 if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1045 addrs.retain(|a| a != &listen_addr);
1046 }
1047 self.behaviour
1048 .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1049 listener_id,
1050 addr: &listen_addr,
1051 }));
1052 self.pending_swarm_events
1053 .push_back(SwarmEvent::ExpiredListenAddr {
1054 listener_id,
1055 address: listen_addr,
1056 })
1057 }
1058 TransportEvent::ListenerClosed {
1059 listener_id,
1060 reason,
1061 } => {
1062 tracing::debug!(
1063 listener=?listener_id,
1064 ?reason,
1065 "Listener closed"
1066 );
1067 let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1068 for addr in addrs.iter() {
1069 self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1070 ExpiredListenAddr { listener_id, addr },
1071 ));
1072 }
1073 self.behaviour
1074 .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1075 listener_id,
1076 reason: reason.as_ref().copied(),
1077 }));
1078 self.pending_swarm_events
1079 .push_back(SwarmEvent::ListenerClosed {
1080 listener_id,
1081 addresses: addrs.to_vec(),
1082 reason,
1083 })
1084 }
1085 TransportEvent::ListenerError { listener_id, error } => {
1086 self.behaviour
1087 .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1088 listener_id,
1089 err: &error,
1090 }));
1091 self.pending_swarm_events
1092 .push_back(SwarmEvent::ListenerError { listener_id, error })
1093 }
1094 }
1095 }
1096
1097 fn handle_behaviour_event(
1098 &mut self,
1099 event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1100 ) {
1101 match event {
1102 ToSwarm::GenerateEvent(event) => {
1103 self.pending_swarm_events
1104 .push_back(SwarmEvent::Behaviour(event));
1105 }
1106 ToSwarm::Dial { opts } => {
1107 let peer_id = opts.get_peer_id();
1108 let connection_id = opts.connection_id();
1109 if let Ok(()) = self.dial(opts) {
1110 self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1111 peer_id,
1112 connection_id,
1113 });
1114 }
1115 }
1116 ToSwarm::ListenOn { opts } => {
1117 let _ = self.add_listener(opts);
1119 }
1120 ToSwarm::RemoveListener { id } => {
1121 self.remove_listener(id);
1122 }
1123 ToSwarm::NotifyHandler {
1124 peer_id,
1125 handler,
1126 event,
1127 } => {
1128 assert!(self.pending_handler_event.is_none());
1129 let handler = match handler {
1130 NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1131 NotifyHandler::Any => {
1132 let ids = self
1133 .pool
1134 .iter_established_connections_of_peer(&peer_id)
1135 .collect();
1136 PendingNotifyHandler::Any(ids)
1137 }
1138 };
1139
1140 self.pending_handler_event = Some((peer_id, handler, event));
1141 }
1142 ToSwarm::NewExternalAddrCandidate(addr) => {
1143 self.behaviour
1144 .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1145 NewExternalAddrCandidate { addr: &addr },
1146 ));
1147 self.pending_swarm_events
1148 .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1149 }
1150 ToSwarm::ExternalAddrConfirmed(addr) => {
1151 self.add_external_address(addr.clone());
1152 self.pending_swarm_events
1153 .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1154 }
1155 ToSwarm::ExternalAddrExpired(addr) => {
1156 self.remove_external_address(&addr);
1157 self.pending_swarm_events
1158 .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1159 }
1160 ToSwarm::CloseConnection {
1161 peer_id,
1162 connection,
1163 } => match connection {
1164 CloseConnection::One(connection_id) => {
1165 if let Some(conn) = self.pool.get_established(connection_id) {
1166 conn.start_close();
1167 }
1168 }
1169 CloseConnection::All => {
1170 self.pool.disconnect(peer_id);
1171 }
1172 },
1173 ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1174 self.behaviour
1175 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1176 peer_id,
1177 addr: &address,
1178 }));
1179 self.pending_swarm_events
1180 .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1181 }
1182 }
1183 }
1184
1185 #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1189 fn poll_next_event(
1190 mut self: Pin<&mut Self>,
1191 cx: &mut Context<'_>,
1192 ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1193 let this = &mut *self;
1196
1197 loop {
1207 if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1208 return Poll::Ready(swarm_event);
1209 }
1210
1211 match this.pending_handler_event.take() {
1212 Some((peer_id, handler, event)) => match handler {
1215 PendingNotifyHandler::One(conn_id) => {
1216 match this.pool.get_established(conn_id) {
1217 Some(conn) => match notify_one(conn, event, cx) {
1218 None => continue,
1219 Some(event) => {
1220 this.pending_handler_event = Some((peer_id, handler, event));
1221 }
1222 },
1223 None => continue,
1224 }
1225 }
1226 PendingNotifyHandler::Any(ids) => {
1227 match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1228 None => continue,
1229 Some((event, ids)) => {
1230 let handler = PendingNotifyHandler::Any(ids);
1231 this.pending_handler_event = Some((peer_id, handler, event));
1232 }
1233 }
1234 }
1235 },
1236 None => match this.behaviour.poll(cx) {
1238 Poll::Pending => {}
1239 Poll::Ready(behaviour_event) => {
1240 this.handle_behaviour_event(behaviour_event);
1241
1242 continue;
1243 }
1244 },
1245 }
1246
1247 match this.pool.poll(cx) {
1249 Poll::Pending => {}
1250 Poll::Ready(pool_event) => {
1251 this.handle_pool_event(pool_event);
1252 continue;
1253 }
1254 }
1255
1256 match Pin::new(&mut this.transport).poll(cx) {
1258 Poll::Pending => {}
1259 Poll::Ready(transport_event) => {
1260 this.handle_transport_event(transport_event);
1261 continue;
1262 }
1263 }
1264
1265 return Poll::Pending;
1266 }
1267 }
1268}
1269
1270enum PendingNotifyHandler {
1277 One(ConnectionId),
1278 Any(SmallVec<[ConnectionId; 10]>),
1279}
1280
1281fn notify_one<THandlerInEvent>(
1290 conn: &mut EstablishedConnection<THandlerInEvent>,
1291 event: THandlerInEvent,
1292 cx: &mut Context<'_>,
1293) -> Option<THandlerInEvent> {
1294 match conn.poll_ready_notify_handler(cx) {
1295 Poll::Pending => Some(event),
1296 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
1298 let _ = conn.notify_handler(event);
1300 None
1301 }
1302 }
1303}
1304
1305fn notify_any<THandler, TBehaviour>(
1316 ids: SmallVec<[ConnectionId; 10]>,
1317 pool: &mut Pool<THandler>,
1318 event: THandlerInEvent<TBehaviour>,
1319 cx: &mut Context<'_>,
1320) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1321where
1322 TBehaviour: NetworkBehaviour,
1323 THandler: ConnectionHandler<
1324 FromBehaviour = THandlerInEvent<TBehaviour>,
1325 ToBehaviour = THandlerOutEvent<TBehaviour>,
1326 >,
1327{
1328 let mut pending = SmallVec::new();
1329 let mut event = Some(event); for id in ids.into_iter() {
1331 if let Some(conn) = pool.get_established(id) {
1332 match conn.poll_ready_notify_handler(cx) {
1333 Poll::Pending => pending.push(id),
1334 Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
1336 let e = event.take().expect("by (1),(2)");
1337 if let Err(e) = conn.notify_handler(e) {
1338 event = Some(e) } else {
1340 break;
1341 }
1342 }
1343 }
1344 }
1345 }
1346
1347 event.and_then(|e| {
1348 if !pending.is_empty() {
1349 Some((e, pending))
1350 } else {
1351 None
1352 }
1353 })
1354}
1355
1356impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1364where
1365 TBehaviour: NetworkBehaviour,
1366{
1367 type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1368
1369 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1370 self.as_mut().poll_next_event(cx).map(Some)
1371 }
1372}
1373
1374impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1376where
1377 TBehaviour: NetworkBehaviour,
1378{
1379 fn is_terminated(&self) -> bool {
1380 false
1381 }
1382}
1383
1384pub struct Config {
1385 pool_config: PoolConfig,
1386}
1387
1388impl Config {
1389 pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1392 Self {
1393 pool_config: PoolConfig::new(Some(Box::new(executor))),
1394 }
1395 }
1396
1397 #[doc(hidden)]
1398 pub fn without_executor() -> Self {
1400 Self {
1401 pool_config: PoolConfig::new(None),
1402 }
1403 }
1404
1405 #[cfg(feature = "wasm-bindgen")]
1415 pub fn with_wasm_executor() -> Self {
1416 Self::with_executor(crate::executor::WasmBindgenExecutor)
1417 }
1418
1419 #[cfg(all(
1421 feature = "tokio",
1422 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1423 ))]
1424 pub fn with_tokio_executor() -> Self {
1425 Self::with_executor(crate::executor::TokioExecutor)
1426 }
1427
1428 #[cfg(all(
1430 feature = "async-std",
1431 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1432 ))]
1433 pub fn with_async_std_executor() -> Self {
1434 Self::with_executor(crate::executor::AsyncStdExecutor)
1435 }
1436
1437 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1447 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1448 self
1449 }
1450
1451 pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1463 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1464 self
1465 }
1466
1467 pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1469 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1470 self
1471 }
1472
1473 pub fn with_substream_upgrade_protocol_override(
1484 mut self,
1485 v: libp2p_core::upgrade::Version,
1486 ) -> Self {
1487 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1488 self
1489 }
1490
1491 pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1501 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1502 self
1503 }
1504
1505 pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1509 self.pool_config.idle_connection_timeout = timeout;
1510 self
1511 }
1512}
1513
1514#[derive(Debug)]
1516pub enum DialError {
1517 LocalPeerId { endpoint: ConnectedPoint },
1519 NoAddresses,
1521 DialPeerConditionFalse(dial_opts::PeerCondition),
1524 Aborted,
1526 WrongPeerId {
1528 obtained: PeerId,
1529 endpoint: ConnectedPoint,
1530 },
1531 Denied { cause: ConnectionDenied },
1535 Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1537}
1538
1539impl From<PendingOutboundConnectionError> for DialError {
1540 fn from(error: PendingOutboundConnectionError) -> Self {
1541 match error {
1542 PendingConnectionError::Aborted => DialError::Aborted,
1543 PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1544 DialError::WrongPeerId { obtained, endpoint }
1545 }
1546 PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1547 PendingConnectionError::Transport(e) => DialError::Transport(e),
1548 }
1549 }
1550}
1551
1552impl fmt::Display for DialError {
1553 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1554 match self {
1555 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1556 DialError::LocalPeerId { endpoint } => write!(
1557 f,
1558 "Dial error: tried to dial local peer id at {endpoint:?}."
1559 ),
1560 DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1561 DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1562 DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1563 DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1564 DialError::Aborted => write!(
1565 f,
1566 "Dial error: Pending connection attempt has been aborted."
1567 ),
1568 DialError::WrongPeerId { obtained, endpoint } => write!(
1569 f,
1570 "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1571 ),
1572 DialError::Transport(errors) => {
1573 write!(f, "Failed to negotiate transport protocol(s): [")?;
1574
1575 for (addr, error) in errors {
1576 write!(f, "({addr}")?;
1577 print_error_chain(f, error)?;
1578 write!(f, ")")?;
1579 }
1580 write!(f, "]")?;
1581
1582 Ok(())
1583 }
1584 DialError::Denied { .. } => {
1585 write!(f, "Dial error")
1586 }
1587 }
1588 }
1589}
1590
1591fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
1592 write!(f, ": {e}")?;
1593
1594 if let Some(source) = e.source() {
1595 print_error_chain(f, source)?;
1596 }
1597
1598 Ok(())
1599}
1600
1601impl error::Error for DialError {
1602 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1603 match self {
1604 DialError::LocalPeerId { .. } => None,
1605 DialError::NoAddresses => None,
1606 DialError::DialPeerConditionFalse(_) => None,
1607 DialError::Aborted => None,
1608 DialError::WrongPeerId { .. } => None,
1609 DialError::Transport(_) => None,
1610 DialError::Denied { cause } => Some(cause),
1611 }
1612 }
1613}
1614
1615#[derive(Debug)]
1617pub enum ListenError {
1618 Aborted,
1620 WrongPeerId {
1622 obtained: PeerId,
1623 endpoint: ConnectedPoint,
1624 },
1625 LocalPeerId {
1627 endpoint: ConnectedPoint,
1628 },
1629 Denied {
1630 cause: ConnectionDenied,
1631 },
1632 Transport(TransportError<io::Error>),
1634}
1635
1636impl From<PendingInboundConnectionError> for ListenError {
1637 fn from(error: PendingInboundConnectionError) -> Self {
1638 match error {
1639 PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1640 PendingInboundConnectionError::Aborted => ListenError::Aborted,
1641 PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1642 ListenError::WrongPeerId { obtained, endpoint }
1643 }
1644 PendingInboundConnectionError::LocalPeerId { endpoint } => {
1645 ListenError::LocalPeerId { endpoint }
1646 }
1647 }
1648 }
1649}
1650
1651impl fmt::Display for ListenError {
1652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1653 match self {
1654 ListenError::Aborted => write!(
1655 f,
1656 "Listen error: Pending connection attempt has been aborted."
1657 ),
1658 ListenError::WrongPeerId { obtained, endpoint } => write!(
1659 f,
1660 "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1661 ),
1662 ListenError::Transport(_) => {
1663 write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1664 }
1665 ListenError::Denied { cause } => {
1666 write!(f, "Listen error: Denied: {cause}")
1667 }
1668 ListenError::LocalPeerId { endpoint } => {
1669 write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1670 }
1671 }
1672 }
1673}
1674
1675impl error::Error for ListenError {
1676 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1677 match self {
1678 ListenError::WrongPeerId { .. } => None,
1679 ListenError::Transport(err) => Some(err),
1680 ListenError::Aborted => None,
1681 ListenError::Denied { cause } => Some(cause),
1682 ListenError::LocalPeerId { .. } => None,
1683 }
1684 }
1685}
1686
1687#[derive(Debug)]
1691pub struct ConnectionDenied {
1692 inner: Box<dyn error::Error + Send + Sync + 'static>,
1693}
1694
1695impl ConnectionDenied {
1696 pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
1697 Self {
1698 inner: cause.into(),
1699 }
1700 }
1701
1702 pub fn downcast<E>(self) -> Result<E, Self>
1704 where
1705 E: error::Error + Send + Sync + 'static,
1706 {
1707 let inner = self
1708 .inner
1709 .downcast::<E>()
1710 .map_err(|inner| ConnectionDenied { inner })?;
1711
1712 Ok(*inner)
1713 }
1714
1715 pub fn downcast_ref<E>(&self) -> Option<&E>
1717 where
1718 E: error::Error + Send + Sync + 'static,
1719 {
1720 self.inner.downcast_ref::<E>()
1721 }
1722}
1723
1724impl fmt::Display for ConnectionDenied {
1725 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1726 write!(f, "connection denied")
1727 }
1728}
1729
1730impl error::Error for ConnectionDenied {
1731 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1732 Some(self.inner.as_ref())
1733 }
1734}
1735
1736#[derive(Clone, Debug)]
1738pub struct NetworkInfo {
1739 num_peers: usize,
1741 connection_counters: ConnectionCounters,
1743}
1744
1745impl NetworkInfo {
1746 pub fn num_peers(&self) -> usize {
1749 self.num_peers
1750 }
1751
1752 pub fn connection_counters(&self) -> &ConnectionCounters {
1754 &self.connection_counters
1755 }
1756}
1757
1758#[cfg(test)]
1759mod tests {
1760 use super::*;
1761 use crate::test::{CallTraceBehaviour, MockBehaviour};
1762 use libp2p_core::multiaddr::multiaddr;
1763 use libp2p_core::transport::memory::MemoryTransportError;
1764 use libp2p_core::transport::{PortUse, TransportEvent};
1765 use libp2p_core::Endpoint;
1766 use libp2p_core::{multiaddr, transport, upgrade};
1767 use libp2p_identity as identity;
1768 use libp2p_plaintext as plaintext;
1769 use libp2p_yamux as yamux;
1770 use quickcheck::*;
1771
1772 enum State {
1775 Connecting,
1776 Disconnecting,
1777 }
1778
1779 fn new_test_swarm(
1780 config: Config,
1781 ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1782 let id_keys = identity::Keypair::generate_ed25519();
1783 let local_public_key = id_keys.public();
1784 let transport = transport::MemoryTransport::default()
1785 .upgrade(upgrade::Version::V1)
1786 .authenticate(plaintext::Config::new(&id_keys))
1787 .multiplex(yamux::Config::default())
1788 .boxed();
1789 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1790
1791 Swarm::new(
1792 transport,
1793 behaviour,
1794 local_public_key.into(),
1795 config.with_idle_connection_timeout(Duration::from_secs(5)),
1796 )
1797 }
1798
1799 fn swarms_connected<TBehaviour>(
1800 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1801 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1802 num_connections: usize,
1803 ) -> bool
1804 where
1805 TBehaviour: NetworkBehaviour,
1806 THandlerOutEvent<TBehaviour>: Clone,
1807 {
1808 swarm1
1809 .behaviour()
1810 .num_connections_to_peer(*swarm2.local_peer_id())
1811 == num_connections
1812 && swarm2
1813 .behaviour()
1814 .num_connections_to_peer(*swarm1.local_peer_id())
1815 == num_connections
1816 && swarm1.is_connected(swarm2.local_peer_id())
1817 && swarm2.is_connected(swarm1.local_peer_id())
1818 }
1819
1820 fn swarms_disconnected<TBehaviour>(
1821 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1822 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1823 ) -> bool
1824 where
1825 TBehaviour: NetworkBehaviour,
1826 THandlerOutEvent<TBehaviour>: Clone,
1827 {
1828 swarm1
1829 .behaviour()
1830 .num_connections_to_peer(*swarm2.local_peer_id())
1831 == 0
1832 && swarm2
1833 .behaviour()
1834 .num_connections_to_peer(*swarm1.local_peer_id())
1835 == 0
1836 && !swarm1.is_connected(swarm2.local_peer_id())
1837 && !swarm2.is_connected(swarm1.local_peer_id())
1838 }
1839
1840 #[tokio::test]
1846 async fn test_swarm_disconnect() {
1847 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1848 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1849
1850 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1851 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1852
1853 swarm1.listen_on(addr1.clone()).unwrap();
1854 swarm2.listen_on(addr2.clone()).unwrap();
1855
1856 let swarm1_id = *swarm1.local_peer_id();
1857
1858 let mut reconnected = false;
1859 let num_connections = 10;
1860
1861 for _ in 0..num_connections {
1862 swarm1.dial(addr2.clone()).unwrap();
1863 }
1864 let mut state = State::Connecting;
1865
1866 future::poll_fn(move |cx| loop {
1867 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1868 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1869 match state {
1870 State::Connecting => {
1871 if swarms_connected(&swarm1, &swarm2, num_connections) {
1872 if reconnected {
1873 return Poll::Ready(());
1874 }
1875 swarm2
1876 .disconnect_peer_id(swarm1_id)
1877 .expect("Error disconnecting");
1878 state = State::Disconnecting;
1879 }
1880 }
1881 State::Disconnecting => {
1882 if swarms_disconnected(&swarm1, &swarm2) {
1883 if reconnected {
1884 return Poll::Ready(());
1885 }
1886 reconnected = true;
1887 for _ in 0..num_connections {
1888 swarm2.dial(addr1.clone()).unwrap();
1889 }
1890 state = State::Connecting;
1891 }
1892 }
1893 }
1894
1895 if poll1.is_pending() && poll2.is_pending() {
1896 return Poll::Pending;
1897 }
1898 })
1899 .await
1900 }
1901
1902 #[tokio::test]
1909 async fn test_behaviour_disconnect_all() {
1910 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1911 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1912
1913 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1914 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1915
1916 swarm1.listen_on(addr1.clone()).unwrap();
1917 swarm2.listen_on(addr2.clone()).unwrap();
1918
1919 let swarm1_id = *swarm1.local_peer_id();
1920
1921 let mut reconnected = false;
1922 let num_connections = 10;
1923
1924 for _ in 0..num_connections {
1925 swarm1.dial(addr2.clone()).unwrap();
1926 }
1927 let mut state = State::Connecting;
1928
1929 future::poll_fn(move |cx| loop {
1930 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1931 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1932 match state {
1933 State::Connecting => {
1934 if swarms_connected(&swarm1, &swarm2, num_connections) {
1935 if reconnected {
1936 return Poll::Ready(());
1937 }
1938 swarm2
1939 .behaviour
1940 .inner()
1941 .next_action
1942 .replace(ToSwarm::CloseConnection {
1943 peer_id: swarm1_id,
1944 connection: CloseConnection::All,
1945 });
1946 state = State::Disconnecting;
1947 continue;
1948 }
1949 }
1950 State::Disconnecting => {
1951 if swarms_disconnected(&swarm1, &swarm2) {
1952 reconnected = true;
1953 for _ in 0..num_connections {
1954 swarm2.dial(addr1.clone()).unwrap();
1955 }
1956 state = State::Connecting;
1957 continue;
1958 }
1959 }
1960 }
1961
1962 if poll1.is_pending() && poll2.is_pending() {
1963 return Poll::Pending;
1964 }
1965 })
1966 .await
1967 }
1968
1969 #[tokio::test]
1976 async fn test_behaviour_disconnect_one() {
1977 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
1978 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
1979
1980 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1981 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1982
1983 swarm1.listen_on(addr1).unwrap();
1984 swarm2.listen_on(addr2.clone()).unwrap();
1985
1986 let swarm1_id = *swarm1.local_peer_id();
1987
1988 let num_connections = 10;
1989
1990 for _ in 0..num_connections {
1991 swarm1.dial(addr2.clone()).unwrap();
1992 }
1993 let mut state = State::Connecting;
1994 let mut disconnected_conn_id = None;
1995
1996 future::poll_fn(move |cx| loop {
1997 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1998 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1999 match state {
2000 State::Connecting => {
2001 if swarms_connected(&swarm1, &swarm2, num_connections) {
2002 disconnected_conn_id = {
2003 let conn_id =
2004 swarm2.behaviour.on_connection_established[num_connections / 2].1;
2005 swarm2.behaviour.inner().next_action.replace(
2006 ToSwarm::CloseConnection {
2007 peer_id: swarm1_id,
2008 connection: CloseConnection::One(conn_id),
2009 },
2010 );
2011 Some(conn_id)
2012 };
2013 state = State::Disconnecting;
2014 }
2015 }
2016 State::Disconnecting => {
2017 for s in &[&swarm1, &swarm2] {
2018 assert!(s
2019 .behaviour
2020 .on_connection_closed
2021 .iter()
2022 .all(|(.., remaining_conns)| *remaining_conns > 0));
2023 assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2024 s.behaviour.assert_connected(num_connections, 1);
2025 }
2026 if [&swarm1, &swarm2]
2027 .iter()
2028 .all(|s| s.behaviour.on_connection_closed.len() == 1)
2029 {
2030 let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2031 assert_eq!(Some(conn_id), disconnected_conn_id);
2032 return Poll::Ready(());
2033 }
2034 }
2035 }
2036
2037 if poll1.is_pending() && poll2.is_pending() {
2038 return Poll::Pending;
2039 }
2040 })
2041 .await
2042 }
2043
2044 #[test]
2045 fn concurrent_dialing() {
2046 #[derive(Clone, Debug)]
2047 struct DialConcurrencyFactor(NonZeroU8);
2048
2049 impl Arbitrary for DialConcurrencyFactor {
2050 fn arbitrary(g: &mut Gen) -> Self {
2051 Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2052 }
2053 }
2054
2055 fn prop(concurrency_factor: DialConcurrencyFactor) {
2056 tokio::runtime::Runtime::new().unwrap().block_on(async {
2057 let mut swarm = new_test_swarm(
2058 Config::with_tokio_executor()
2059 .with_dial_concurrency_factor(concurrency_factor.0),
2060 );
2061
2062 let num_listen_addrs = concurrency_factor.0.get() + 2;
2066 let mut listen_addresses = Vec::new();
2067 let mut transports = Vec::new();
2068 for _ in 0..num_listen_addrs {
2069 let mut transport = transport::MemoryTransport::default().boxed();
2070 transport
2071 .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2072 .unwrap();
2073
2074 match transport.select_next_some().await {
2075 TransportEvent::NewAddress { listen_addr, .. } => {
2076 listen_addresses.push(listen_addr);
2077 }
2078 _ => panic!("Expected `NewListenAddr` event."),
2079 }
2080
2081 transports.push(transport);
2082 }
2083
2084 swarm
2087 .dial(
2088 DialOpts::peer_id(PeerId::random())
2089 .addresses(listen_addresses)
2090 .build(),
2091 )
2092 .unwrap();
2093 for mut transport in transports.into_iter() {
2094 match futures::future::select(transport.select_next_some(), swarm.next()).await
2095 {
2096 future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2097 future::Either::Left(_) => {
2098 panic!("Unexpected transport event.")
2099 }
2100 future::Either::Right((e, _)) => {
2101 panic!("Expect swarm to not emit any event {e:?}")
2102 }
2103 }
2104 }
2105
2106 match swarm.next().await.unwrap() {
2107 SwarmEvent::OutgoingConnectionError { .. } => {}
2108 e => panic!("Unexpected swarm event {e:?}"),
2109 }
2110 })
2111 }
2112
2113 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2114 }
2115
2116 #[tokio::test]
2117 async fn invalid_peer_id() {
2118 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2122 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2123
2124 swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2125
2126 let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2127 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2128 Poll::Pending => Poll::Pending,
2129 _ => panic!("Was expecting the listen address to be reported"),
2130 })
2131 .await;
2132
2133 let other_id = PeerId::random();
2134 let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2135
2136 swarm2.dial(other_addr.clone()).unwrap();
2137
2138 let (peer_id, error) = future::poll_fn(|cx| {
2139 if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2140 swarm1.poll_next_unpin(cx)
2141 {}
2142
2143 match swarm2.poll_next_unpin(cx) {
2144 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2145 peer_id, error, ..
2146 })) => Poll::Ready((peer_id, error)),
2147 Poll::Ready(x) => panic!("unexpected {x:?}"),
2148 Poll::Pending => Poll::Pending,
2149 }
2150 })
2151 .await;
2152 assert_eq!(peer_id.unwrap(), other_id);
2153 match error {
2154 DialError::WrongPeerId { obtained, endpoint } => {
2155 assert_eq!(obtained, *swarm1.local_peer_id());
2156 assert_eq!(
2157 endpoint,
2158 ConnectedPoint::Dialer {
2159 address: other_addr,
2160 role_override: Endpoint::Dialer,
2161 port_use: PortUse::Reuse,
2162 }
2163 );
2164 }
2165 x => panic!("wrong error {x:?}"),
2166 }
2167 }
2168
2169 #[tokio::test]
2170 async fn dial_self() {
2171 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2182 swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2183
2184 let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2185 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2186 Poll::Pending => Poll::Pending,
2187 _ => panic!("Was expecting the listen address to be reported"),
2188 })
2189 .await;
2190
2191 swarm.listened_addrs.clear(); swarm.dial(local_address.clone()).unwrap();
2194
2195 let mut got_dial_err = false;
2196 let mut got_inc_err = false;
2197 future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2198 loop {
2199 match swarm.poll_next_unpin(cx) {
2200 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2201 peer_id,
2202 error: DialError::LocalPeerId { .. },
2203 ..
2204 })) => {
2205 assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2206 assert!(!got_dial_err);
2207 got_dial_err = true;
2208 if got_inc_err {
2209 return Poll::Ready(Ok(()));
2210 }
2211 }
2212 Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2213 local_addr, ..
2214 })) => {
2215 assert!(!got_inc_err);
2216 assert_eq!(local_addr, local_address);
2217 got_inc_err = true;
2218 if got_dial_err {
2219 return Poll::Ready(Ok(()));
2220 }
2221 }
2222 Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2223 assert_eq!(local_addr, local_address);
2224 }
2225 Poll::Ready(ev) => {
2226 panic!("Unexpected event: {ev:?}")
2227 }
2228 Poll::Pending => break Poll::Pending,
2229 }
2230 }
2231 })
2232 .await
2233 .unwrap();
2234 }
2235
2236 #[tokio::test]
2237 async fn dial_self_by_id() {
2238 let swarm = new_test_swarm(Config::with_tokio_executor());
2241 let peer_id = *swarm.local_peer_id();
2242 assert!(!swarm.is_connected(&peer_id));
2243 }
2244
2245 #[tokio::test]
2246 async fn multiple_addresses_err() {
2247 let target = PeerId::random();
2250
2251 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2252
2253 let addresses = HashSet::from([
2254 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2255 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2256 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2257 multiaddr![Udp(rand::random::<u16>())],
2258 multiaddr![Udp(rand::random::<u16>())],
2259 multiaddr![Udp(rand::random::<u16>())],
2260 multiaddr![Udp(rand::random::<u16>())],
2261 multiaddr![Udp(rand::random::<u16>())],
2262 ]);
2263
2264 swarm
2265 .dial(
2266 DialOpts::peer_id(target)
2267 .addresses(addresses.iter().cloned().collect())
2268 .build(),
2269 )
2270 .unwrap();
2271
2272 match swarm.next().await.unwrap() {
2273 SwarmEvent::OutgoingConnectionError {
2274 peer_id,
2275 error: DialError::Transport(errors),
2277 ..
2278 } => {
2279 assert_eq!(target, peer_id.unwrap());
2280
2281 let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2282 let expected_addresses = addresses
2283 .into_iter()
2284 .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2285 .collect::<Vec<_>>();
2286
2287 assert_eq!(expected_addresses, failed_addresses);
2288 }
2289 e => panic!("Unexpected event: {e:?}"),
2290 }
2291 }
2292
2293 #[tokio::test]
2294 async fn aborting_pending_connection_surfaces_error() {
2295 let _ = tracing_subscriber::fmt()
2296 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2297 .try_init();
2298
2299 let mut dialer = new_test_swarm(Config::with_tokio_executor());
2300 let mut listener = new_test_swarm(Config::with_tokio_executor());
2301
2302 let listener_peer_id = *listener.local_peer_id();
2303 listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2304 let listener_address = match listener.next().await.unwrap() {
2305 SwarmEvent::NewListenAddr { address, .. } => address,
2306 e => panic!("Unexpected network event: {e:?}"),
2307 };
2308
2309 dialer
2310 .dial(
2311 DialOpts::peer_id(listener_peer_id)
2312 .addresses(vec![listener_address])
2313 .build(),
2314 )
2315 .unwrap();
2316
2317 dialer
2318 .disconnect_peer_id(listener_peer_id)
2319 .expect_err("Expect peer to not yet be connected.");
2320
2321 match dialer.next().await.unwrap() {
2322 SwarmEvent::OutgoingConnectionError {
2323 error: DialError::Aborted,
2324 ..
2325 } => {}
2326 e => panic!("Unexpected swarm event {e:?}."),
2327 }
2328 }
2329
2330 #[test]
2331 fn dial_error_prints_sources() {
2332 let error = DialError::Transport(vec![(
2334 "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2335 TransportError::Other(io::Error::new(
2336 io::ErrorKind::Other,
2337 MemoryTransportError::Unreachable,
2338 )),
2339 )]);
2340
2341 let string = format!("{error}");
2342
2343 assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2345 }
2346}