referrerpolicy=no-referrer-when-downgrade

sc_network/
peer_info.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PeerInfoBehaviour`] is implementation of `NetworkBehaviour` that holds information about peers
20//! in cache.
21
22use crate::{utils::interval, LOG_TARGET};
23use either::Either;
24
25use fnv::FnvHashMap;
26use futures::prelude::*;
27use libp2p::{
28	core::{transport::PortUse, ConnectedPoint, Endpoint},
29	identify::{
30		Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent,
31		Info as IdentifyInfo,
32	},
33	identity::PublicKey,
34	multiaddr::Protocol,
35	ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent},
36	swarm::{
37		behaviour::{
38			AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm,
39			ListenFailure,
40		},
41		ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId,
42		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
43	},
44	Multiaddr, PeerId,
45};
46use log::{debug, error, trace, warn};
47use parking_lot::Mutex;
48use schnellru::{ByLength, LruMap};
49use smallvec::SmallVec;
50
51use std::{
52	collections::{hash_map::Entry, HashSet, VecDeque},
53	iter,
54	pin::Pin,
55	sync::Arc,
56	task::{Context, Poll},
57	time::{Duration, Instant},
58};
59
60/// Time after we disconnect from a node before we purge its information from the cache.
61const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
62/// Interval at which we perform garbage collection on the node info.
63const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
64/// The maximum number of tracked external addresses we allow.
65const MAX_EXTERNAL_ADDRESSES: u32 = 32;
66/// Number of times observed address is received from different peers before it is confirmed as
67/// external.
68const MIN_ADDRESS_CONFIRMATIONS: usize = 3;
69
70/// Implementation of `NetworkBehaviour` that holds information about peers in cache.
71pub struct PeerInfoBehaviour {
72	/// Periodically ping nodes, and close the connection if it's unresponsive.
73	ping: Ping,
74	/// Periodically identifies the remote and responds to incoming requests.
75	identify: Identify,
76	/// Information that we know about all nodes.
77	nodes_info: FnvHashMap<PeerId, NodeInfo>,
78	/// Interval at which we perform garbage collection in `nodes_info`.
79	garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
80	/// PeerId of the local node.
81	local_peer_id: PeerId,
82	/// Public addresses supplied by the operator. Never expire.
83	public_addresses: Vec<Multiaddr>,
84	/// Listen addresses. External addresses matching listen addresses never expire.
85	listen_addresses: HashSet<Multiaddr>,
86	/// External address confirmations.
87	address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,
88	/// Record keeping of external addresses. Data is queried by the `NetworkService`.
89	/// The addresses contain the `/p2p/...` part with local peer ID.
90	external_addresses: ExternalAddresses,
91	/// Pending events to emit to [`Swarm`](libp2p::swarm::Swarm).
92	pending_actions: VecDeque<ToSwarm<PeerInfoEvent, THandlerInEvent<PeerInfoBehaviour>>>,
93}
94
95/// Information about a node we're connected to.
96#[derive(Debug)]
97struct NodeInfo {
98	/// When we will remove the entry about this node from the list, or `None` if we're connected
99	/// to the node.
100	info_expire: Option<Instant>,
101	/// Non-empty list of connected endpoints, one per connection.
102	endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
103	/// Version reported by the remote, or `None` if unknown.
104	client_version: Option<String>,
105	/// Latest ping time with this node.
106	latest_ping: Option<Duration>,
107}
108
109impl NodeInfo {
110	fn new(endpoint: ConnectedPoint) -> Self {
111		let mut endpoints = SmallVec::new();
112		endpoints.push(endpoint);
113		Self { info_expire: None, endpoints, client_version: None, latest_ping: None }
114	}
115}
116
117/// Utility struct for tracking external addresses. The data is shared with the `NetworkService`.
118#[derive(Debug, Clone, Default)]
119pub struct ExternalAddresses {
120	addresses: Arc<Mutex<HashSet<Multiaddr>>>,
121}
122
123impl ExternalAddresses {
124	/// Add an external address.
125	pub fn add(&mut self, addr: Multiaddr) -> bool {
126		self.addresses.lock().insert(addr)
127	}
128
129	/// Remove an external address.
130	pub fn remove(&mut self, addr: &Multiaddr) -> bool {
131		self.addresses.lock().remove(addr)
132	}
133}
134
135impl PeerInfoBehaviour {
136	/// Builds a new `PeerInfoBehaviour`.
137	pub fn new(
138		user_agent: String,
139		local_public_key: PublicKey,
140		external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
141		public_addresses: Vec<Multiaddr>,
142	) -> Self {
143		let identify = {
144			let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key.clone())
145				.with_agent_version(user_agent)
146				// We don't need any peer information cached.
147				.with_cache_size(0);
148			Identify::new(cfg)
149		};
150
151		Self {
152			ping: Ping::new(PingConfig::new()),
153			identify,
154			nodes_info: FnvHashMap::default(),
155			garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
156			local_peer_id: local_public_key.to_peer_id(),
157			public_addresses,
158			listen_addresses: HashSet::new(),
159			address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
160			external_addresses: ExternalAddresses { addresses: external_addresses },
161			pending_actions: Default::default(),
162		}
163	}
164
165	/// Borrows `self` and returns a struct giving access to the information about a node.
166	///
167	/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
168	/// we're connected to, meaning that if `None` is returned then we're not connected to that
169	/// node.
170	pub fn node(&self, peer_id: &PeerId) -> Option<Node> {
171		self.nodes_info.get(peer_id).map(Node)
172	}
173
174	/// Inserts a ping time in the cache. Has no effect if we don't have any entry for that node,
175	/// which shouldn't happen.
176	fn handle_ping_report(
177		&mut self,
178		peer_id: &PeerId,
179		ping_time: Duration,
180		connection: ConnectionId,
181	) {
182		trace!(target: LOG_TARGET, "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time);
183		if let Some(entry) = self.nodes_info.get_mut(peer_id) {
184			entry.latest_ping = Some(ping_time);
185		} else {
186			error!(target: LOG_TARGET,
187				"Received ping from node we're not connected to {:?} via {:?}", peer_id, connection);
188		}
189	}
190
191	/// Ensure address has the `/p2p/...` part with local peer id. Returns `Err` if the address
192	/// already contains a different peer id.
193	fn with_local_peer_id(&self, address: Multiaddr) -> Result<Multiaddr, Multiaddr> {
194		if let Some(Protocol::P2p(peer_id)) = address.iter().last() {
195			if peer_id == self.local_peer_id {
196				Ok(address)
197			} else {
198				Err(address)
199			}
200		} else {
201			Ok(address.with(Protocol::P2p(self.local_peer_id)))
202		}
203	}
204
205	/// Inserts an identify record in the cache & discovers external addresses when multiple
206	/// peers report the same address as observed.
207	fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
208		trace!(target: LOG_TARGET, "Identified {:?} => {:?}", peer_id, info);
209		if let Some(entry) = self.nodes_info.get_mut(peer_id) {
210			entry.client_version = Some(info.agent_version.clone());
211		} else {
212			error!(target: LOG_TARGET,
213				"Received identify message from node we're not connected to {peer_id:?}");
214		}
215		// Discover external addresses.
216		match self.with_local_peer_id(info.observed_addr.clone()) {
217			Ok(observed_addr) => {
218				let (is_new, expired) = self.is_new_external_address(&observed_addr, *peer_id);
219				if is_new && self.external_addresses.add(observed_addr.clone()) {
220					trace!(
221						target: LOG_TARGET,
222						"Observed address reported by Identify confirmed as external {}",
223						observed_addr,
224					);
225					self.pending_actions.push_back(ToSwarm::ExternalAddrConfirmed(observed_addr));
226				}
227				if let Some(expired) = expired {
228					trace!(target: LOG_TARGET, "Removing replaced external address: {expired}");
229					self.external_addresses.remove(&expired);
230					self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(expired));
231				}
232			},
233			Err(addr) => {
234				warn!(
235					target: LOG_TARGET,
236					"Identify reported observed address for a peer that is not us: {addr}",
237				);
238			},
239		}
240	}
241
242	/// Check if addresses are equal taking into account they can contain or not contain
243	/// the `/p2p/...` part.
244	fn is_same_address(left: &Multiaddr, right: &Multiaddr) -> bool {
245		let mut left = left.iter();
246		let mut right = right.iter();
247
248		loop {
249			match (left.next(), right.next()) {
250				(None, None) => return true,
251				(None, Some(Protocol::P2p(_))) => return true,
252				(Some(Protocol::P2p(_)), None) => return true,
253				(left, right) if left != right => return false,
254				_ => {},
255			}
256		}
257	}
258
259	/// Check if `address` can be considered a new external address.
260	///
261	/// If this address replaces an older address, the expired address is returned.
262	fn is_new_external_address(
263		&mut self,
264		address: &Multiaddr,
265		peer_id: PeerId,
266	) -> (bool, Option<Multiaddr>) {
267		trace!(target: LOG_TARGET, "Verify new external address: {address}");
268
269		// Public and listen addresses don't count towards discovered external addresses
270		// and are always confirmed.
271		// Because they are not kept in the LRU, they are never replaced by discovered
272		// external addresses.
273		if self
274			.listen_addresses
275			.iter()
276			.chain(self.public_addresses.iter())
277			.any(|known_address| PeerInfoBehaviour::is_same_address(&known_address, &address))
278		{
279			return (true, None)
280		}
281
282		match self.address_confirmations.get(address) {
283			Some(confirmations) => {
284				confirmations.insert(peer_id);
285
286				if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
287					return (true, None)
288				}
289			},
290			None => {
291				let oldest = (self.address_confirmations.len() >=
292					self.address_confirmations.limiter().max_length() as usize)
293					.then(|| {
294						self.address_confirmations.pop_oldest().map(|(address, peers)| {
295							if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
296								return Some(address)
297							} else {
298								None
299							}
300						})
301					})
302					.flatten()
303					.flatten();
304
305				self.address_confirmations
306					.insert(address.clone(), iter::once(peer_id).collect());
307
308				return (false, oldest)
309			},
310		}
311
312		(false, None)
313	}
314}
315
316/// Gives access to the information about a node.
317pub struct Node<'a>(&'a NodeInfo);
318
319impl<'a> Node<'a> {
320	/// Returns the endpoint of an established connection to the peer.
321	///
322	/// Returns `None` if we are disconnected from the node.
323	pub fn endpoint(&self) -> Option<&'a ConnectedPoint> {
324		self.0.endpoints.get(0)
325	}
326
327	/// Returns the latest version information we know of.
328	pub fn client_version(&self) -> Option<&'a str> {
329		self.0.client_version.as_deref()
330	}
331
332	/// Returns the latest ping time we know of for this node. `None` if we never successfully
333	/// pinged this node.
334	pub fn latest_ping(&self) -> Option<Duration> {
335		self.0.latest_ping
336	}
337}
338
339/// Event that can be emitted by the behaviour.
340#[derive(Debug)]
341pub enum PeerInfoEvent {
342	/// We have obtained identity information from a peer, including the addresses it is listening
343	/// on.
344	Identified {
345		/// Id of the peer that has been identified.
346		peer_id: PeerId,
347		/// Information about the peer.
348		info: IdentifyInfo,
349	},
350}
351
352impl NetworkBehaviour for PeerInfoBehaviour {
353	type ConnectionHandler = ConnectionHandlerSelect<
354		<Ping as NetworkBehaviour>::ConnectionHandler,
355		<Identify as NetworkBehaviour>::ConnectionHandler,
356	>;
357	type ToSwarm = PeerInfoEvent;
358
359	fn handle_pending_inbound_connection(
360		&mut self,
361		connection_id: ConnectionId,
362		local_addr: &Multiaddr,
363		remote_addr: &Multiaddr,
364	) -> Result<(), ConnectionDenied> {
365		self.ping
366			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
367		self.identify
368			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
369	}
370
371	fn handle_pending_outbound_connection(
372		&mut self,
373		_connection_id: ConnectionId,
374		_maybe_peer: Option<PeerId>,
375		_addresses: &[Multiaddr],
376		_effective_role: Endpoint,
377	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
378		// Only `Discovery::handle_pending_outbound_connection` must be returning addresses to
379		// ensure that we don't return unwanted addresses.
380		Ok(Vec::new())
381	}
382
383	fn handle_established_inbound_connection(
384		&mut self,
385		connection_id: ConnectionId,
386		peer: PeerId,
387		local_addr: &Multiaddr,
388		remote_addr: &Multiaddr,
389	) -> Result<THandler<Self>, ConnectionDenied> {
390		let ping_handler = self.ping.handle_established_inbound_connection(
391			connection_id,
392			peer,
393			local_addr,
394			remote_addr,
395		)?;
396		let identify_handler = self.identify.handle_established_inbound_connection(
397			connection_id,
398			peer,
399			local_addr,
400			remote_addr,
401		)?;
402		Ok(ping_handler.select(identify_handler))
403	}
404
405	fn handle_established_outbound_connection(
406		&mut self,
407		connection_id: ConnectionId,
408		peer: PeerId,
409		addr: &Multiaddr,
410		role_override: Endpoint,
411		port_use: PortUse,
412	) -> Result<THandler<Self>, ConnectionDenied> {
413		let ping_handler = self.ping.handle_established_outbound_connection(
414			connection_id,
415			peer,
416			addr,
417			role_override,
418			port_use,
419		)?;
420		let identify_handler = self.identify.handle_established_outbound_connection(
421			connection_id,
422			peer,
423			addr,
424			role_override,
425			port_use,
426		)?;
427		Ok(ping_handler.select(identify_handler))
428	}
429
430	fn on_swarm_event(&mut self, event: FromSwarm) {
431		match event {
432			FromSwarm::ConnectionEstablished(
433				e @ ConnectionEstablished { peer_id, endpoint, .. },
434			) => {
435				self.ping.on_swarm_event(FromSwarm::ConnectionEstablished(e));
436				self.identify.on_swarm_event(FromSwarm::ConnectionEstablished(e));
437
438				match self.nodes_info.entry(peer_id) {
439					Entry::Vacant(e) => {
440						e.insert(NodeInfo::new(endpoint.clone()));
441					},
442					Entry::Occupied(e) => {
443						let e = e.into_mut();
444						if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false)
445						{
446							e.client_version = None;
447							e.latest_ping = None;
448						}
449						e.info_expire = None;
450						e.endpoints.push(endpoint.clone());
451					},
452				}
453			},
454			FromSwarm::ConnectionClosed(ConnectionClosed {
455				peer_id,
456				connection_id,
457				endpoint,
458				cause,
459				remaining_established,
460			}) => {
461				self.ping.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
462					peer_id,
463					connection_id,
464					endpoint,
465					cause,
466					remaining_established,
467				}));
468				self.identify.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
469					peer_id,
470					connection_id,
471					endpoint,
472					cause,
473					remaining_established,
474				}));
475
476				if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
477					if remaining_established == 0 {
478						entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
479					}
480					entry.endpoints.retain(|ep| ep != endpoint)
481				} else {
482					error!(target: LOG_TARGET,
483						"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
484				}
485			},
486			FromSwarm::DialFailure(DialFailure { peer_id, error, connection_id }) => {
487				self.ping.on_swarm_event(FromSwarm::DialFailure(DialFailure {
488					peer_id,
489					error,
490					connection_id,
491				}));
492				self.identify.on_swarm_event(FromSwarm::DialFailure(DialFailure {
493					peer_id,
494					error,
495					connection_id,
496				}));
497			},
498			FromSwarm::ListenerClosed(e) => {
499				self.ping.on_swarm_event(FromSwarm::ListenerClosed(e));
500				self.identify.on_swarm_event(FromSwarm::ListenerClosed(e));
501			},
502			FromSwarm::ListenFailure(ListenFailure {
503				local_addr,
504				send_back_addr,
505				error,
506				connection_id,
507				peer_id,
508			}) => {
509				self.ping.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
510					local_addr,
511					send_back_addr,
512					error,
513					connection_id,
514					peer_id,
515				}));
516				self.identify.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
517					local_addr,
518					send_back_addr,
519					error,
520					connection_id,
521					peer_id,
522				}));
523			},
524			FromSwarm::ListenerError(e) => {
525				self.ping.on_swarm_event(FromSwarm::ListenerError(e));
526				self.identify.on_swarm_event(FromSwarm::ListenerError(e));
527			},
528			FromSwarm::ExternalAddrExpired(e) => {
529				self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
530				self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
531			},
532			FromSwarm::NewListener(e) => {
533				self.ping.on_swarm_event(FromSwarm::NewListener(e));
534				self.identify.on_swarm_event(FromSwarm::NewListener(e));
535			},
536			FromSwarm::NewListenAddr(e) => {
537				self.ping.on_swarm_event(FromSwarm::NewListenAddr(e));
538				self.identify.on_swarm_event(FromSwarm::NewListenAddr(e));
539				self.listen_addresses.insert(e.addr.clone());
540			},
541			FromSwarm::ExpiredListenAddr(e) => {
542				self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
543				self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
544				self.listen_addresses.remove(e.addr);
545				// Remove matching external address.
546				match self.with_local_peer_id(e.addr.clone()) {
547					Ok(addr) => {
548						self.external_addresses.remove(&addr);
549						self.pending_actions.push_back(ToSwarm::ExternalAddrExpired(addr));
550					},
551					Err(addr) => {
552						warn!(
553							target: LOG_TARGET,
554							"Listen address expired with peer ID that is not us: {addr}",
555						);
556					},
557				}
558			},
559			FromSwarm::NewExternalAddrCandidate(e) => {
560				self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
561				self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
562			},
563			FromSwarm::ExternalAddrConfirmed(e) => {
564				self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
565				self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
566			},
567			FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => {
568				self.ping.on_swarm_event(FromSwarm::AddressChange(e));
569				self.identify.on_swarm_event(FromSwarm::AddressChange(e));
570
571				if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
572					if let Some(endpoint) = entry.endpoints.iter_mut().find(|e| e == &old) {
573						*endpoint = new.clone();
574					} else {
575						error!(target: LOG_TARGET,
576							"Unknown address change for peer {:?} from {:?} to {:?}", peer_id, old, new);
577					}
578				} else {
579					error!(target: LOG_TARGET,
580						"Unknown peer {:?} to change address from {:?} to {:?}", peer_id, old, new);
581				}
582			},
583			FromSwarm::NewExternalAddrOfPeer(e) => {
584				self.ping.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
585				self.identify.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
586			},
587			event => {
588				debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
589				self.ping.on_swarm_event(event);
590				self.identify.on_swarm_event(event);
591			},
592		}
593	}
594
595	fn on_connection_handler_event(
596		&mut self,
597		peer_id: PeerId,
598		connection_id: ConnectionId,
599		event: THandlerOutEvent<Self>,
600	) {
601		match event {
602			Either::Left(event) =>
603				self.ping.on_connection_handler_event(peer_id, connection_id, event),
604			Either::Right(event) =>
605				self.identify.on_connection_handler_event(peer_id, connection_id, event),
606		}
607	}
608
609	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
610		if let Some(event) = self.pending_actions.pop_front() {
611			return Poll::Ready(event)
612		}
613
614		loop {
615			match self.ping.poll(cx) {
616				Poll::Pending => break,
617				Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
618					if let PingEvent { peer, result: Ok(rtt), connection } = ev {
619						self.handle_ping_report(&peer, rtt, connection)
620					}
621				},
622				Poll::Ready(event) => {
623					return Poll::Ready(event.map_in(Either::Left).map_out(|_| {
624						unreachable!("`GenerateEvent` is handled in a branch above; qed")
625					}));
626				},
627			}
628		}
629
630		loop {
631			match self.identify.poll(cx) {
632				Poll::Pending => break,
633				Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
634					IdentifyEvent::Received { peer_id, info, .. } => {
635						self.handle_identify_report(&peer_id, &info);
636						let event = PeerInfoEvent::Identified { peer_id, info };
637						return Poll::Ready(ToSwarm::GenerateEvent(event))
638					},
639					IdentifyEvent::Error { connection_id, peer_id, error } => {
640						debug!(
641							target: LOG_TARGET,
642							"Identification with peer {peer_id:?}({connection_id}) failed => {error}"
643						);
644					},
645					IdentifyEvent::Pushed { .. } => {},
646					IdentifyEvent::Sent { .. } => {},
647				},
648				Poll::Ready(event) => {
649					return Poll::Ready(event.map_in(Either::Right).map_out(|_| {
650						unreachable!("`GenerateEvent` is handled in a branch above; qed")
651					}));
652				},
653			}
654		}
655
656		while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
657			self.nodes_info.retain(|_, node| {
658				node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
659			});
660		}
661
662		Poll::Pending
663	}
664}