1use crate::handler::{self, Handler, InEvent};
22use crate::protocol::{Info, UpgradeError};
23use libp2p_core::multiaddr::Protocol;
24use libp2p_core::transport::PortUse;
25use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
26use libp2p_identity::PeerId;
27use libp2p_identity::PublicKey;
28use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
29use libp2p_swarm::{
30 ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
31 NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
32 _address_translation,
33};
34use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
35
36use std::collections::hash_map::Entry;
37use std::num::NonZeroUsize;
38use std::{
39 collections::{HashMap, HashSet, VecDeque},
40 task::Context,
41 task::Poll,
42 time::Duration,
43};
44
45fn is_quic_addr(addr: &Multiaddr, v1: bool) -> bool {
47 use Protocol::*;
48 let mut iter = addr.iter();
49 let Some(first) = iter.next() else {
50 return false;
51 };
52 let Some(second) = iter.next() else {
53 return false;
54 };
55 let Some(third) = iter.next() else {
56 return false;
57 };
58 let fourth = iter.next();
59 let fifth = iter.next();
60
61 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_))
62 && matches!(second, Udp(_))
63 && if v1 {
64 matches!(third, QuicV1)
65 } else {
66 matches!(third, Quic)
67 }
68 && matches!(fourth, Some(P2p(_)) | None)
69 && fifth.is_none()
70}
71
72fn is_tcp_addr(addr: &Multiaddr) -> bool {
73 use Protocol::*;
74
75 let mut iter = addr.iter();
76
77 let first = match iter.next() {
78 None => return false,
79 Some(p) => p,
80 };
81 let second = match iter.next() {
82 None => return false,
83 Some(p) => p,
84 };
85
86 matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
87}
88
89pub struct Behaviour {
95 config: Config,
96 connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
98
99 our_observed_addresses: HashMap<ConnectionId, Multiaddr>,
101
102 outbound_connections_with_ephemeral_port: HashSet<ConnectionId>,
104
105 events: VecDeque<ToSwarm<Event, InEvent>>,
107 discovered_peers: PeerCache,
109
110 listen_addresses: ListenAddresses,
111 external_addresses: ExternalAddresses,
112}
113
114#[non_exhaustive]
116#[derive(Debug, Clone)]
117pub struct Config {
118 pub protocol_version: String,
121 pub local_public_key: PublicKey,
123 pub agent_version: String,
128 pub interval: Duration,
134
135 pub push_listen_addr_updates: bool,
144
145 pub cache_size: usize,
150}
151
152impl Config {
153 pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
156 Self {
157 protocol_version,
158 agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
159 local_public_key,
160 interval: Duration::from_secs(5 * 60),
161 push_listen_addr_updates: false,
162 cache_size: 100,
163 }
164 }
165
166 pub fn with_agent_version(mut self, v: String) -> Self {
168 self.agent_version = v;
169 self
170 }
171
172 pub fn with_interval(mut self, d: Duration) -> Self {
175 self.interval = d;
176 self
177 }
178
179 pub fn with_push_listen_addr_updates(mut self, b: bool) -> Self {
183 self.push_listen_addr_updates = b;
184 self
185 }
186
187 pub fn with_cache_size(mut self, cache_size: usize) -> Self {
189 self.cache_size = cache_size;
190 self
191 }
192}
193
194impl Behaviour {
195 pub fn new(config: Config) -> Self {
197 let discovered_peers = match NonZeroUsize::new(config.cache_size) {
198 None => PeerCache::disabled(),
199 Some(size) => PeerCache::enabled(size),
200 };
201
202 Self {
203 config,
204 connected: HashMap::new(),
205 our_observed_addresses: Default::default(),
206 outbound_connections_with_ephemeral_port: Default::default(),
207 events: VecDeque::new(),
208 discovered_peers,
209 listen_addresses: Default::default(),
210 external_addresses: Default::default(),
211 }
212 }
213
214 pub fn push<I>(&mut self, peers: I)
216 where
217 I: IntoIterator<Item = PeerId>,
218 {
219 for p in peers {
220 if !self.connected.contains_key(&p) {
221 tracing::debug!(peer=%p, "Not pushing to peer because we are not connected");
222 continue;
223 }
224
225 self.events.push_back(ToSwarm::NotifyHandler {
226 peer_id: p,
227 handler: NotifyHandler::Any,
228 event: InEvent::Push,
229 });
230 }
231 }
232
233 fn on_connection_established(
234 &mut self,
235 ConnectionEstablished {
236 peer_id,
237 connection_id: conn,
238 endpoint,
239 failed_addresses,
240 ..
241 }: ConnectionEstablished,
242 ) {
243 let addr = match endpoint {
244 ConnectedPoint::Dialer { address, .. } => address.clone(),
245 ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
246 };
247
248 self.connected
249 .entry(peer_id)
250 .or_default()
251 .insert(conn, addr);
252
253 if let Some(cache) = self.discovered_peers.0.as_mut() {
254 for addr in failed_addresses {
255 cache.remove(&peer_id, addr);
256 }
257 }
258 }
259
260 fn all_addresses(&self) -> HashSet<Multiaddr> {
261 self.listen_addresses
262 .iter()
263 .chain(self.external_addresses.iter())
264 .cloned()
265 .collect()
266 }
267
268 fn emit_new_external_addr_candidate_event(
269 &mut self,
270 connection_id: ConnectionId,
271 observed: &Multiaddr,
272 ) {
273 if self
274 .outbound_connections_with_ephemeral_port
275 .contains(&connection_id)
276 {
277 let translated_addresses = {
280 let mut addrs: Vec<_> = self
281 .listen_addresses
282 .iter()
283 .filter_map(|server| {
284 if (is_tcp_addr(server) && is_tcp_addr(observed))
285 || (is_quic_addr(server, true) && is_quic_addr(observed, true))
286 || (is_quic_addr(server, false) && is_quic_addr(observed, false))
287 {
288 _address_translation(server, observed)
289 } else {
290 None
291 }
292 })
293 .collect();
294
295 addrs.sort_unstable();
297 addrs.dedup();
298 addrs
299 };
300
301 if translated_addresses.is_empty() {
303 self.events
304 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
305 } else {
306 for addr in translated_addresses {
307 self.events
308 .push_back(ToSwarm::NewExternalAddrCandidate(addr));
309 }
310 }
311 return;
312 }
313
314 self.events
317 .push_back(ToSwarm::NewExternalAddrCandidate(observed.clone()));
318 }
319}
320
321impl NetworkBehaviour for Behaviour {
322 type ConnectionHandler = Handler;
323 type ToSwarm = Event;
324
325 fn handle_established_inbound_connection(
326 &mut self,
327 _: ConnectionId,
328 peer: PeerId,
329 _: &Multiaddr,
330 remote_addr: &Multiaddr,
331 ) -> Result<THandler<Self>, ConnectionDenied> {
332 Ok(Handler::new(
333 self.config.interval,
334 peer,
335 self.config.local_public_key.clone(),
336 self.config.protocol_version.clone(),
337 self.config.agent_version.clone(),
338 remote_addr.clone(),
339 self.all_addresses(),
340 ))
341 }
342
343 fn handle_established_outbound_connection(
344 &mut self,
345 connection_id: ConnectionId,
346 peer: PeerId,
347 addr: &Multiaddr,
348 _: Endpoint,
349 port_use: PortUse,
350 ) -> Result<THandler<Self>, ConnectionDenied> {
351 let mut addr = addr.clone();
355 if matches!(addr.iter().last(), Some(multiaddr::Protocol::P2p(_))) {
356 addr.pop();
357 }
358
359 if port_use == PortUse::New {
360 self.outbound_connections_with_ephemeral_port
361 .insert(connection_id);
362 }
363
364 Ok(Handler::new(
365 self.config.interval,
366 peer,
367 self.config.local_public_key.clone(),
368 self.config.protocol_version.clone(),
369 self.config.agent_version.clone(),
370 addr.clone(), self.all_addresses(),
372 ))
373 }
374
375 fn on_connection_handler_event(
376 &mut self,
377 peer_id: PeerId,
378 connection_id: ConnectionId,
379 event: THandlerOutEvent<Self>,
380 ) {
381 match event {
382 handler::Event::Identified(mut info) => {
383 info.listen_addrs
385 .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
386
387 let observed = info.observed_addr.clone();
388 self.events
389 .push_back(ToSwarm::GenerateEvent(Event::Received {
390 connection_id,
391 peer_id,
392 info: info.clone(),
393 }));
394
395 if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
396 for address in &info.listen_addrs {
397 if discovered_peers.add(peer_id, address.clone()) {
398 self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
399 peer_id,
400 address: address.clone(),
401 });
402 }
403 }
404 }
405
406 match self.our_observed_addresses.entry(connection_id) {
407 Entry::Vacant(not_yet_observed) => {
408 not_yet_observed.insert(observed.clone());
409 self.emit_new_external_addr_candidate_event(connection_id, &observed);
410 }
411 Entry::Occupied(already_observed) if already_observed.get() == &observed => {
412 }
414 Entry::Occupied(mut already_observed) => {
415 tracing::info!(
416 old_address=%already_observed.get(),
417 new_address=%observed,
418 "Our observed address on connection {connection_id} changed",
419 );
420
421 *already_observed.get_mut() = observed.clone();
422 self.emit_new_external_addr_candidate_event(connection_id, &observed);
423 }
424 }
425 }
426 handler::Event::Identification => {
427 self.events.push_back(ToSwarm::GenerateEvent(Event::Sent {
428 connection_id,
429 peer_id,
430 }));
431 }
432 handler::Event::IdentificationPushed(info) => {
433 self.events.push_back(ToSwarm::GenerateEvent(Event::Pushed {
434 connection_id,
435 peer_id,
436 info,
437 }));
438 }
439 handler::Event::IdentificationError(error) => {
440 self.events.push_back(ToSwarm::GenerateEvent(Event::Error {
441 connection_id,
442 peer_id,
443 error,
444 }));
445 }
446 }
447 }
448
449 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
450 fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
451 if let Some(event) = self.events.pop_front() {
452 return Poll::Ready(event);
453 }
454
455 Poll::Pending
456 }
457
458 fn handle_pending_outbound_connection(
459 &mut self,
460 _connection_id: ConnectionId,
461 maybe_peer: Option<PeerId>,
462 _addresses: &[Multiaddr],
463 _effective_role: Endpoint,
464 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
465 let peer = match maybe_peer {
466 None => return Ok(vec![]),
467 Some(peer) => peer,
468 };
469
470 Ok(self.discovered_peers.get(&peer))
471 }
472
473 fn on_swarm_event(&mut self, event: FromSwarm) {
474 let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
475 let external_addr_changed = self.external_addresses.on_swarm_event(&event);
476
477 if listen_addr_changed || external_addr_changed {
478 let change_events = self
480 .connected
481 .iter()
482 .flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
483 .map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
484 peer_id,
485 handler: NotifyHandler::One(*connection_id),
486 event: InEvent::AddressesChanged(self.all_addresses()),
487 })
488 .collect::<Vec<_>>();
489
490 self.events.extend(change_events)
491 }
492
493 if listen_addr_changed && self.config.push_listen_addr_updates {
494 let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
496 peer_id: *peer,
497 handler: NotifyHandler::Any,
498 event: InEvent::Push,
499 });
500
501 self.events.extend(push_events);
502 }
503
504 match event {
505 FromSwarm::ConnectionEstablished(connection_established) => {
506 self.on_connection_established(connection_established)
507 }
508 FromSwarm::ConnectionClosed(ConnectionClosed {
509 peer_id,
510 connection_id,
511 remaining_established,
512 ..
513 }) => {
514 if remaining_established == 0 {
515 self.connected.remove(&peer_id);
516 } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
517 addrs.remove(&connection_id);
518 }
519
520 self.our_observed_addresses.remove(&connection_id);
521 self.outbound_connections_with_ephemeral_port
522 .remove(&connection_id);
523 }
524 FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
525 if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
526 (peer_id, self.discovered_peers.0.as_mut(), error)
527 {
528 for (addr, _error) in errors {
529 cache.remove(&peer_id, addr);
530 }
531 }
532 }
533 _ => {}
534 }
535 }
536}
537
538#[allow(clippy::large_enum_variant)]
540#[derive(Debug)]
541pub enum Event {
542 Received {
544 connection_id: ConnectionId,
546 peer_id: PeerId,
548 info: Info,
550 },
551 Sent {
554 connection_id: ConnectionId,
556 peer_id: PeerId,
558 },
559 Pushed {
562 connection_id: ConnectionId,
564 peer_id: PeerId,
566 info: Info,
569 },
570 Error {
572 connection_id: ConnectionId,
574 peer_id: PeerId,
576 error: StreamUpgradeError<UpgradeError>,
578 },
579}
580
581impl Event {
582 pub fn connection_id(&self) -> ConnectionId {
583 match self {
584 Event::Received { connection_id, .. }
585 | Event::Sent { connection_id, .. }
586 | Event::Pushed { connection_id, .. }
587 | Event::Error { connection_id, .. } => *connection_id,
588 }
589 }
590}
591
592fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
595 let last_component = addr.iter().last();
596 if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
597 return multi_addr_peer_id == *peer_id;
598 }
599 true
600}
601
602struct PeerCache(Option<PeerAddresses>);
603
604impl PeerCache {
605 fn disabled() -> Self {
606 Self(None)
607 }
608
609 fn enabled(size: NonZeroUsize) -> Self {
610 Self(Some(PeerAddresses::new(size)))
611 }
612
613 fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
614 if let Some(cache) = self.0.as_mut() {
615 cache.get(peer).collect()
616 } else {
617 Vec::new()
618 }
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// An address matches a peer when it either carries no `/p2p` suffix or
    /// carries that exact peer's id.
    #[test]
    fn check_multiaddr_matches_peer_id() {
        let peer_id = PeerId::random();
        let other_peer_id = PeerId::random();

        let addr_without_peer_id: Multiaddr = "/ip4/147.75.69.143/tcp/4001"
            .parse()
            .expect("failed to parse multiaddr");

        // Appends `/p2p/<id>` to a copy of the bare address.
        let with_p2p = |id: PeerId| {
            let mut extended = addr_without_peer_id.clone();
            extended.push(multiaddr::Protocol::P2p(id));
            extended
        };
        let addr = with_p2p(peer_id);
        let addr_with_other_peer_id = with_p2p(other_peer_id);

        assert!(multiaddr_matches_peer_id(&addr, &peer_id));
        assert!(!multiaddr_matches_peer_id(
            &addr_with_other_peer_id,
            &peer_id
        ));
        assert!(multiaddr_matches_peer_id(&addr_without_peer_id, &peer_id));
    }
}