1use crate::{
22 addresses::PublicAddresses,
23 crypto::ed25519::Keypair,
24 error::ImmediateDialError,
25 executor::Executor,
26 protocol::ProtocolSet,
27 transport::manager::{
28 address::{AddressRecord, AddressStore},
29 types::{PeerContext, PeerState, SupportedTransport},
30 ProtocolContext, TransportManagerEvent, LOG_TARGET,
31 },
32 types::{protocol::ProtocolName, ConnectionId},
33 BandwidthSink, PeerId,
34};
35
36use multiaddr::{Multiaddr, Protocol};
37use parking_lot::RwLock;
38use tokio::sync::mpsc::{error::TrySendError, Sender};
39
40use std::{
41 collections::{HashMap, HashSet},
42 sync::{
43 atomic::{AtomicUsize, Ordering},
44 Arc,
45 },
46};
47
48pub enum InnerTransportManagerCommand {
51 DialPeer {
53 peer: PeerId,
55 },
56
57 DialAddress {
59 address: Multiaddr,
61 },
62}
63
64#[derive(Debug, Clone)]
66pub struct TransportManagerHandle {
67 local_peer_id: PeerId,
69
70 peers: Arc<RwLock<HashMap<PeerId, PeerContext>>>,
72
73 cmd_tx: Sender<InnerTransportManagerCommand>,
75
76 supported_transport: HashSet<SupportedTransport>,
78
79 listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
81
82 public_addresses: PublicAddresses,
84}
85
86impl TransportManagerHandle {
87 pub fn new(
89 local_peer_id: PeerId,
90 peers: Arc<RwLock<HashMap<PeerId, PeerContext>>>,
91 cmd_tx: Sender<InnerTransportManagerCommand>,
92 supported_transport: HashSet<SupportedTransport>,
93 listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
94 public_addresses: PublicAddresses,
95 ) -> Self {
96 Self {
97 peers,
98 cmd_tx,
99 local_peer_id,
100 supported_transport,
101 listen_addresses,
102 public_addresses,
103 }
104 }
105
106 pub(crate) fn register_transport(&mut self, transport: SupportedTransport) {
108 self.supported_transport.insert(transport);
109 }
110
111 pub(crate) fn public_addresses(&self) -> PublicAddresses {
113 self.public_addresses.clone()
114 }
115
116 pub(crate) fn listen_addresses(&self) -> HashSet<Multiaddr> {
118 self.listen_addresses.read().clone()
119 }
120
121 pub fn supported_transport(&self, address: &Multiaddr) -> bool {
123 let mut iter = address.iter();
124
125 match iter.next() {
126 Some(Protocol::Ip4(address)) =>
127 if address.is_unspecified() {
128 return false;
129 },
130 Some(Protocol::Ip6(address)) =>
131 if address.is_unspecified() {
132 return false;
133 },
134 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => {}
135 _ => return false,
136 }
137
138 match iter.next() {
139 None => false,
140 Some(Protocol::Tcp(_)) => match iter.next() {
141 Some(Protocol::P2p(_)) =>
142 self.supported_transport.contains(&SupportedTransport::Tcp),
143 #[cfg(feature = "websocket")]
144 Some(Protocol::Ws(_)) =>
145 self.supported_transport.contains(&SupportedTransport::WebSocket),
146 #[cfg(feature = "websocket")]
147 Some(Protocol::Wss(_)) =>
148 self.supported_transport.contains(&SupportedTransport::WebSocket),
149 _ => false,
150 },
151 #[cfg(feature = "quic")]
152 Some(Protocol::Udp(_)) => match (
153 iter.next(),
154 self.supported_transport.contains(&SupportedTransport::Quic),
155 ) {
156 (Some(Protocol::QuicV1), true) => true,
157 _ => false,
158 },
159 _ => false,
160 }
161 }
162
163 fn is_local_address(&self, address: &Multiaddr) -> bool {
165 let address: Multiaddr = address
166 .iter()
167 .take_while(|protocol| !std::matches!(protocol, Protocol::P2p(_)))
168 .collect();
169
170 self.listen_addresses.read().contains(&address)
171 }
172
173 pub fn add_known_address(
179 &mut self,
180 peer: &PeerId,
181 addresses: impl Iterator<Item = Multiaddr>,
182 ) -> usize {
183 let mut peers = self.peers.write();
184 let addresses = addresses
185 .filter_map(|address| {
186 (self.supported_transport(&address) && !self.is_local_address(&address))
187 .then_some(AddressRecord::from_multiaddr(address)?)
188 })
189 .collect::<HashSet<_>>();
190
191 let num_added = addresses.len();
193 if num_added == 0 {
194 tracing::debug!(
195 target: LOG_TARGET,
196 ?peer,
197 "didn't add any addresses for peer because transport is not supported",
198 );
199
200 return 0usize;
201 }
202
203 tracing::trace!(
204 target: LOG_TARGET,
205 ?peer,
206 ?addresses,
207 "add known addresses",
208 );
209
210 match peers.get_mut(peer) {
211 Some(context) =>
212 for record in addresses {
213 if !context.addresses.contains(record.address()) {
214 context.addresses.insert(record);
215 }
216 },
217 None => {
218 peers.insert(
219 *peer,
220 PeerContext {
221 state: PeerState::Disconnected { dial_record: None },
222 addresses: AddressStore::from_iter(addresses),
223 secondary_connection: None,
224 },
225 );
226 }
227 }
228
229 num_added
230 }
231
232 pub fn dial(&self, peer: &PeerId) -> Result<(), ImmediateDialError> {
236 if peer == &self.local_peer_id {
237 return Err(ImmediateDialError::TriedToDialSelf);
238 }
239
240 {
241 match self.peers.read().get(peer) {
242 Some(PeerContext {
243 state: PeerState::Connected { .. },
244 ..
245 }) => return Err(ImmediateDialError::AlreadyConnected),
246 Some(PeerContext {
247 state: PeerState::Disconnected { dial_record },
248 addresses,
249 ..
250 }) => {
251 if addresses.is_empty() {
252 return Err(ImmediateDialError::NoAddressAvailable);
253 }
254
255 if dial_record.is_some() {
257 tracing::debug!(
258 target: LOG_TARGET,
259 ?peer,
260 ?dial_record,
261 "peer is aready being dialed",
262 );
263 return Ok(());
264 }
265 }
266 Some(PeerContext {
267 state: PeerState::Dialing { .. } | PeerState::Opening { .. },
268 ..
269 }) => return Ok(()),
270 None => return Err(ImmediateDialError::NoAddressAvailable),
271 }
272 }
273
274 self.cmd_tx
275 .try_send(InnerTransportManagerCommand::DialPeer { peer: *peer })
276 .map_err(|error| match error {
277 TrySendError::Full(_) => ImmediateDialError::ChannelClogged,
278 TrySendError::Closed(_) => ImmediateDialError::TaskClosed,
279 })
280 }
281
282 pub fn dial_address(&self, address: Multiaddr) -> Result<(), ImmediateDialError> {
286 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
287 return Err(ImmediateDialError::PeerIdMissing);
288 }
289
290 self.cmd_tx
291 .try_send(InnerTransportManagerCommand::DialAddress { address })
292 .map_err(|error| match error {
293 TrySendError::Full(_) => ImmediateDialError::ChannelClogged,
294 TrySendError::Closed(_) => ImmediateDialError::TaskClosed,
295 })
296 }
297}
298
299pub struct TransportHandle {
301 pub keypair: Keypair,
302 pub tx: Sender<TransportManagerEvent>,
303 pub protocols: HashMap<ProtocolName, ProtocolContext>,
304 pub next_connection_id: Arc<AtomicUsize>,
305 pub next_substream_id: Arc<AtomicUsize>,
306 pub bandwidth_sink: BandwidthSink,
307 pub executor: Arc<dyn Executor>,
308}
309
310impl TransportHandle {
311 pub fn protocol_set(&self, connection_id: ConnectionId) -> ProtocolSet {
312 ProtocolSet::new(
313 connection_id,
314 self.tx.clone(),
315 self.next_substream_id.clone(),
316 self.protocols.clone(),
317 )
318 }
319
320 pub fn next_connection_id(&mut self) -> ConnectionId {
322 let connection_id = self.next_connection_id.fetch_add(1usize, Ordering::Relaxed);
323
324 ConnectionId::from(connection_id)
325 }
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331 use multihash::Multihash;
332 use parking_lot::lock_api::RwLock;
333 use tokio::sync::mpsc::{channel, Receiver};
334
335 fn make_transport_manager_handle() -> (
336 TransportManagerHandle,
337 Receiver<InnerTransportManagerCommand>,
338 ) {
339 let (cmd_tx, cmd_rx) = channel(64);
340
341 let local_peer_id = PeerId::random();
342 (
343 TransportManagerHandle {
344 local_peer_id,
345 cmd_tx,
346 peers: Default::default(),
347 supported_transport: HashSet::new(),
348 listen_addresses: Default::default(),
349 public_addresses: PublicAddresses::new(local_peer_id),
350 },
351 cmd_rx,
352 )
353 }
354
355 #[tokio::test]
356 async fn tcp_supported() {
357 let (mut handle, _rx) = make_transport_manager_handle();
358 handle.supported_transport.insert(SupportedTransport::Tcp);
359
360 let address =
361 "/dns4/google.com/tcp/24928/p2p/12D3KooWKrUnV42yDR7G6DewmgHtFaVCJWLjQRi2G9t5eJD3BvTy"
362 .parse()
363 .unwrap();
364 assert!(handle.supported_transport(&address));
365 }
366
367 #[cfg(feature = "websocket")]
368 #[tokio::test]
369 async fn websocket_supported() {
370 let (mut handle, _rx) = make_transport_manager_handle();
371 handle.supported_transport.insert(SupportedTransport::WebSocket);
372
373 let address =
374 "/dns4/google.com/tcp/24928/ws/p2p/12D3KooWKrUnV42yDR7G6DewmgHtFaVCJWLjQRi2G9t5eJD3BvTy"
375 .parse()
376 .unwrap();
377 assert!(handle.supported_transport(&address));
378 }
379
380 #[test]
381 fn transport_not_supported() {
382 let (handle, _rx) = make_transport_manager_handle();
383
384 assert!(!handle.supported_transport(
386 &Multiaddr::empty().with(Protocol::P2p(Multihash::from(PeerId::random())))
387 ));
388
389 assert!(!handle.supported_transport(
391 &Multiaddr::empty().with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
392 ));
393
394 assert!(!handle.supported_transport(
396 &Multiaddr::empty()
397 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
398 .with(Protocol::Udp(8888))
399 .with(Protocol::Utp)
400 ));
401
402 assert!(!handle.supported_transport(
404 &Multiaddr::empty()
405 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
406 .with(Protocol::Sctp(8888))
407 ));
408 }
409
410 #[test]
411 fn zero_addresses_added() {
412 let (mut handle, _rx) = make_transport_manager_handle();
413 handle.supported_transport.insert(SupportedTransport::Tcp);
414
415 assert!(
416 handle.add_known_address(
417 &PeerId::random(),
418 vec![
419 Multiaddr::empty()
420 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
421 .with(Protocol::Udp(8888))
422 .with(Protocol::Utp),
423 Multiaddr::empty()
424 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
425 .with(Protocol::Tcp(8888))
426 .with(Protocol::Wss(std::borrow::Cow::Owned("/".to_string()))),
427 ]
428 .into_iter()
429 ) == 0usize
430 );
431 }
432
433 #[tokio::test]
434 async fn dial_already_connected_peer() {
435 let (mut handle, _rx) = make_transport_manager_handle();
436 handle.supported_transport.insert(SupportedTransport::Tcp);
437
438 let peer = {
439 let peer = PeerId::random();
440 let mut peers = handle.peers.write();
441
442 peers.insert(
443 peer,
444 PeerContext {
445 state: PeerState::Connected {
446 record: AddressRecord::from_multiaddr(
447 Multiaddr::empty()
448 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
449 .with(Protocol::Tcp(8888))
450 .with(Protocol::P2p(Multihash::from(peer))),
451 )
452 .unwrap(),
453 dial_record: None,
454 },
455 secondary_connection: None,
456 addresses: AddressStore::from_iter(
457 vec![Multiaddr::empty()
458 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
459 .with(Protocol::Tcp(8888))
460 .with(Protocol::P2p(Multihash::from(peer)))]
461 .into_iter(),
462 ),
463 },
464 );
465 drop(peers);
466
467 peer
468 };
469
470 match handle.dial(&peer) {
471 Err(ImmediateDialError::AlreadyConnected) => {}
472 _ => panic!("invalid return value"),
473 }
474 }
475
476 #[tokio::test]
477 async fn peer_already_being_dialed() {
478 let (mut handle, _rx) = make_transport_manager_handle();
479 handle.supported_transport.insert(SupportedTransport::Tcp);
480
481 let peer = {
482 let peer = PeerId::random();
483 let mut peers = handle.peers.write();
484
485 peers.insert(
486 peer,
487 PeerContext {
488 state: PeerState::Dialing {
489 record: AddressRecord::from_multiaddr(
490 Multiaddr::empty()
491 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
492 .with(Protocol::Tcp(8888))
493 .with(Protocol::P2p(Multihash::from(peer))),
494 )
495 .unwrap(),
496 },
497 secondary_connection: None,
498 addresses: AddressStore::from_iter(
499 vec![Multiaddr::empty()
500 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
501 .with(Protocol::Tcp(8888))
502 .with(Protocol::P2p(Multihash::from(peer)))]
503 .into_iter(),
504 ),
505 },
506 );
507 drop(peers);
508
509 peer
510 };
511
512 match handle.dial(&peer) {
513 Ok(()) => {}
514 _ => panic!("invalid return value"),
515 }
516 }
517
518 #[tokio::test]
519 async fn no_address_available_for_peer() {
520 let (mut handle, _rx) = make_transport_manager_handle();
521 handle.supported_transport.insert(SupportedTransport::Tcp);
522
523 let peer = {
524 let peer = PeerId::random();
525 let mut peers = handle.peers.write();
526
527 peers.insert(
528 peer,
529 PeerContext {
530 state: PeerState::Disconnected { dial_record: None },
531 secondary_connection: None,
532 addresses: AddressStore::new(),
533 },
534 );
535 drop(peers);
536
537 peer
538 };
539
540 let err = handle.dial(&peer).unwrap_err();
541 assert!(matches!(err, ImmediateDialError::NoAddressAvailable));
542 }
543
544 #[tokio::test]
545 async fn pending_connection_for_disconnected_peer() {
546 let (mut handle, mut rx) = make_transport_manager_handle();
547 handle.supported_transport.insert(SupportedTransport::Tcp);
548
549 let peer = {
550 let peer = PeerId::random();
551 let mut peers = handle.peers.write();
552
553 peers.insert(
554 peer,
555 PeerContext {
556 state: PeerState::Disconnected {
557 dial_record: Some(
558 AddressRecord::from_multiaddr(
559 Multiaddr::empty()
560 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
561 .with(Protocol::Tcp(8888))
562 .with(Protocol::P2p(Multihash::from(peer))),
563 )
564 .unwrap(),
565 ),
566 },
567 secondary_connection: None,
568 addresses: AddressStore::from_iter(
569 vec![Multiaddr::empty()
570 .with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
571 .with(Protocol::Tcp(8888))
572 .with(Protocol::P2p(Multihash::from(peer)))]
573 .into_iter(),
574 ),
575 },
576 );
577 drop(peers);
578
579 peer
580 };
581
582 match handle.dial(&peer) {
583 Ok(()) => {}
584 _ => panic!("invalid return value"),
585 }
586 assert!(rx.try_recv().is_err());
587 }
588
589 #[tokio::test]
590 async fn try_to_dial_self() {
591 let (mut handle, mut rx) = make_transport_manager_handle();
592 handle.supported_transport.insert(SupportedTransport::Tcp);
593
594 let err = handle.dial(&handle.local_peer_id).unwrap_err();
595 assert_eq!(err, ImmediateDialError::TriedToDialSelf);
596
597 assert!(rx.try_recv().is_err());
598 }
599
600 #[test]
601 fn is_local_address() {
602 let (cmd_tx, _cmd_rx) = channel(64);
603
604 let local_peer_id = PeerId::random();
605 let first_addr: Multiaddr = "/ip6/::1/tcp/8888".parse().expect("valid multiaddress");
606 let second_addr: Multiaddr = "/ip4/127.0.0.1/tcp/8888".parse().expect("valid multiaddress");
607
608 let listen_addresses = Arc::new(RwLock::new(
609 [first_addr.clone(), second_addr.clone()].iter().cloned().collect(),
610 ));
611 println!("{:?}", listen_addresses);
612
613 let handle = TransportManagerHandle {
614 local_peer_id,
615 cmd_tx,
616 peers: Default::default(),
617 supported_transport: HashSet::new(),
618 listen_addresses,
619 public_addresses: PublicAddresses::new(local_peer_id),
620 };
621
622 assert!(handle.is_local_address(
624 &"/ip6/::1/tcp/8888".parse::<Multiaddr>().expect("valid multiaddress")
625 ));
626 assert!(handle
627 .is_local_address(&"/ip4/127.0.0.1/tcp/8888".parse().expect("valid multiaddress")));
628 assert!(handle.is_local_address(
629 &"/ip6/::1/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q"
630 .parse()
631 .expect("valid multiaddress")
632 ));
633 assert!(handle.is_local_address(
634 &"/ip4/127.0.0.1/tcp/8888/p2p/12D3KooWT2ouvz5uMmCvHJGzAGRHiqDts5hzXR7NdoQ27pGdzp9Q"
635 .parse()
636 .expect("valid multiaddress")
637 ));
638
639 assert!(handle.is_local_address(
641 &"/ip6/::1/tcp/8888/p2p/12D3KooWPGxxxQiBEBZ52RY31Z2chn4xsDrGCMouZ88izJrak2T1"
642 .parse::<Multiaddr>()
643 .expect("valid multiaddress")
644 ));
645 assert!(handle.is_local_address(
646 &"/ip4/127.0.0.1/tcp/8888/p2p/12D3KooWPGxxxQiBEBZ52RY31Z2chn4xsDrGCMouZ88izJrak2T1"
647 .parse()
648 .expect("valid multiaddress")
649 ));
650
651 assert!(!handle
653 .is_local_address(&"/ip4/127.0.0.1/tcp/9999".parse().expect("valid multiaddress")));
654 assert!(!handle
656 .is_local_address(&"/ip4/127.0.0.1/tcp/7777".parse().expect("valid multiaddress")));
657 }
658}