1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
57
58mod connection;
59mod executor;
60mod stream;
61mod stream_protocol;
62#[cfg(test)]
63mod test;
64mod upgrade;
65
66pub mod behaviour;
67pub mod dial_opts;
68pub mod dummy;
69pub mod handler;
70#[deprecated(
71 note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead. To keep connections alive 'forever', use `Duration::from_secs(u64::MAX)`."
72)]
73pub mod keep_alive;
74mod listen_opts;
75
76#[doc(hidden)]
78pub mod derive_prelude {
79 pub use crate::behaviour::AddressChange;
80 pub use crate::behaviour::ConnectionClosed;
81 pub use crate::behaviour::ConnectionEstablished;
82 pub use crate::behaviour::DialFailure;
83 pub use crate::behaviour::ExpiredListenAddr;
84 pub use crate::behaviour::ExternalAddrConfirmed;
85 pub use crate::behaviour::ExternalAddrExpired;
86 pub use crate::behaviour::FromSwarm;
87 pub use crate::behaviour::ListenFailure;
88 pub use crate::behaviour::ListenerClosed;
89 pub use crate::behaviour::ListenerError;
90 pub use crate::behaviour::NewExternalAddrCandidate;
91 pub use crate::behaviour::NewListenAddr;
92 pub use crate::behaviour::NewListener;
93 pub use crate::connection::ConnectionId;
94 pub use crate::ConnectionDenied;
95 pub use crate::ConnectionHandler;
96 pub use crate::ConnectionHandlerSelect;
97 pub use crate::DialError;
98 pub use crate::NetworkBehaviour;
99 pub use crate::PollParameters;
100 pub use crate::THandler;
101 pub use crate::THandlerInEvent;
102 pub use crate::THandlerOutEvent;
103 pub use crate::ToSwarm;
104 pub use either::Either;
105 pub use futures::prelude as futures;
106 pub use libp2p_core::transport::ListenerId;
107 pub use libp2p_core::ConnectedPoint;
108 pub use libp2p_core::Endpoint;
109 pub use libp2p_core::Multiaddr;
110 pub use libp2p_identity::PeerId;
111}
112
113pub use behaviour::{
114 AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
115 ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
116 ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate, NewListenAddr,
117 NotifyHandler, PollParameters, ToSwarm,
118};
119pub use connection::pool::ConnectionCounters;
120pub use connection::{ConnectionError, ConnectionId, SupportedProtocols};
121pub use executor::Executor;
122pub use handler::{
123 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, KeepAlive, OneShotHandler,
124 OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
125};
126#[cfg(feature = "macros")]
127pub use libp2p_swarm_derive::NetworkBehaviour;
128pub use listen_opts::ListenOpts;
129pub use stream::Stream;
130pub use stream_protocol::{InvalidProtocol, StreamProtocol};
131
132use crate::behaviour::ExternalAddrConfirmed;
133use crate::handler::UpgradeInfoSend;
134use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
135use connection::IncomingInfo;
136use connection::{
137 PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
138};
139use dial_opts::{DialOpts, PeerCondition};
140use futures::{prelude::*, stream::FusedStream};
141use libp2p_core::{
142 connection::ConnectedPoint,
143 multiaddr,
144 muxing::StreamMuxerBox,
145 transport::{self, ListenerId, TransportError, TransportEvent},
146 Endpoint, Multiaddr, Transport,
147};
148use libp2p_identity::PeerId;
149use smallvec::SmallVec;
150use std::collections::{HashMap, HashSet};
151use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
152use std::time::Duration;
153use std::{
154 convert::TryFrom,
155 error, fmt, io,
156 pin::Pin,
157 task::{Context, Poll},
158};
159
160#[deprecated(note = "The 'substream' terminology is deprecated. Use 'Stream' instead")]
164pub type NegotiatedSubstream = Stream;
165
166type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
168
169pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
172
173pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
176
177pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
179
180#[deprecated(
182 note = "Will be removed together with `ConnectionHandlerEvent::Close`. See <https://github.com/libp2p/rust-libp2p/issues/3591> for details."
183)]
184#[allow(deprecated)]
185pub type THandlerErr<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::Error;
186
187#[derive(Debug)]
189pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
190 Behaviour(TBehaviourOutEvent),
192 ConnectionEstablished {
194 peer_id: PeerId,
196 connection_id: ConnectionId,
198 endpoint: ConnectedPoint,
200 num_established: NonZeroU32,
203 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
207 established_in: std::time::Duration,
209 },
210 ConnectionClosed {
213 peer_id: PeerId,
215 connection_id: ConnectionId,
217 endpoint: ConnectedPoint,
219 num_established: u32,
221 cause: Option<ConnectionError<THandlerErr>>,
224 },
225 IncomingConnection {
231 connection_id: ConnectionId,
233 local_addr: Multiaddr,
237 send_back_addr: Multiaddr,
239 },
240 IncomingConnectionError {
245 connection_id: ConnectionId,
247 local_addr: Multiaddr,
251 send_back_addr: Multiaddr,
253 error: ListenError,
255 },
256 OutgoingConnectionError {
258 connection_id: ConnectionId,
260 peer_id: Option<PeerId>,
262 error: DialError,
264 },
265 NewListenAddr {
267 listener_id: ListenerId,
269 address: Multiaddr,
271 },
272 ExpiredListenAddr {
274 listener_id: ListenerId,
276 address: Multiaddr,
278 },
279 ListenerClosed {
281 listener_id: ListenerId,
283 addresses: Vec<Multiaddr>,
287 reason: Result<(), io::Error>,
290 },
291 ListenerError {
293 listener_id: ListenerId,
295 error: io::Error,
297 },
298 Dialing {
306 peer_id: Option<PeerId>,
308
309 connection_id: ConnectionId,
311 },
312}
313
314impl<TBehaviourOutEvent, THandlerErr> SwarmEvent<TBehaviourOutEvent, THandlerErr> {
315 #[allow(clippy::result_large_err)]
317 pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
318 match self {
319 SwarmEvent::Behaviour(inner) => Ok(inner),
320 other => Err(other),
321 }
322 }
323}
324
325pub struct Swarm<TBehaviour>
330where
331 TBehaviour: NetworkBehaviour,
332{
333 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
335
336 pool: Pool<THandler<TBehaviour>>,
338
339 local_peer_id: PeerId,
341
342 behaviour: TBehaviour,
345
346 supported_protocols: SmallVec<[Vec<u8>; 16]>,
348
349 confirmed_external_addr: HashSet<Multiaddr>,
350
351 listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
353
354 pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
358}
359
360impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
361
362impl<TBehaviour> Swarm<TBehaviour>
363where
364 TBehaviour: NetworkBehaviour,
365{
366 pub fn new(
369 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
370 behaviour: TBehaviour,
371 local_peer_id: PeerId,
372 config: Config,
373 ) -> Self {
374 Swarm {
375 local_peer_id,
376 transport,
377 pool: Pool::new(local_peer_id, config.pool_config),
378 behaviour,
379 supported_protocols: Default::default(),
380 confirmed_external_addr: Default::default(),
381 listened_addrs: HashMap::new(),
382 pending_event: None,
383 }
384 }
385
386 pub fn network_info(&self) -> NetworkInfo {
388 let num_peers = self.pool.num_peers();
389 let connection_counters = self.pool.counters().clone();
390 NetworkInfo {
391 num_peers,
392 connection_counters,
393 }
394 }
395
396 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
402 let opts = ListenOpts::new(addr);
403 let id = opts.listener_id();
404 self.add_listener(opts)?;
405 Ok(id)
406 }
407
408 pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
413 self.transport.remove_listener(listener_id)
414 }
415
416 pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
444 let dial_opts = opts.into();
445
446 let peer_id = dial_opts.get_peer_id();
447 let condition = dial_opts.peer_condition();
448 let connection_id = dial_opts.connection_id();
449
450 let should_dial = match (condition, peer_id) {
451 (PeerCondition::Always, _) => true,
452 (PeerCondition::Disconnected, None) => true,
453 (PeerCondition::NotDialing, None) => true,
454 (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
455 (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
456 };
457
458 if !should_dial {
459 let e = DialError::DialPeerConditionFalse(condition);
460
461 self.behaviour
462 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
463 peer_id,
464 error: &e,
465 connection_id,
466 }));
467
468 return Err(e);
469 }
470
471 let addresses = {
472 let mut addresses_from_opts = dial_opts.get_addresses();
473
474 match self.behaviour.handle_pending_outbound_connection(
475 connection_id,
476 peer_id,
477 addresses_from_opts.as_slice(),
478 dial_opts.role_override(),
479 ) {
480 Ok(addresses) => {
481 if dial_opts.extend_addresses_through_behaviour() {
482 addresses_from_opts.extend(addresses)
483 } else {
484 let num_addresses = addresses.len();
485
486 if num_addresses > 0 {
487 log::debug!("discarding {num_addresses} addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection {connection_id:?}")
488 }
489 }
490 }
491 Err(cause) => {
492 let error = DialError::Denied { cause };
493
494 self.behaviour
495 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
496 peer_id,
497 error: &error,
498 connection_id,
499 }));
500
501 return Err(error);
502 }
503 }
504
505 let mut unique_addresses = HashSet::new();
506 addresses_from_opts.retain(|addr| {
507 !self.listened_addrs.values().flatten().any(|a| a == addr)
508 && unique_addresses.insert(addr.clone())
509 });
510
511 if addresses_from_opts.is_empty() {
512 let error = DialError::NoAddresses;
513 self.behaviour
514 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
515 peer_id,
516 error: &error,
517 connection_id,
518 }));
519 return Err(error);
520 };
521
522 addresses_from_opts
523 };
524
525 let dials = addresses
526 .into_iter()
527 .map(|a| match p2p_addr(peer_id, a) {
528 Ok(address) => {
529 let dial = match dial_opts.role_override() {
530 Endpoint::Dialer => self.transport.dial(address.clone()),
531 Endpoint::Listener => self.transport.dial_as_listener(address.clone()),
532 };
533 match dial {
534 Ok(fut) => fut
535 .map(|r| (address, r.map_err(TransportError::Other)))
536 .boxed(),
537 Err(err) => futures::future::ready((address, Err(err))).boxed(),
538 }
539 }
540 Err(address) => futures::future::ready((
541 address.clone(),
542 Err(TransportError::MultiaddrNotSupported(address)),
543 ))
544 .boxed(),
545 })
546 .collect();
547
548 self.pool.add_outgoing(
549 dials,
550 peer_id,
551 dial_opts.role_override(),
552 dial_opts.dial_concurrency_override(),
553 connection_id,
554 );
555
556 Ok(())
557 }
558
559 pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
561 self.listened_addrs.values().flatten()
562 }
563
564 pub fn local_peer_id(&self) -> &PeerId {
566 &self.local_peer_id
567 }
568
569 pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
571 self.confirmed_external_addr.iter()
572 }
573
574 fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
575 let addr = opts.address();
576 let listener_id = opts.listener_id();
577
578 if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
579 self.behaviour
580 .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
581 listener_id,
582 err: &e,
583 }));
584
585 return Err(e);
586 }
587
588 self.behaviour
589 .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
590 listener_id,
591 }));
592
593 Ok(())
594 }
595
596 pub fn add_external_address(&mut self, a: Multiaddr) {
601 self.behaviour
602 .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
603 addr: &a,
604 }));
605 self.confirmed_external_addr.insert(a);
606 }
607
608 pub fn remove_external_address(&mut self, addr: &Multiaddr) {
612 self.behaviour
613 .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
614 self.confirmed_external_addr.remove(addr);
615 }
616
617 #[allow(clippy::result_unit_err)]
628 pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
629 let was_connected = self.pool.is_connected(peer_id);
630 self.pool.disconnect(peer_id);
631
632 if was_connected {
633 Ok(())
634 } else {
635 Err(())
636 }
637 }
638
639 pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
649 if let Some(established) = self.pool.get_established(connection_id) {
650 established.start_close();
651 return true;
652 }
653
654 false
655 }
656
657 pub fn is_connected(&self, peer_id: &PeerId) -> bool {
659 self.pool.is_connected(*peer_id)
660 }
661
662 pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
664 self.pool.iter_connected()
665 }
666
667 pub fn behaviour(&self) -> &TBehaviour {
669 &self.behaviour
670 }
671
672 pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
674 &mut self.behaviour
675 }
676
677 #[allow(deprecated)]
678 fn handle_pool_event(
679 &mut self,
680 event: PoolEvent<THandler<TBehaviour>>,
681 ) -> Option<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
682 match event {
683 PoolEvent::ConnectionEstablished {
684 peer_id,
685 id,
686 endpoint,
687 connection,
688 concurrent_dial_errors,
689 established_in,
690 } => {
691 let handler = match endpoint.clone() {
692 ConnectedPoint::Dialer {
693 address,
694 role_override,
695 } => {
696 match self.behaviour.handle_established_outbound_connection(
697 id,
698 peer_id,
699 &address,
700 role_override,
701 ) {
702 Ok(handler) => handler,
703 Err(cause) => {
704 let dial_error = DialError::Denied { cause };
705 self.behaviour.on_swarm_event(FromSwarm::DialFailure(
706 DialFailure {
707 connection_id: id,
708 error: &dial_error,
709 peer_id: Some(peer_id),
710 },
711 ));
712
713 return Some(SwarmEvent::OutgoingConnectionError {
714 peer_id: Some(peer_id),
715 connection_id: id,
716 error: dial_error,
717 });
718 }
719 }
720 }
721 ConnectedPoint::Listener {
722 local_addr,
723 send_back_addr,
724 } => {
725 match self.behaviour.handle_established_inbound_connection(
726 id,
727 peer_id,
728 &local_addr,
729 &send_back_addr,
730 ) {
731 Ok(handler) => handler,
732 Err(cause) => {
733 let listen_error = ListenError::Denied { cause };
734 self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
735 ListenFailure {
736 local_addr: &local_addr,
737 send_back_addr: &send_back_addr,
738 error: &listen_error,
739 connection_id: id,
740 },
741 ));
742
743 return Some(SwarmEvent::IncomingConnectionError {
744 connection_id: id,
745 send_back_addr,
746 local_addr,
747 error: listen_error,
748 });
749 }
750 }
751 }
752 };
753
754 let supported_protocols = handler
755 .listen_protocol()
756 .upgrade()
757 .protocol_info()
758 .map(|p| p.as_ref().as_bytes().to_vec())
759 .collect();
760 let other_established_connection_ids = self
761 .pool
762 .iter_established_connections_of_peer(&peer_id)
763 .collect::<Vec<_>>();
764 let num_established = NonZeroU32::new(
765 u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
766 )
767 .expect("n + 1 is always non-zero; qed");
768
769 self.pool
770 .spawn_connection(id, peer_id, &endpoint, connection, handler);
771
772 log::debug!(
773 "Connection established: {:?} {:?}; Total (peer): {}.",
774 peer_id,
775 endpoint,
776 num_established,
777 );
778 let failed_addresses = concurrent_dial_errors
779 .as_ref()
780 .map(|es| {
781 es.iter()
782 .map(|(a, _)| a)
783 .cloned()
784 .collect::<Vec<Multiaddr>>()
785 })
786 .unwrap_or_default();
787 self.behaviour
788 .on_swarm_event(FromSwarm::ConnectionEstablished(
789 behaviour::ConnectionEstablished {
790 peer_id,
791 connection_id: id,
792 endpoint: &endpoint,
793 failed_addresses: &failed_addresses,
794 other_established: other_established_connection_ids.len(),
795 },
796 ));
797 self.supported_protocols = supported_protocols;
798 return Some(SwarmEvent::ConnectionEstablished {
799 peer_id,
800 connection_id: id,
801 num_established,
802 endpoint,
803 concurrent_dial_errors,
804 established_in,
805 });
806 }
807 PoolEvent::PendingOutboundConnectionError {
808 id: connection_id,
809 error,
810 peer,
811 } => {
812 let error = error.into();
813
814 self.behaviour
815 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
816 peer_id: peer,
817 error: &error,
818 connection_id,
819 }));
820
821 if let Some(peer) = peer {
822 log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
823 } else {
824 log::debug!("Connection attempt to unknown peer failed with {:?}", error);
825 }
826
827 return Some(SwarmEvent::OutgoingConnectionError {
828 peer_id: peer,
829 connection_id,
830 error,
831 });
832 }
833 PoolEvent::PendingInboundConnectionError {
834 id,
835 send_back_addr,
836 local_addr,
837 error,
838 } => {
839 let error = error.into();
840
841 log::debug!("Incoming connection failed: {:?}", error);
842 self.behaviour
843 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
844 local_addr: &local_addr,
845 send_back_addr: &send_back_addr,
846 error: &error,
847 connection_id: id,
848 }));
849 return Some(SwarmEvent::IncomingConnectionError {
850 connection_id: id,
851 local_addr,
852 send_back_addr,
853 error,
854 });
855 }
856 PoolEvent::ConnectionClosed {
857 id,
858 connected,
859 error,
860 remaining_established_connection_ids,
861 handler,
862 ..
863 } => {
864 if let Some(error) = error.as_ref() {
865 log::debug!(
866 "Connection closed with error {:?}: {:?}; Total (peer): {}.",
867 error,
868 connected,
869 remaining_established_connection_ids.len()
870 );
871 } else {
872 log::debug!(
873 "Connection closed: {:?}; Total (peer): {}.",
874 connected,
875 remaining_established_connection_ids.len()
876 );
877 }
878 let peer_id = connected.peer_id;
879 let endpoint = connected.endpoint;
880 let num_established =
881 u32::try_from(remaining_established_connection_ids.len()).unwrap();
882
883 self.behaviour
884 .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
885 peer_id,
886 connection_id: id,
887 endpoint: &endpoint,
888 handler,
889 remaining_established: num_established as usize,
890 }));
891 return Some(SwarmEvent::ConnectionClosed {
892 peer_id,
893 connection_id: id,
894 endpoint,
895 cause: error,
896 num_established,
897 });
898 }
899 PoolEvent::ConnectionEvent { peer_id, id, event } => {
900 self.behaviour
901 .on_connection_handler_event(peer_id, id, event);
902 }
903 PoolEvent::AddressChange {
904 peer_id,
905 id,
906 new_endpoint,
907 old_endpoint,
908 } => {
909 self.behaviour
910 .on_swarm_event(FromSwarm::AddressChange(AddressChange {
911 peer_id,
912 connection_id: id,
913 old: &old_endpoint,
914 new: &new_endpoint,
915 }));
916 }
917 }
918
919 None
920 }
921
922 #[allow(deprecated)]
923 fn handle_transport_event(
924 &mut self,
925 event: TransportEvent<
926 <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
927 io::Error,
928 >,
929 ) -> Option<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
930 match event {
931 TransportEvent::Incoming {
932 listener_id: _,
933 upgrade,
934 local_addr,
935 send_back_addr,
936 } => {
937 let connection_id = ConnectionId::next();
938
939 match self.behaviour.handle_pending_inbound_connection(
940 connection_id,
941 &local_addr,
942 &send_back_addr,
943 ) {
944 Ok(()) => {}
945 Err(cause) => {
946 let listen_error = ListenError::Denied { cause };
947
948 self.behaviour
949 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
950 local_addr: &local_addr,
951 send_back_addr: &send_back_addr,
952 error: &listen_error,
953 connection_id,
954 }));
955
956 return Some(SwarmEvent::IncomingConnectionError {
957 connection_id,
958 local_addr,
959 send_back_addr,
960 error: listen_error,
961 });
962 }
963 }
964
965 self.pool.add_incoming(
966 upgrade,
967 IncomingInfo {
968 local_addr: &local_addr,
969 send_back_addr: &send_back_addr,
970 },
971 connection_id,
972 );
973
974 Some(SwarmEvent::IncomingConnection {
975 connection_id,
976 local_addr,
977 send_back_addr,
978 })
979 }
980 TransportEvent::NewAddress {
981 listener_id,
982 listen_addr,
983 } => {
984 log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
985 let addrs = self.listened_addrs.entry(listener_id).or_default();
986 if !addrs.contains(&listen_addr) {
987 addrs.push(listen_addr.clone())
988 }
989 self.behaviour
990 .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
991 listener_id,
992 addr: &listen_addr,
993 }));
994 Some(SwarmEvent::NewListenAddr {
995 listener_id,
996 address: listen_addr,
997 })
998 }
999 TransportEvent::AddressExpired {
1000 listener_id,
1001 listen_addr,
1002 } => {
1003 log::debug!(
1004 "Listener {:?}; Expired address {:?}.",
1005 listener_id,
1006 listen_addr
1007 );
1008 if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1009 addrs.retain(|a| a != &listen_addr);
1010 }
1011 self.behaviour
1012 .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1013 listener_id,
1014 addr: &listen_addr,
1015 }));
1016 Some(SwarmEvent::ExpiredListenAddr {
1017 listener_id,
1018 address: listen_addr,
1019 })
1020 }
1021 TransportEvent::ListenerClosed {
1022 listener_id,
1023 reason,
1024 } => {
1025 log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
1026 let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1027 for addr in addrs.iter() {
1028 self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1029 ExpiredListenAddr { listener_id, addr },
1030 ));
1031 }
1032 self.behaviour
1033 .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1034 listener_id,
1035 reason: reason.as_ref().copied(),
1036 }));
1037 Some(SwarmEvent::ListenerClosed {
1038 listener_id,
1039 addresses: addrs.to_vec(),
1040 reason,
1041 })
1042 }
1043 TransportEvent::ListenerError { listener_id, error } => {
1044 self.behaviour
1045 .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1046 listener_id,
1047 err: &error,
1048 }));
1049 Some(SwarmEvent::ListenerError { listener_id, error })
1050 }
1051 }
1052 }
1053
1054 #[allow(deprecated)]
1055 fn handle_behaviour_event(
1056 &mut self,
1057 event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1058 ) -> Option<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
1059 match event {
1060 ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)),
1061 ToSwarm::Dial { opts } => {
1062 let peer_id = opts.get_peer_id();
1063 let connection_id = opts.connection_id();
1064 if let Ok(()) = self.dial(opts) {
1065 return Some(SwarmEvent::Dialing {
1066 peer_id,
1067 connection_id,
1068 });
1069 }
1070 }
1071 ToSwarm::ListenOn { opts } => {
1072 let _ = self.add_listener(opts);
1074 }
1075 ToSwarm::RemoveListener { id } => {
1076 self.remove_listener(id);
1077 }
1078 ToSwarm::NotifyHandler {
1079 peer_id,
1080 handler,
1081 event,
1082 } => {
1083 assert!(self.pending_event.is_none());
1084 let handler = match handler {
1085 NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1086 NotifyHandler::Any => {
1087 let ids = self
1088 .pool
1089 .iter_established_connections_of_peer(&peer_id)
1090 .collect();
1091 PendingNotifyHandler::Any(ids)
1092 }
1093 };
1094
1095 self.pending_event = Some((peer_id, handler, event));
1096 }
1097 ToSwarm::NewExternalAddrCandidate(addr) => {
1098 let translated_addresses = {
1101 let mut addrs: Vec<_> = self
1102 .listened_addrs
1103 .values()
1104 .flatten()
1105 .filter_map(|server| self.transport.address_translation(server, &addr))
1106 .collect();
1107
1108 addrs.sort_unstable();
1110 addrs.dedup();
1111 addrs
1112 };
1113
1114 if translated_addresses.is_empty() {
1116 self.behaviour
1117 .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1118 NewExternalAddrCandidate { addr: &addr },
1119 ));
1120 } else {
1121 for addr in translated_addresses {
1122 self.behaviour
1123 .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1124 NewExternalAddrCandidate { addr: &addr },
1125 ));
1126 }
1127 }
1128 }
1129 ToSwarm::ExternalAddrConfirmed(addr) => {
1130 self.add_external_address(addr);
1131 }
1132 ToSwarm::ExternalAddrExpired(addr) => {
1133 self.remove_external_address(&addr);
1134 }
1135 ToSwarm::CloseConnection {
1136 peer_id,
1137 connection,
1138 } => match connection {
1139 CloseConnection::One(connection_id) => {
1140 if let Some(conn) = self.pool.get_established(connection_id) {
1141 conn.start_close();
1142 }
1143 }
1144 CloseConnection::All => {
1145 self.pool.disconnect(peer_id);
1146 }
1147 },
1148 }
1149
1150 None
1151 }
1152
1153 #[allow(deprecated)]
1157 fn poll_next_event(
1158 mut self: Pin<&mut Self>,
1159 cx: &mut Context<'_>,
1160 ) -> Poll<SwarmEvent<TBehaviour::ToSwarm, THandlerErr<TBehaviour>>> {
1161 let this = &mut *self;
1164
1165 loop {
1175 match this.pending_event.take() {
1176 Some((peer_id, handler, event)) => match handler {
1179 PendingNotifyHandler::One(conn_id) => {
1180 match this.pool.get_established(conn_id) {
1181 Some(conn) => match notify_one(conn, event, cx) {
1182 None => continue,
1183 Some(event) => {
1184 this.pending_event = Some((peer_id, handler, event));
1185 }
1186 },
1187 None => continue,
1188 }
1189 }
1190 PendingNotifyHandler::Any(ids) => {
1191 match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1192 None => continue,
1193 Some((event, ids)) => {
1194 let handler = PendingNotifyHandler::Any(ids);
1195 this.pending_event = Some((peer_id, handler, event));
1196 }
1197 }
1198 }
1199 },
1200 None => {
1202 let behaviour_poll = {
1203 let mut parameters = SwarmPollParameters {
1204 supported_protocols: &this.supported_protocols,
1205 };
1206 this.behaviour.poll(cx, &mut parameters)
1207 };
1208
1209 match behaviour_poll {
1210 Poll::Pending => {}
1211 Poll::Ready(behaviour_event) => {
1212 if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event)
1213 {
1214 return Poll::Ready(swarm_event);
1215 }
1216
1217 continue;
1218 }
1219 }
1220 }
1221 }
1222
1223 match this.pool.poll(cx) {
1225 Poll::Pending => {}
1226 Poll::Ready(pool_event) => {
1227 if let Some(swarm_event) = this.handle_pool_event(pool_event) {
1228 return Poll::Ready(swarm_event);
1229 }
1230
1231 continue;
1232 }
1233 };
1234
1235 match Pin::new(&mut this.transport).poll(cx) {
1237 Poll::Pending => {}
1238 Poll::Ready(transport_event) => {
1239 if let Some(swarm_event) = this.handle_transport_event(transport_event) {
1240 return Poll::Ready(swarm_event);
1241 }
1242
1243 continue;
1244 }
1245 }
1246
1247 return Poll::Pending;
1248 }
1249 }
1250}
1251
1252enum PendingNotifyHandler {
1259 One(ConnectionId),
1260 Any(SmallVec<[ConnectionId; 10]>),
1261}
1262
1263fn notify_one<THandlerInEvent>(
1272 conn: &mut EstablishedConnection<THandlerInEvent>,
1273 event: THandlerInEvent,
1274 cx: &mut Context<'_>,
1275) -> Option<THandlerInEvent> {
1276 match conn.poll_ready_notify_handler(cx) {
1277 Poll::Pending => Some(event),
1278 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
1280 let _ = conn.notify_handler(event);
1282 None
1283 }
1284 }
1285}
1286
1287fn notify_any<THandler, TBehaviour>(
1298 ids: SmallVec<[ConnectionId; 10]>,
1299 pool: &mut Pool<THandler>,
1300 event: THandlerInEvent<TBehaviour>,
1301 cx: &mut Context<'_>,
1302) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1303where
1304 TBehaviour: NetworkBehaviour,
1305 THandler: ConnectionHandler<
1306 FromBehaviour = THandlerInEvent<TBehaviour>,
1307 ToBehaviour = THandlerOutEvent<TBehaviour>,
1308 >,
1309{
1310 let mut pending = SmallVec::new();
1311 let mut event = Some(event); for id in ids.into_iter() {
1313 if let Some(conn) = pool.get_established(id) {
1314 match conn.poll_ready_notify_handler(cx) {
1315 Poll::Pending => pending.push(id),
1316 Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
1318 let e = event.take().expect("by (1),(2)");
1319 if let Err(e) = conn.notify_handler(e) {
1320 event = Some(e) } else {
1322 break;
1323 }
1324 }
1325 }
1326 }
1327 }
1328
1329 event.and_then(|e| {
1330 if !pending.is_empty() {
1331 Some((e, pending))
1332 } else {
1333 None
1334 }
1335 })
1336}
1337
1338impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1346where
1347 TBehaviour: NetworkBehaviour,
1348{
1349 #[allow(deprecated)]
1350 type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>, THandlerErr<TBehaviour>>;
1351
1352 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1353 self.as_mut().poll_next_event(cx).map(Some)
1354 }
1355}
1356
1357impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1359where
1360 TBehaviour: NetworkBehaviour,
1361{
1362 fn is_terminated(&self) -> bool {
1363 false
1364 }
1365}
1366
1367pub struct SwarmPollParameters<'a> {
1370 supported_protocols: &'a [Vec<u8>],
1371}
1372
1373impl<'a> PollParameters for SwarmPollParameters<'a> {
1374 type SupportedProtocolsIter = std::iter::Cloned<std::slice::Iter<'a, std::vec::Vec<u8>>>;
1375
1376 fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
1377 self.supported_protocols.iter().cloned()
1378 }
1379}
1380
1381pub struct Config {
1382 pool_config: PoolConfig,
1383}
1384
1385impl Config {
1386 pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1389 Self {
1390 pool_config: PoolConfig::new(Some(Box::new(executor))),
1391 }
1392 }
1393
1394 #[cfg(feature = "wasm-bindgen")]
1404 pub fn with_wasm_executor() -> Self {
1405 Self::with_executor(crate::executor::WasmBindgenExecutor)
1406 }
1407
1408 #[cfg(all(
1410 feature = "tokio",
1411 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1412 ))]
1413 pub fn with_tokio_executor() -> Self {
1414 Self::with_executor(crate::executor::TokioExecutor)
1415 }
1416
1417 #[cfg(all(
1419 feature = "async-std",
1420 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1421 ))]
1422 pub fn with_async_std_executor() -> Self {
1423 Self::with_executor(crate::executor::AsyncStdExecutor)
1424 }
1425
1426 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1436 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1437 self
1438 }
1439
1440 pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1452 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1453 self
1454 }
1455
1456 pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1458 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1459 self
1460 }
1461
1462 pub fn with_substream_upgrade_protocol_override(
1473 mut self,
1474 v: libp2p_core::upgrade::Version,
1475 ) -> Self {
1476 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1477 self
1478 }
1479
1480 pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1490 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1491 self
1492 }
1493
1494 pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1498 self.pool_config.idle_connection_timeout = timeout;
1499 self
1500 }
1501}
1502
1503#[deprecated(
1505 note = "Use the new `libp2p::SwarmBuilder` instead of `libp2p::swarm::SwarmBuilder` or create a `Swarm` directly via `Swarm::new`."
1506)]
1507pub struct SwarmBuilder<TBehaviour> {
1508 local_peer_id: PeerId,
1509 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1510 behaviour: TBehaviour,
1511 pool_config: PoolConfig,
1512}
1513
1514#[allow(deprecated)]
1515impl<TBehaviour> SwarmBuilder<TBehaviour>
1516where
1517 TBehaviour: NetworkBehaviour,
1518{
1519 pub fn with_executor(
1523 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1524 behaviour: TBehaviour,
1525 local_peer_id: PeerId,
1526 executor: impl Executor + Send + 'static,
1527 ) -> Self {
1528 Self {
1529 local_peer_id,
1530 transport,
1531 behaviour,
1532 pool_config: PoolConfig::new(Some(Box::new(executor))),
1533 }
1534 }
1535
1536 #[cfg(feature = "wasm-bindgen")]
1546 pub fn with_wasm_executor(
1547 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1548 behaviour: TBehaviour,
1549 local_peer_id: PeerId,
1550 ) -> Self {
1551 Self::with_executor(
1552 transport,
1553 behaviour,
1554 local_peer_id,
1555 crate::executor::WasmBindgenExecutor,
1556 )
1557 }
1558
1559 #[cfg(all(
1562 feature = "tokio",
1563 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1564 ))]
1565 pub fn with_tokio_executor(
1566 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1567 behaviour: TBehaviour,
1568 local_peer_id: PeerId,
1569 ) -> Self {
1570 Self::with_executor(
1571 transport,
1572 behaviour,
1573 local_peer_id,
1574 crate::executor::TokioExecutor,
1575 )
1576 }
1577
1578 #[cfg(all(
1581 feature = "async-std",
1582 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1583 ))]
1584 pub fn with_async_std_executor(
1585 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1586 behaviour: TBehaviour,
1587 local_peer_id: PeerId,
1588 ) -> Self {
1589 Self::with_executor(
1590 transport,
1591 behaviour,
1592 local_peer_id,
1593 crate::executor::AsyncStdExecutor,
1594 )
1595 }
1596
1597 pub fn without_executor(
1605 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
1606 behaviour: TBehaviour,
1607 local_peer_id: PeerId,
1608 ) -> Self {
1609 Self {
1610 local_peer_id,
1611 transport,
1612 behaviour,
1613 pool_config: PoolConfig::new(None),
1614 }
1615 }
1616
1617 pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1627 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1628 self
1629 }
1630
1631 pub fn per_connection_event_buffer_size(mut self, n: usize) -> Self {
1643 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1644 self
1645 }
1646
1647 pub fn dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1649 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1650 self
1651 }
1652
1653 pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
1664 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1665 self
1666 }
1667
1668 pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1677 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1678 self
1679 }
1680
1681 pub fn idle_connection_timeout(mut self, timeout: Duration) -> Self {
1685 self.pool_config.idle_connection_timeout = timeout;
1686 self
1687 }
1688
1689 pub fn build(self) -> Swarm<TBehaviour> {
1691 log::info!("Local peer id: {}", self.local_peer_id);
1692 Swarm {
1693 local_peer_id: self.local_peer_id,
1694 transport: self.transport,
1695 pool: Pool::new(self.local_peer_id, self.pool_config),
1696 behaviour: self.behaviour,
1697 supported_protocols: Default::default(),
1698 confirmed_external_addr: Default::default(),
1699 listened_addrs: HashMap::new(),
1700 pending_event: None,
1701 }
1702 }
1703}
1704
1705#[derive(Debug)]
1707pub enum DialError {
1708 LocalPeerId {
1710 endpoint: ConnectedPoint,
1711 },
1712 NoAddresses,
1714 DialPeerConditionFalse(dial_opts::PeerCondition),
1717 Aborted,
1719 WrongPeerId {
1721 obtained: PeerId,
1722 endpoint: ConnectedPoint,
1723 },
1724 Denied {
1725 cause: ConnectionDenied,
1726 },
1727 Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1729}
1730
1731impl From<PendingOutboundConnectionError> for DialError {
1732 fn from(error: PendingOutboundConnectionError) -> Self {
1733 match error {
1734 PendingConnectionError::Aborted => DialError::Aborted,
1735 PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1736 DialError::WrongPeerId { obtained, endpoint }
1737 }
1738 PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1739 PendingConnectionError::Transport(e) => DialError::Transport(e),
1740 }
1741 }
1742}
1743
1744impl fmt::Display for DialError {
1745 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1746 match self {
1747 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1748 DialError::LocalPeerId { endpoint } => write!(
1749 f,
1750 "Dial error: tried to dial local peer id at {endpoint:?}."
1751 ),
1752 DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1753 DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1754 DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1755 DialError::Aborted => write!(
1756 f,
1757 "Dial error: Pending connection attempt has been aborted."
1758 ),
1759 DialError::WrongPeerId { obtained, endpoint } => write!(
1760 f,
1761 "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1762 ),
1763 DialError::Transport(errors) => {
1764 write!(f, "Failed to negotiate transport protocol(s): [")?;
1765
1766 for (addr, error) in errors {
1767 write!(f, "({addr}")?;
1768 print_error_chain(f, error)?;
1769 write!(f, ")")?;
1770 }
1771 write!(f, "]")?;
1772
1773 Ok(())
1774 }
1775 DialError::Denied { .. } => {
1776 write!(f, "Dial error")
1777 }
1778 }
1779 }
1780}
1781
1782fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
1783 write!(f, ": {e}")?;
1784
1785 if let Some(source) = e.source() {
1786 print_error_chain(f, source)?;
1787 }
1788
1789 Ok(())
1790}
1791
1792impl error::Error for DialError {
1793 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1794 match self {
1795 DialError::LocalPeerId { .. } => None,
1796 DialError::NoAddresses => None,
1797 DialError::DialPeerConditionFalse(_) => None,
1798 DialError::Aborted => None,
1799 DialError::WrongPeerId { .. } => None,
1800 DialError::Transport(_) => None,
1801 DialError::Denied { cause } => Some(cause),
1802 }
1803 }
1804}
1805
1806#[derive(Debug)]
1808pub enum ListenError {
1809 Aborted,
1811 WrongPeerId {
1813 obtained: PeerId,
1814 endpoint: ConnectedPoint,
1815 },
1816 LocalPeerId {
1818 endpoint: ConnectedPoint,
1819 },
1820 Denied {
1821 cause: ConnectionDenied,
1822 },
1823 Transport(TransportError<io::Error>),
1825}
1826
1827impl From<PendingInboundConnectionError> for ListenError {
1828 fn from(error: PendingInboundConnectionError) -> Self {
1829 match error {
1830 PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1831 PendingInboundConnectionError::Aborted => ListenError::Aborted,
1832 PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1833 ListenError::WrongPeerId { obtained, endpoint }
1834 }
1835 PendingInboundConnectionError::LocalPeerId { endpoint } => {
1836 ListenError::LocalPeerId { endpoint }
1837 }
1838 }
1839 }
1840}
1841
1842impl fmt::Display for ListenError {
1843 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1844 match self {
1845 ListenError::Aborted => write!(
1846 f,
1847 "Listen error: Pending connection attempt has been aborted."
1848 ),
1849 ListenError::WrongPeerId { obtained, endpoint } => write!(
1850 f,
1851 "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1852 ),
1853 ListenError::Transport(_) => {
1854 write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1855 }
1856 ListenError::Denied { cause } => {
1857 write!(f, "Listen error: Denied: {cause}")
1858 }
1859 ListenError::LocalPeerId { endpoint } => {
1860 write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1861 }
1862 }
1863 }
1864}
1865
1866impl error::Error for ListenError {
1867 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1868 match self {
1869 ListenError::WrongPeerId { .. } => None,
1870 ListenError::Transport(err) => Some(err),
1871 ListenError::Aborted => None,
1872 ListenError::Denied { cause } => Some(cause),
1873 ListenError::LocalPeerId { .. } => None,
1874 }
1875 }
1876}
1877
1878#[derive(Debug)]
1882pub struct ConnectionDenied {
1883 inner: Box<dyn error::Error + Send + Sync + 'static>,
1884}
1885
1886impl ConnectionDenied {
1887 pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
1888 Self {
1889 inner: cause.into(),
1890 }
1891 }
1892
1893 pub fn downcast<E>(self) -> Result<E, Self>
1895 where
1896 E: error::Error + Send + Sync + 'static,
1897 {
1898 let inner = self
1899 .inner
1900 .downcast::<E>()
1901 .map_err(|inner| ConnectionDenied { inner })?;
1902
1903 Ok(*inner)
1904 }
1905
1906 pub fn downcast_ref<E>(&self) -> Option<&E>
1908 where
1909 E: error::Error + Send + Sync + 'static,
1910 {
1911 self.inner.downcast_ref::<E>()
1912 }
1913}
1914
1915impl fmt::Display for ConnectionDenied {
1916 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1917 write!(f, "connection denied")
1918 }
1919}
1920
1921impl error::Error for ConnectionDenied {
1922 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1923 Some(self.inner.as_ref())
1924 }
1925}
1926
1927#[derive(Clone, Debug)]
1929pub struct NetworkInfo {
1930 num_peers: usize,
1932 connection_counters: ConnectionCounters,
1934}
1935
1936impl NetworkInfo {
1937 pub fn num_peers(&self) -> usize {
1940 self.num_peers
1941 }
1942
1943 pub fn connection_counters(&self) -> &ConnectionCounters {
1945 &self.connection_counters
1946 }
1947}
1948
1949fn p2p_addr(peer: Option<PeerId>, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
1960 let peer = match peer {
1961 Some(p) => p,
1962 None => return Ok(addr),
1963 };
1964
1965 if let Some(multiaddr::Protocol::P2p(peer_id)) = addr.iter().last() {
1966 if peer_id != peer {
1967 return Err(addr);
1968 }
1969
1970 return Ok(addr);
1971 }
1972
1973 Ok(addr.with(multiaddr::Protocol::P2p(peer)))
1974}
1975
1976#[cfg(test)]
1977mod tests {
1978 use super::*;
1979 use crate::dummy;
1980 use crate::test::{CallTraceBehaviour, MockBehaviour};
1981 use futures::future;
1982 use libp2p_core::multiaddr::multiaddr;
1983 use libp2p_core::transport::memory::MemoryTransportError;
1984 use libp2p_core::transport::TransportEvent;
1985 use libp2p_core::Endpoint;
1986 use libp2p_core::{multiaddr, transport, upgrade};
1987 use libp2p_identity as identity;
1988 use libp2p_plaintext as plaintext;
1989 use libp2p_yamux as yamux;
1990 use quickcheck::*;
1991
1992 enum State {
1995 Connecting,
1996 Disconnecting,
1997 }
1998
1999 fn new_test_swarm(
2000 config: Config,
2001 ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
2002 let id_keys = identity::Keypair::generate_ed25519();
2003 let local_public_key = id_keys.public();
2004 let transport = transport::MemoryTransport::default()
2005 .upgrade(upgrade::Version::V1)
2006 .authenticate(plaintext::Config::new(&id_keys))
2007 .multiplex(yamux::Config::default())
2008 .boxed();
2009 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
2010
2011 Swarm::new(
2012 transport,
2013 behaviour,
2014 local_public_key.into(),
2015 config.with_idle_connection_timeout(Duration::from_secs(5)),
2016 )
2017 }
2018
2019 fn swarms_connected<TBehaviour>(
2020 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
2021 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
2022 num_connections: usize,
2023 ) -> bool
2024 where
2025 TBehaviour: NetworkBehaviour,
2026 THandlerOutEvent<TBehaviour>: Clone,
2027 {
2028 swarm1
2029 .behaviour()
2030 .num_connections_to_peer(*swarm2.local_peer_id())
2031 == num_connections
2032 && swarm2
2033 .behaviour()
2034 .num_connections_to_peer(*swarm1.local_peer_id())
2035 == num_connections
2036 && swarm1.is_connected(swarm2.local_peer_id())
2037 && swarm2.is_connected(swarm1.local_peer_id())
2038 }
2039
2040 fn swarms_disconnected<TBehaviour: NetworkBehaviour>(
2041 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
2042 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
2043 ) -> bool
2044 where
2045 TBehaviour: NetworkBehaviour,
2046 THandlerOutEvent<TBehaviour>: Clone,
2047 {
2048 swarm1
2049 .behaviour()
2050 .num_connections_to_peer(*swarm2.local_peer_id())
2051 == 0
2052 && swarm2
2053 .behaviour()
2054 .num_connections_to_peer(*swarm1.local_peer_id())
2055 == 0
2056 && !swarm1.is_connected(swarm2.local_peer_id())
2057 && !swarm2.is_connected(swarm1.local_peer_id())
2058 }
2059
2060 #[tokio::test]
2066 async fn test_swarm_disconnect() {
2067 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2068 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2069
2070 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2071 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2072
2073 swarm1.listen_on(addr1.clone()).unwrap();
2074 swarm2.listen_on(addr2.clone()).unwrap();
2075
2076 let swarm1_id = *swarm1.local_peer_id();
2077
2078 let mut reconnected = false;
2079 let num_connections = 10;
2080
2081 for _ in 0..num_connections {
2082 swarm1.dial(addr2.clone()).unwrap();
2083 }
2084 let mut state = State::Connecting;
2085
2086 future::poll_fn(move |cx| loop {
2087 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2088 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2089 match state {
2090 State::Connecting => {
2091 if swarms_connected(&swarm1, &swarm2, num_connections) {
2092 if reconnected {
2093 return Poll::Ready(());
2094 }
2095 swarm2
2096 .disconnect_peer_id(swarm1_id)
2097 .expect("Error disconnecting");
2098 state = State::Disconnecting;
2099 }
2100 }
2101 State::Disconnecting => {
2102 if swarms_disconnected(&swarm1, &swarm2) {
2103 if reconnected {
2104 return Poll::Ready(());
2105 }
2106 reconnected = true;
2107 for _ in 0..num_connections {
2108 swarm2.dial(addr1.clone()).unwrap();
2109 }
2110 state = State::Connecting;
2111 }
2112 }
2113 }
2114
2115 if poll1.is_pending() && poll2.is_pending() {
2116 return Poll::Pending;
2117 }
2118 })
2119 .await
2120 }
2121
2122 #[tokio::test]
2129 async fn test_behaviour_disconnect_all() {
2130 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2131 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2132
2133 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2134 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2135
2136 swarm1.listen_on(addr1.clone()).unwrap();
2137 swarm2.listen_on(addr2.clone()).unwrap();
2138
2139 let swarm1_id = *swarm1.local_peer_id();
2140
2141 let mut reconnected = false;
2142 let num_connections = 10;
2143
2144 for _ in 0..num_connections {
2145 swarm1.dial(addr2.clone()).unwrap();
2146 }
2147 let mut state = State::Connecting;
2148
2149 future::poll_fn(move |cx| loop {
2150 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2151 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2152 match state {
2153 State::Connecting => {
2154 if swarms_connected(&swarm1, &swarm2, num_connections) {
2155 if reconnected {
2156 return Poll::Ready(());
2157 }
2158 swarm2
2159 .behaviour
2160 .inner()
2161 .next_action
2162 .replace(ToSwarm::CloseConnection {
2163 peer_id: swarm1_id,
2164 connection: CloseConnection::All,
2165 });
2166 state = State::Disconnecting;
2167 continue;
2168 }
2169 }
2170 State::Disconnecting => {
2171 if swarms_disconnected(&swarm1, &swarm2) {
2172 reconnected = true;
2173 for _ in 0..num_connections {
2174 swarm2.dial(addr1.clone()).unwrap();
2175 }
2176 state = State::Connecting;
2177 continue;
2178 }
2179 }
2180 }
2181
2182 if poll1.is_pending() && poll2.is_pending() {
2183 return Poll::Pending;
2184 }
2185 })
2186 .await
2187 }
2188
2189 #[tokio::test]
2196 async fn test_behaviour_disconnect_one() {
2197 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2198 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2199
2200 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2201 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
2202
2203 swarm1.listen_on(addr1).unwrap();
2204 swarm2.listen_on(addr2.clone()).unwrap();
2205
2206 let swarm1_id = *swarm1.local_peer_id();
2207
2208 let num_connections = 10;
2209
2210 for _ in 0..num_connections {
2211 swarm1.dial(addr2.clone()).unwrap();
2212 }
2213 let mut state = State::Connecting;
2214 let mut disconnected_conn_id = None;
2215
2216 future::poll_fn(move |cx| loop {
2217 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
2218 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
2219 match state {
2220 State::Connecting => {
2221 if swarms_connected(&swarm1, &swarm2, num_connections) {
2222 disconnected_conn_id = {
2223 let conn_id =
2224 swarm2.behaviour.on_connection_established[num_connections / 2].1;
2225 swarm2.behaviour.inner().next_action.replace(
2226 ToSwarm::CloseConnection {
2227 peer_id: swarm1_id,
2228 connection: CloseConnection::One(conn_id),
2229 },
2230 );
2231 Some(conn_id)
2232 };
2233 state = State::Disconnecting;
2234 }
2235 }
2236 State::Disconnecting => {
2237 for s in &[&swarm1, &swarm2] {
2238 assert!(s
2239 .behaviour
2240 .on_connection_closed
2241 .iter()
2242 .all(|(.., remaining_conns)| *remaining_conns > 0));
2243 assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
2244 s.behaviour.assert_connected(num_connections, 1);
2245 }
2246 if [&swarm1, &swarm2]
2247 .iter()
2248 .all(|s| s.behaviour.on_connection_closed.len() == 1)
2249 {
2250 let conn_id = swarm2.behaviour.on_connection_closed[0].1;
2251 assert_eq!(Some(conn_id), disconnected_conn_id);
2252 return Poll::Ready(());
2253 }
2254 }
2255 }
2256
2257 if poll1.is_pending() && poll2.is_pending() {
2258 return Poll::Pending;
2259 }
2260 })
2261 .await
2262 }
2263
2264 #[test]
2265 fn concurrent_dialing() {
2266 #[derive(Clone, Debug)]
2267 struct DialConcurrencyFactor(NonZeroU8);
2268
2269 impl Arbitrary for DialConcurrencyFactor {
2270 fn arbitrary(g: &mut Gen) -> Self {
2271 Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2272 }
2273 }
2274
2275 fn prop(concurrency_factor: DialConcurrencyFactor) {
2276 tokio::runtime::Runtime::new().unwrap().block_on(async {
2277 let mut swarm = new_test_swarm(
2278 Config::with_tokio_executor()
2279 .with_dial_concurrency_factor(concurrency_factor.0),
2280 );
2281
2282 let num_listen_addrs = concurrency_factor.0.get() + 2;
2286 let mut listen_addresses = Vec::new();
2287 let mut transports = Vec::new();
2288 for _ in 0..num_listen_addrs {
2289 let mut transport = transport::MemoryTransport::default().boxed();
2290 transport
2291 .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2292 .unwrap();
2293
2294 match transport.select_next_some().await {
2295 TransportEvent::NewAddress { listen_addr, .. } => {
2296 listen_addresses.push(listen_addr);
2297 }
2298 _ => panic!("Expected `NewListenAddr` event."),
2299 }
2300
2301 transports.push(transport);
2302 }
2303
2304 swarm
2307 .dial(
2308 DialOpts::peer_id(PeerId::random())
2309 .addresses(listen_addresses)
2310 .build(),
2311 )
2312 .unwrap();
2313 for mut transport in transports.into_iter() {
2314 match futures::future::select(transport.select_next_some(), swarm.next()).await
2315 {
2316 future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2317 future::Either::Left(_) => {
2318 panic!("Unexpected transport event.")
2319 }
2320 future::Either::Right((e, _)) => {
2321 panic!("Expect swarm to not emit any event {e:?}")
2322 }
2323 }
2324 }
2325
2326 match swarm.next().await.unwrap() {
2327 SwarmEvent::OutgoingConnectionError { .. } => {}
2328 e => panic!("Unexpected swarm event {e:?}"),
2329 }
2330 })
2331 }
2332
2333 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2334 }
2335
2336 #[tokio::test]
2337 async fn invalid_peer_id() {
2338 let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
2342 let mut swarm2 = new_test_swarm(Config::with_tokio_executor());
2343
2344 swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
2345
2346 let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
2347 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2348 Poll::Pending => Poll::Pending,
2349 _ => panic!("Was expecting the listen address to be reported"),
2350 })
2351 .await;
2352
2353 let other_id = PeerId::random();
2354 let other_addr = address.with(multiaddr::Protocol::P2p(other_id));
2355
2356 swarm2.dial(other_addr.clone()).unwrap();
2357
2358 let (peer_id, error) = future::poll_fn(|cx| {
2359 if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
2360 swarm1.poll_next_unpin(cx)
2361 {}
2362
2363 match swarm2.poll_next_unpin(cx) {
2364 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2365 peer_id, error, ..
2366 })) => Poll::Ready((peer_id, error)),
2367 Poll::Ready(x) => panic!("unexpected {x:?}"),
2368 Poll::Pending => Poll::Pending,
2369 }
2370 })
2371 .await;
2372 assert_eq!(peer_id.unwrap(), other_id);
2373 match error {
2374 DialError::WrongPeerId { obtained, endpoint } => {
2375 assert_eq!(obtained, *swarm1.local_peer_id());
2376 assert_eq!(
2377 endpoint,
2378 ConnectedPoint::Dialer {
2379 address: other_addr,
2380 role_override: Endpoint::Dialer,
2381 }
2382 );
2383 }
2384 x => panic!("wrong error {x:?}"),
2385 }
2386 }
2387
2388 #[tokio::test]
2389 async fn dial_self() {
2390 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2401 swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
2402
2403 let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
2404 Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
2405 Poll::Pending => Poll::Pending,
2406 _ => panic!("Was expecting the listen address to be reported"),
2407 })
2408 .await;
2409
2410 swarm.listened_addrs.clear(); swarm.dial(local_address.clone()).unwrap();
2413
2414 let mut got_dial_err = false;
2415 let mut got_inc_err = false;
2416 future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
2417 loop {
2418 match swarm.poll_next_unpin(cx) {
2419 Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
2420 peer_id,
2421 error: DialError::LocalPeerId { .. },
2422 ..
2423 })) => {
2424 assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
2425 assert!(!got_dial_err);
2426 got_dial_err = true;
2427 if got_inc_err {
2428 return Poll::Ready(Ok(()));
2429 }
2430 }
2431 Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
2432 local_addr, ..
2433 })) => {
2434 assert!(!got_inc_err);
2435 assert_eq!(local_addr, local_address);
2436 got_inc_err = true;
2437 if got_dial_err {
2438 return Poll::Ready(Ok(()));
2439 }
2440 }
2441 Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
2442 assert_eq!(local_addr, local_address);
2443 }
2444 Poll::Ready(ev) => {
2445 panic!("Unexpected event: {ev:?}")
2446 }
2447 Poll::Pending => break Poll::Pending,
2448 }
2449 }
2450 })
2451 .await
2452 .unwrap();
2453 }
2454
2455 #[tokio::test]
2456 async fn dial_self_by_id() {
2457 let swarm = new_test_swarm(Config::with_tokio_executor());
2460 let peer_id = *swarm.local_peer_id();
2461 assert!(!swarm.is_connected(&peer_id));
2462 }
2463
2464 #[tokio::test]
2465 async fn multiple_addresses_err() {
2466 let target = PeerId::random();
2469
2470 let mut swarm = new_test_swarm(Config::with_tokio_executor());
2471
2472 let addresses = HashSet::from([
2473 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2474 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2475 multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2476 multiaddr![Udp(rand::random::<u16>())],
2477 multiaddr![Udp(rand::random::<u16>())],
2478 multiaddr![Udp(rand::random::<u16>())],
2479 multiaddr![Udp(rand::random::<u16>())],
2480 multiaddr![Udp(rand::random::<u16>())],
2481 ]);
2482
2483 swarm
2484 .dial(
2485 DialOpts::peer_id(target)
2486 .addresses(addresses.iter().cloned().collect())
2487 .build(),
2488 )
2489 .unwrap();
2490
2491 match swarm.next().await.unwrap() {
2492 SwarmEvent::OutgoingConnectionError {
2493 peer_id,
2494 error: DialError::Transport(errors),
2496 ..
2497 } => {
2498 assert_eq!(target, peer_id.unwrap());
2499
2500 let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2501 let expected_addresses = addresses
2502 .into_iter()
2503 .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2504 .collect::<Vec<_>>();
2505
2506 assert_eq!(expected_addresses, failed_addresses);
2507 }
2508 e => panic!("Unexpected event: {e:?}"),
2509 }
2510 }
2511
2512 #[tokio::test]
2513 async fn aborting_pending_connection_surfaces_error() {
2514 let _ = env_logger::try_init();
2515
2516 let mut dialer = new_test_swarm(Config::with_tokio_executor());
2517 let mut listener = new_test_swarm(Config::with_tokio_executor());
2518
2519 let listener_peer_id = *listener.local_peer_id();
2520 listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2521 let listener_address = match listener.next().await.unwrap() {
2522 SwarmEvent::NewListenAddr { address, .. } => address,
2523 e => panic!("Unexpected network event: {e:?}"),
2524 };
2525
2526 dialer
2527 .dial(
2528 DialOpts::peer_id(listener_peer_id)
2529 .addresses(vec![listener_address])
2530 .build(),
2531 )
2532 .unwrap();
2533
2534 dialer
2535 .disconnect_peer_id(listener_peer_id)
2536 .expect_err("Expect peer to not yet be connected.");
2537
2538 match dialer.next().await.unwrap() {
2539 SwarmEvent::OutgoingConnectionError {
2540 error: DialError::Aborted,
2541 ..
2542 } => {}
2543 e => panic!("Unexpected swarm event {e:?}."),
2544 }
2545 }
2546
2547 #[test]
2548 fn dial_error_prints_sources() {
2549 let error = DialError::Transport(vec![(
2551 "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2552 TransportError::Other(io::Error::new(
2553 io::ErrorKind::Other,
2554 MemoryTransportError::Unreachable,
2555 )),
2556 )]);
2557
2558 let string = format!("{error}");
2559
2560 assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2562 }
2563}