1use crate::{utils::interval, LOG_TARGET};
23use either::Either;
24
25use fnv::FnvHashMap;
26use futures::prelude::*;
27use libp2p::{
28 core::{transport::PortUse, ConnectedPoint, Endpoint},
29 identify::{
30 Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
31 Info as IdentifyInfo,
32 },
33 identity::PublicKey,
34 multiaddr::Protocol,
35 ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent},
36 swarm::{
37 behaviour::{
38 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
39 ListenFailure,
40 },
41 ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
42 NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
43 },
44 Multiaddr, PeerId,
45};
46use log::{debug, error, trace, warn};
47use parking_lot::Mutex;
48use schnellru::{ByLength, LruMap};
49use smallvec::SmallVec;
50
51use std::{
52 collections::{hash_map::Entry, HashSet, VecDeque},
53 iter,
54 pin::Pin,
55 sync::Arc,
56 task::{Context, Poll},
57 time::{Duration, Instant},
58};
59
/// How long a peer's cached information is kept after its last connection closes; added to
/// the current time to compute the entry's expiry instant.
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
/// Duration handed to `interval` to build the stream whose ticks trigger purging of expired
/// entries from the peer cache.
const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
/// Capacity of the LRU map that tracks confirmations of observed addresses.
const MAX_EXTERNAL_ADDRESSES: u32 = 32;
/// Number of distinct peers that must report the same observed address before it is treated
/// as a confirmed external address.
const MIN_ADDRESS_CONFIRMATIONS: usize = 3;
69
/// Network behaviour that combines the ping and identify behaviours and keeps a per-peer
/// cache of what they report.
pub struct PeerInfoBehaviour {
    /// Ping sub-behaviour; successful round-trip times are stored in `nodes_info`.
    ping: Ping,
    /// Identify sub-behaviour; received reports update `nodes_info` and feed external address
    /// discovery.
    identify: Identify,
    /// Cached information for every peer we know about, keyed by peer id.
    nodes_info: FnvHashMap<PeerId, NodeInfo>,
    /// Stream whose ticks trigger removal of expired entries from `nodes_info`.
    garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
    /// Peer id derived from the local public key.
    local_peer_id: PeerId,
    /// Addresses passed to the constructor; an observed address matching one of them is
    /// always accepted as external.
    public_addresses: Vec<Multiaddr>,
    /// Addresses we currently listen on; an observed address matching one of them is always
    /// accepted as external.
    listen_addresses: HashSet<Multiaddr>,
    /// For each observed address, the set of peers that reported it.
    address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
    /// Set of confirmed external addresses, shared through the handle given to the
    /// constructor.
    external_addresses: ExternalAddresses,
    /// Actions queued for the swarm; one is handed out at the start of each `poll` call.
    pending_actions: VecDeque<ToSwarm<PeerInfoEvent, THandlerInEvent<PeerInfoBehaviour>>>,
}
94
/// Cached information about a single peer.
#[derive(Debug)]
struct NodeInfo {
    /// Instant after which the entry may be purged. Set when the last connection to the peer
    /// closes and reset to `None` whenever a connection is established.
    info_expire: Option<Instant>,
    /// Endpoints of the connections to this peer; one is pushed per established connection.
    endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
    /// Agent version taken from the peer's identify report, or `None` if none was recorded.
    client_version: Option<String>,
    /// Round-trip time of the most recent successful ping, or `None` if none was recorded.
    latest_ping: Option<Duration>,
}
108
109impl NodeInfo {
110 fn new(endpoint: ConnectedPoint) -> Self {
111 let mut endpoints = SmallVec::new();
112 endpoints.push(endpoint);
113 Self { info_expire: None, endpoints, client_version: None, latest_ping: None }
114 }
115}
116
/// Cloneable handle to a mutex-protected set of external addresses. Clones share the same
/// underlying set.
#[derive(Debug, Clone, Default)]
pub struct ExternalAddresses {
    /// The shared address set.
    addresses: Arc<Mutex<HashSet<Multiaddr>>>,
}
122
123impl ExternalAddresses {
124 pub fn add(&mut self, addr: Multiaddr) -> bool {
126 self.addresses.lock().insert(addr)
127 }
128
129 pub fn remove(&mut self, addr: &Multiaddr) -> bool {
131 self.addresses.lock().remove(addr)
132 }
133}
134
135impl PeerInfoBehaviour {
136 pub fn new(
138 user_agent: String,
139 local_public_key: PublicKey,
140 external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
141 public_addresses: Vec<Multiaddr>,
142 ) -> Self {
143 let identify = {
144 let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key.clone())
145 .with_agent_version(user_agent)
146 .with_cache_size(0);
148 Identify::new(cfg)
149 };
150
151 Self {
152 ping: Ping::new(PingConfig::new()),
153 identify,
154 nodes_info: FnvHashMap::default(),
155 garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
156 local_peer_id: local_public_key.to_peer_id(),
157 public_addresses,
158 listen_addresses: HashSet::new(),
159 address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
160 external_addresses: ExternalAddresses { addresses: external_addresses },
161 pending_actions: Default::default(),
162 }
163 }
164
165 pub fn node(&self, peer_id: &PeerId) -> Option<Node> {
171 self.nodes_info.get(peer_id).map(Node)
172 }
173
174 fn handle_ping_report(
177 &mut self,
178 peer_id: &PeerId,
179 ping_time: Duration,
180 connection: ConnectionId,
181 ) {
182 trace!(target: LOG_TARGET, "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time);
183 if let Some(entry) = self.nodes_info.get_mut(peer_id) {
184 entry.latest_ping = Some(ping_time);
185 } else {
186 error!(target: LOG_TARGET,
187 "Received ping from node we're not connected to {:?} via {:?}", peer_id, connection);
188 }
189 }
190
191 fn with_local_peer_id(&self, address: Multiaddr) -> Result<Multiaddr, Multiaddr> {
194 if let Some(Protocol::P2p(peer_id)) = address.iter().last() {
195 if peer_id == self.local_peer_id {
196 Ok(address)
197 } else {
198 Err(address)
199 }
200 } else {
201 Ok(address.with(Protocol::P2p(self.local_peer_id)))
202 }
203 }
204
205 fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
208 trace!(target: LOG_TARGET, "Identified {:?} => {:?}", peer_id, info);
209 if let Some(entry) = self.nodes_info.get_mut(peer_id) {
210 entry.client_version = Some(info.agent_version.clone());
211 } else {
212 error!(target: LOG_TARGET,
213 "Received identify message from node we're not connected to {peer_id:?}");
214 }
215 match self.with_local_peer_id(info.observed_addr.clone()) {
217 Ok(observed_addr) => {
218 let (is_new, expired) = self.is_new_external_address(&observed_addr, *peer_id);
219 if is_new && self.external_addresses.add(observed_addr.clone()) {
220 trace!(
221 target: LOG_TARGET,
222 "Observed address reported by Identify confirmed as external {}",
223 observed_addr,
224 );
225 self.pending_actions.push_back(ToSwarm::ExternalAddrConfirmed(observed_addr));
226 }
227 if let Some(expired) = expired {
228 trace!(target: LOG_TARGET, "Removing replaced external address: {expired}");
229 self.external_addresses.remove(&expired);
230 self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(expired));
231 }
232 },
233 Err(addr) => {
234 warn!(
235 target: LOG_TARGET,
236 "Identify reported observed address for a peer that is not us: {addr}",
237 );
238 },
239 }
240 }
241
242 fn is_same_address(left: &Multiaddr, right: &Multiaddr) -> bool {
245 let mut left = left.iter();
246 let mut right = right.iter();
247
248 loop {
249 match (left.next(), right.next()) {
250 (None, None) => return true,
251 (None, Some(Protocol::P2p(_))) => return true,
252 (Some(Protocol::P2p(_)), None) => return true,
253 (left, right) if left != right => return false,
254 _ => {},
255 }
256 }
257 }
258
259 fn is_new_external_address(
263 &mut self,
264 address: &Multiaddr,
265 peer_id: PeerId,
266 ) -> (bool, Option<Multiaddr>) {
267 trace!(target: LOG_TARGET, "Verify new external address: {address}");
268
269 if self
274 .listen_addresses
275 .iter()
276 .chain(self.public_addresses.iter())
277 .any(|known_address| PeerInfoBehaviour::is_same_address(&known_address, &address))
278 {
279 return (true, None)
280 }
281
282 match self.address_confirmations.get(address) {
283 Some(confirmations) => {
284 confirmations.insert(peer_id);
285
286 if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
287 return (true, None)
288 }
289 },
290 None => {
291 let oldest = (self.address_confirmations.len() >=
292 self.address_confirmations.limiter().max_length() as usize)
293 .then(|| {
294 self.address_confirmations.pop_oldest().map(|(address, peers)| {
295 if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
296 return Some(address)
297 } else {
298 None
299 }
300 })
301 })
302 .flatten()
303 .flatten();
304
305 self.address_confirmations
306 .insert(address.clone(), iter::once(peer_id).collect());
307
308 return (false, oldest)
309 },
310 }
311
312 (false, None)
313 }
314}
315
/// Read-only view of the cached information about one peer, as returned by
/// `PeerInfoBehaviour::node`.
pub struct Node<'a>(&'a NodeInfo);
318
319impl<'a> Node<'a> {
320 pub fn endpoint(&self) -> Option<&'a ConnectedPoint> {
324 self.0.endpoints.get(0)
325 }
326
327 pub fn client_version(&self) -> Option<&'a str> {
329 self.0.client_version.as_deref()
330 }
331
332 pub fn latest_ping(&self) -> Option<Duration> {
335 self.0.latest_ping
336 }
337}
338
/// Event emitted by `PeerInfoBehaviour` towards the swarm.
#[derive(Debug)]
pub enum PeerInfoEvent {
    /// An identify report was received from a peer.
    Identified {
        /// Peer the report came from.
        peer_id: PeerId,
        /// The report as delivered by the identify behaviour.
        info: IdentifyInfo,
    },
}
351
352impl NetworkBehaviour for PeerInfoBehaviour {
/// Connection handler combining the ping handler with the identify handler.
type ConnectionHandler = ConnectionHandlerSelect<
    <Ping as NetworkBehaviour>::ConnectionHandler,
    <Identify as NetworkBehaviour>::ConnectionHandler,
>;
/// Event type this behaviour reports to the swarm.
type ToSwarm = PeerInfoEvent;
358
359 fn handle_pending_inbound_connection(
360 &mut self,
361 connection_id: ConnectionId,
362 local_addr: &Multiaddr,
363 remote_addr: &Multiaddr,
364 ) -> Result<(), ConnectionDenied> {
365 self.ping
366 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
367 self.identify
368 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
369 }
370
371 fn handle_pending_outbound_connection(
372 &mut self,
373 _connection_id: ConnectionId,
374 _maybe_peer: Option<PeerId>,
375 _addresses: &[Multiaddr],
376 _effective_role: Endpoint,
377 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
378 Ok(Vec::new())
381 }
382
383 fn handle_established_inbound_connection(
384 &mut self,
385 connection_id: ConnectionId,
386 peer: PeerId,
387 local_addr: &Multiaddr,
388 remote_addr: &Multiaddr,
389 ) -> Result<THandler<Self>, ConnectionDenied> {
390 let ping_handler = self.ping.handle_established_inbound_connection(
391 connection_id,
392 peer,
393 local_addr,
394 remote_addr,
395 )?;
396 let identify_handler = self.identify.handle_established_inbound_connection(
397 connection_id,
398 peer,
399 local_addr,
400 remote_addr,
401 )?;
402 Ok(ping_handler.select(identify_handler))
403 }
404
405 fn handle_established_outbound_connection(
406 &mut self,
407 connection_id: ConnectionId,
408 peer: PeerId,
409 addr: &Multiaddr,
410 role_override: Endpoint,
411 port_use: PortUse,
412 ) -> Result<THandler<Self>, ConnectionDenied> {
413 let ping_handler = self.ping.handle_established_outbound_connection(
414 connection_id,
415 peer,
416 addr,
417 role_override,
418 port_use,
419 )?;
420 let identify_handler = self.identify.handle_established_outbound_connection(
421 connection_id,
422 peer,
423 addr,
424 role_override,
425 port_use,
426 )?;
427 Ok(ping_handler.select(identify_handler))
428 }
429
430 fn on_swarm_event(&mut self, event: FromSwarm) {
431 match event {
432 FromSwarm::ConnectionEstablished(
433 e @ ConnectionEstablished { peer_id, endpoint, .. },
434 ) => {
435 self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
436 self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
437
438 match self.nodes_info.entry(peer_id) {
439 Entry::Vacant(e) => {
440 e.insert(NodeInfo::new(endpoint.clone()));
441 },
442 Entry::Occupied(e) => {
443 let e = e.into_mut();
444 if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
445 {
446 e.client_version = None;
447 e.latest_ping = None;
448 }
449 e.info_expire = None;
450 e.endpoints.push(endpoint.clone());
451 },
452 }
453 },
454 FromSwarm::ConnectionClosed(ConnectionClosed {
455 peer_id,
456 connection_id,
457 endpoint,
458 cause,
459 remaining_established,
460 }) => {
461 self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
462 peer_id,
463 connection_id,
464 endpoint,
465 cause,
466 remaining_established,
467 }));
468 self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
469 peer_id,
470 connection_id,
471 endpoint,
472 cause,
473 remaining_established,
474 }));
475
476 if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
477 if remaining_established == 0 {
478 entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
479 }
480 entry.endpoints.retain(|ep| ep != endpoint)
481 } else {
482 error!(target: LOG_TARGET,
483 "Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
484 }
485 },
486 FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => {
487 self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
488 peer_id,
489 error,
490 connection_id,
491 }));
492 self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
493 peer_id,
494 error,
495 connection_id,
496 }));
497 },
498 FromSwarm::ListenerClosed(e) => {
499 self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
500 self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
501 },
502 FromSwarm::ListenFailure(ListenFailure {
503 local_addr,
504 send_back_addr,
505 error,
506 connection_id,
507 peer_id,
508 }) => {
509 self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
510 local_addr,
511 send_back_addr,
512 error,
513 connection_id,
514 peer_id,
515 }));
516 self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
517 local_addr,
518 send_back_addr,
519 error,
520 connection_id,
521 peer_id,
522 }));
523 },
524 FromSwarm::ListenerError(e) => {
525 self.ping.on_swarm_event(FromSwarm::ListenerError(e));
526 self.identify.on_swarm_event(FromSwarm::ListenerError(e));
527 },
528 FromSwarm::ExternalAddrExpired(e) => {
529 self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
530 self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
531 },
532 FromSwarm::NewListener(e) => {
533 self.ping.on_swarm_event(FromSwarm::NewListener(e));
534 self.identify.on_swarm_event(FromSwarm::NewListener(e));
535 },
536 FromSwarm::NewListenAddr(e) => {
537 self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
538 self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
539 self.listen_addresses.insert(e.addr.clone());
540 },
541 FromSwarm::ExpiredListenAddr(e) => {
542 self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
543 self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
544 self.listen_addresses.remove(e.addr);
545 match self.with_local_peer_id(e.addr.clone()) {
547 Ok(addr) => {
548 self.external_addresses.remove(&addr);
549 self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(addr));
550 },
551 Err(addr) => {
552 warn!(
553 target: LOG_TARGET,
554 "Listen address expired with peer ID that is not us: {addr}",
555 );
556 },
557 }
558 },
559 FromSwarm::NewExternalAddrCandidate(e) => {
560 self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
561 self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
562 },
563 FromSwarm::ExternalAddrConfirmed(e) => {
564 self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
565 self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
566 },
567 FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
568 self.ping.on_swarm_event(FromSwarm::AddressChange(e));
569 self.identify.on_swarm_event(FromSwarm::AddressChange(e));
570
571 if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
572 if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
573 *endpoint = new.clone();
574 } else {
575 error!(target: LOG_TARGET,
576 "Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
577 }
578 } else {
579 error!(target: LOG_TARGET,
580 "Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
581 }
582 },
583 FromSwarm::NewExternalAddrOfPeer(e) => {
584 self.ping.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
585 self.identify.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
586 },
587 event => {
588 debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
589 self.ping.on_swarm_event(event);
590 self.identify.on_swarm_event(event);
591 },
592 }
593 }
594
595 fn on_connection_handler_event(
596 &mut self,
597 peer_id: PeerId,
598 connection_id: ConnectionId,
599 event: THandlerOutEvent<Self>,
600 ) {
601 match event {
602 Either::Left(event) =>
603 self.ping.on_connection_handler_event(peer_id, connection_id, event),
604 Either::Right(event) =>
605 self.identify.on_connection_handler_event(peer_id, connection_id, event),
606 }
607 }
608
609 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
610 if let Some(event) = self.pending_actions.pop_front() {
611 return Poll::Ready(event)
612 }
613
614 loop {
615 match self.ping.poll(cx) {
616 Poll::Pending => break,
617 Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
618 if let PingEvent { peer, result: Ok(rtt), connection } = ev {
619 self.handle_ping_report(&peer, rtt, connection)
620 }
621 },
622 Poll::Ready(event) => {
623 return Poll::Ready(event.map_in(Either::Left).map_out(|_| {
624 unreachable!("`GenerateEvent` is handled in a branch above; qed")
625 }));
626 },
627 }
628 }
629
630 loop {
631 match self.identify.poll(cx) {
632 Poll::Pending => break,
633 Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
634 IdentifyEvent::Received { peer_id, info, .. } => {
635 self.handle_identify_report(&peer_id, &info);
636 let event = PeerInfoEvent::Identified { peer_id, info };
637 return Poll::Ready(ToSwarm::GenerateEvent(event))
638 },
639 IdentifyEvent::Error { connection_id, peer_id, error } => {
640 debug!(
641 target: LOG_TARGET,
642 "Identification with peer {peer_id:?}({connection_id}) failed => {error}"
643 );
644 },
645 IdentifyEvent::Pushed { .. } => {},
646 IdentifyEvent::Sent { .. } => {},
647 },
648 Poll::Ready(event) => {
649 return Poll::Ready(event.map_in(Either::Right).map_out(|_| {
650 unreachable!("`GenerateEvent` is handled in a branch above; qed")
651 }));
652 },
653 }
654 }
655
656 while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
657 self.nodes_info.retain(|_, node| {
658 node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
659 });
660 }
661
662 Poll::Pending
663 }
664}