referrerpolicy=no-referrer-when-downgrade

sc_network/litep2p/
peerstore.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `Peerstore` implementation for `litep2p`.
20//!
21//! `Peerstore` is responsible for storing information about remote peers
22//! such as their addresses, reputations, supported protocols etc.
23
24use crate::{
25	peer_store::{PeerStoreProvider, ProtocolHandle},
26	service::{metrics::PeerStoreMetrics, traits::PeerStore},
27	ObservedRole, ReputationChange,
28};
29
30use parking_lot::Mutex;
31use prometheus_endpoint::Registry;
32use wasm_timer::Delay;
33
34use sc_network_types::PeerId;
35
36use std::{
37	collections::{HashMap, HashSet},
38	sync::Arc,
39	time::{Duration, Instant},
40};
41
42/// Logging target for the file.
43const LOG_TARGET: &str = "sub-libp2p::peerstore";
44
45/// We don't accept nodes whose reputation is under this value.
46pub const BANNED_THRESHOLD: i32 = 71 * (i32::MIN / 100);
47
48/// Relative decrement of a reputation value that is applied every second. I.e., for inverse
49/// decrement of 200 we decrease absolute value of the reputation by 1/200.
50///
51/// This corresponds to a factor of `k = 0.995`, where k = 1 - 1 / INVERSE_DECREMENT.
52///
53/// It takes ~ `ln(0.5) / ln(k)` seconds to reduce the reputation by half, or 138.63 seconds for the
54/// values above.
55///
56/// In this setup:
57/// - `i32::MAX` becomes 0 in exactly 3544 seconds, or approximately 59 minutes
58/// - `i32::MIN` escapes the banned threshold in 69 seconds
59const INVERSE_DECREMENT: i32 = 200;
60
61/// Amount of time between the moment we last updated the [`PeerStore`] entry and the moment we
62/// remove it, once the reputation value reaches 0.
63const FORGET_AFTER: Duration = Duration::from_secs(3600);
64
65/// Peer information.
66#[derive(Debug, Clone, Copy)]
67struct PeerInfo {
68	/// Reputation of the peer.
69	reputation: i32,
70
71	/// Instant when the peer was last updated.
72	last_updated: Instant,
73
74	/// Role of the peer, if known.
75	role: Option<ObservedRole>,
76}
77
78impl Default for PeerInfo {
79	fn default() -> Self {
80		Self { reputation: 0i32, last_updated: Instant::now(), role: None }
81	}
82}
83
84impl PeerInfo {
85	fn is_banned(&self) -> bool {
86		self.reputation < BANNED_THRESHOLD
87	}
88
89	fn add_reputation(&mut self, increment: i32) {
90		self.reputation = self.reputation.saturating_add(increment);
91		self.bump_last_updated();
92	}
93
94	fn decay_reputation(&mut self, seconds_passed: u64) {
95		// Note that decaying the reputation value happens "on its own",
96		// so we don't do `bump_last_updated()`.
97		for _ in 0..seconds_passed {
98			let mut diff = self.reputation / INVERSE_DECREMENT;
99			if diff == 0 && self.reputation < 0 {
100				diff = -1;
101			} else if diff == 0 && self.reputation > 0 {
102				diff = 1;
103			}
104
105			self.reputation = self.reputation.saturating_sub(diff);
106
107			if self.reputation == 0 {
108				break
109			}
110		}
111	}
112
113	fn bump_last_updated(&mut self) {
114		self.last_updated = Instant::now();
115	}
116}
117
118#[derive(Debug, Default)]
119pub struct PeerstoreHandleInner {
120	peers: HashMap<PeerId, PeerInfo>,
121	protocols: Vec<Arc<dyn ProtocolHandle>>,
122	metrics: Option<PeerStoreMetrics>,
123}
124
125#[derive(Debug, Clone, Default)]
126pub struct PeerstoreHandle(Arc<Mutex<PeerstoreHandleInner>>);
127
128impl PeerstoreHandle {
129	/// Constructs a new [`PeerstoreHandle`].
130	fn new(
131		peers: HashMap<PeerId, PeerInfo>,
132		protocols: Vec<Arc<dyn ProtocolHandle>>,
133		metrics: Option<PeerStoreMetrics>,
134	) -> Self {
135		Self(Arc::new(Mutex::new(PeerstoreHandleInner { peers, protocols, metrics })))
136	}
137
138	/// Add known peer to [`Peerstore`].
139	pub fn add_known_peer(&self, peer: PeerId) {
140		self.0
141			.lock()
142			.peers
143			.insert(peer, PeerInfo { reputation: 0i32, last_updated: Instant::now(), role: None });
144	}
145
146	pub fn peer_count(&self) -> usize {
147		self.0.lock().peers.len()
148	}
149
150	fn progress_time(&self, seconds_passed: u64) {
151		if seconds_passed == 0 {
152			return
153		}
154
155		let mut lock = self.0.lock();
156
157		// Drive reputation values towards 0.
158		lock.peers
159			.iter_mut()
160			.for_each(|(_, info)| info.decay_reputation(seconds_passed));
161
162		// Retain only entries with non-zero reputation values or not expired ones.
163		let now = Instant::now();
164		let mut num_banned_peers = 0;
165		lock.peers.retain(|_, info| {
166			if info.is_banned() {
167				num_banned_peers += 1;
168			}
169			info.reputation != 0 || info.last_updated + FORGET_AFTER > now
170		});
171
172		if let Some(metrics) = &lock.metrics {
173			metrics.num_discovered.set(lock.peers.len() as u64);
174			metrics.num_banned_peers.set(num_banned_peers);
175		}
176	}
177}
178
179impl PeerStoreProvider for PeerstoreHandle {
180	fn is_banned(&self, peer: &PeerId) -> bool {
181		self.0.lock().peers.get(peer).map_or(false, |info| info.is_banned())
182	}
183
184	/// Register a protocol handle to disconnect peers whose reputation drops below the threshold.
185	fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>) {
186		self.0.lock().protocols.push(protocol_handle);
187	}
188
189	/// Report peer disconnection for reputation adjustment.
190	fn report_disconnect(&self, _peer: PeerId) {
191		unimplemented!();
192	}
193
194	/// Adjust peer reputation.
195	fn report_peer(&self, peer_id: PeerId, change: ReputationChange) {
196		let mut lock = self.0.lock();
197		let peer_info = lock.peers.entry(peer_id).or_default();
198		let was_banned = peer_info.is_banned();
199		peer_info.add_reputation(change.value);
200		let peer_reputation = peer_info.reputation;
201
202		log::trace!(
203			target: LOG_TARGET,
204			"Report {}: {:+} to {}. Reason: {}.",
205			peer_id,
206			change.value,
207			peer_reputation,
208			change.reason,
209		);
210
211		if !peer_info.is_banned() {
212			if was_banned {
213				log::info!(
214					target: LOG_TARGET,
215					"Peer {} is now unbanned: {:+} to {}. Reason: {}.",
216					peer_id,
217					change.value,
218					peer_reputation,
219					change.reason,
220				);
221			}
222			return;
223		}
224
225		// Peer is currently banned, disconnect it from all protocols.
226		lock.protocols.iter().for_each(|handle| handle.disconnect_peer(peer_id.into()));
227
228		// The peer is banned for the first time.
229		if !was_banned {
230			log::warn!(
231				target: LOG_TARGET,
232				"Report {}: {:+} to {}. Reason: {}. Banned, disconnecting.",
233				peer_id,
234				change.value,
235				peer_reputation,
236				change.reason,
237			);
238			return;
239		}
240
241		// The peer was already banned and it got another negative report.
242		// This may happen during a batch report.
243		if change.value < 0 {
244			log::debug!(
245				target: LOG_TARGET,
246				"Report {}: {:+} to {}. Reason: {}. Misbehaved during the ban threshold.",
247				peer_id,
248				change.value,
249				peer_reputation,
250				change.reason,
251			);
252		}
253	}
254
255	/// Set peer role.
256	fn set_peer_role(&self, peer: &PeerId, role: ObservedRole) {
257		self.0.lock().peers.entry(*peer).or_default().role = Some(role);
258	}
259
260	/// Get peer reputation.
261	fn peer_reputation(&self, peer: &PeerId) -> i32 {
262		self.0.lock().peers.get(peer).map_or(0i32, |info| info.reputation)
263	}
264
265	/// Get peer role, if available.
266	fn peer_role(&self, peer: &PeerId) -> Option<ObservedRole> {
267		self.0.lock().peers.get(peer).and_then(|info| info.role)
268	}
269
270	/// Get candidates with highest reputations for initiating outgoing connections.
271	fn outgoing_candidates(&self, count: usize, ignored: HashSet<PeerId>) -> Vec<PeerId> {
272		let handle = self.0.lock();
273
274		let mut candidates = handle
275			.peers
276			.iter()
277			.filter_map(|(peer, info)| {
278				(!ignored.contains(&peer) && !info.is_banned()).then_some((*peer, info.reputation))
279			})
280			.collect::<Vec<(PeerId, _)>>();
281		candidates.sort_by(|(_, a), (_, b)| b.cmp(a));
282		candidates
283			.into_iter()
284			.take(count)
285			.map(|(peer, _score)| peer)
286			.collect::<Vec<_>>()
287	}
288
289	/// Add known peer.
290	fn add_known_peer(&self, peer: PeerId) {
291		self.0.lock().peers.entry(peer).or_default().last_updated = Instant::now();
292	}
293}
294
295/// `Peerstore` handle for testing.
296///
297/// This instance of `Peerstore` is not shared between protocols.
298#[cfg(test)]
299pub fn peerstore_handle_test() -> PeerstoreHandle {
300	PeerstoreHandle(Arc::new(Mutex::new(Default::default())))
301}
302
303/// Peerstore implementation.
304pub struct Peerstore {
305	/// Handle to `Peerstore`.
306	peerstore_handle: PeerstoreHandle,
307}
308
309impl Peerstore {
310	/// Create new [`Peerstore`].
311	pub fn new(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self {
312		let metrics = if let Some(registry) = &metrics_registry {
313			PeerStoreMetrics::register(registry)
314				.map_err(|err| {
315					log::error!(target: LOG_TARGET, "Failed to register peer store metrics: {}", err);
316					err
317				})
318				.ok()
319		} else {
320			None
321		};
322
323		let peerstore_handle = PeerstoreHandle::new(
324			bootnodes.iter().map(|peer_id| (*peer_id, PeerInfo::default())).collect(),
325			Vec::new(),
326			metrics,
327		);
328
329		Self { peerstore_handle }
330	}
331
332	/// Get mutable reference to the underlying [`PeerstoreHandle`].
333	pub fn handle(&mut self) -> &mut PeerstoreHandle {
334		&mut self.peerstore_handle
335	}
336
337	/// Add known peer to [`Peerstore`].
338	pub fn add_known_peer(&mut self, peer: PeerId) {
339		self.peerstore_handle.add_known_peer(peer);
340	}
341
342	/// Start [`Peerstore`] event loop.
343	async fn run(self) {
344		let started = Instant::now();
345		let mut latest_time_update = started;
346
347		loop {
348			let now = Instant::now();
349			// We basically do `(now - self.latest_update).as_secs()`, except that by the way we do
350			// it we know that we're not going to miss seconds because of rounding to integers.
351			let seconds_passed = {
352				let elapsed_latest = latest_time_update - started;
353				let elapsed_now = now - started;
354				latest_time_update = now;
355				elapsed_now.as_secs() - elapsed_latest.as_secs()
356			};
357
358			self.peerstore_handle.progress_time(seconds_passed);
359			let _ = Delay::new(Duration::from_secs(1)).await;
360		}
361	}
362}
363
364#[async_trait::async_trait]
365impl PeerStore for Peerstore {
366	/// Get handle to `PeerStore`.
367	fn handle(&self) -> Arc<dyn PeerStoreProvider> {
368		Arc::new(self.peerstore_handle.clone())
369	}
370
371	/// Start running `PeerStore` event loop.
372	async fn run(self) {
373		self.run().await;
374	}
375}
376
377#[cfg(test)]
378mod tests {
379	use super::{PeerInfo, PeerStoreProvider, Peerstore};
380
381	#[test]
382	fn decaying_zero_reputation_yields_zero() {
383		let mut peer_info = PeerInfo::default();
384		assert_eq!(peer_info.reputation, 0);
385
386		peer_info.decay_reputation(1);
387		assert_eq!(peer_info.reputation, 0);
388
389		peer_info.decay_reputation(100_000);
390		assert_eq!(peer_info.reputation, 0);
391	}
392
393	#[test]
394	fn decaying_positive_reputation_decreases_it() {
395		const INITIAL_REPUTATION: i32 = 100;
396
397		let mut peer_info = PeerInfo::default();
398		peer_info.reputation = INITIAL_REPUTATION;
399
400		peer_info.decay_reputation(1);
401		assert!(peer_info.reputation >= 0);
402		assert!(peer_info.reputation < INITIAL_REPUTATION);
403	}
404
405	#[test]
406	fn decaying_negative_reputation_increases_it() {
407		const INITIAL_REPUTATION: i32 = -100;
408
409		let mut peer_info = PeerInfo::default();
410		peer_info.reputation = INITIAL_REPUTATION;
411
412		peer_info.decay_reputation(1);
413		assert!(peer_info.reputation <= 0);
414		assert!(peer_info.reputation > INITIAL_REPUTATION);
415	}
416
417	#[test]
418	fn decaying_max_reputation_finally_yields_zero() {
419		const INITIAL_REPUTATION: i32 = i32::MAX;
420		const SECONDS: u64 = 3544;
421
422		let mut peer_info = PeerInfo::default();
423		peer_info.reputation = INITIAL_REPUTATION;
424
425		peer_info.decay_reputation(SECONDS / 2);
426		assert!(peer_info.reputation > 0);
427
428		peer_info.decay_reputation(SECONDS / 2);
429		assert_eq!(peer_info.reputation, 0);
430	}
431
432	#[test]
433	fn decaying_min_reputation_finally_yields_zero() {
434		const INITIAL_REPUTATION: i32 = i32::MIN;
435		const SECONDS: u64 = 3544;
436
437		let mut peer_info = PeerInfo::default();
438		peer_info.reputation = INITIAL_REPUTATION;
439
440		peer_info.decay_reputation(SECONDS / 2);
441		assert!(peer_info.reputation < 0);
442
443		peer_info.decay_reputation(SECONDS / 2);
444		assert_eq!(peer_info.reputation, 0);
445	}
446
447	#[test]
448	fn report_banned_peers() {
449		let peer_a = sc_network_types::PeerId::random();
450		let peer_b = sc_network_types::PeerId::random();
451		let peer_c = sc_network_types::PeerId::random();
452
453		let metrics_registry = prometheus_endpoint::Registry::new();
454		let mut peerstore = Peerstore::new(
455			vec![peer_a, peer_b, peer_c].into_iter().map(Into::into).collect(),
456			Some(metrics_registry),
457		);
458		let metrics = peerstore.peerstore_handle.0.lock().metrics.as_ref().unwrap().clone();
459		let handle = peerstore.handle();
460
461		// Check initial state. Advance time to propagate peers.
462		handle.progress_time(1);
463		assert_eq!(metrics.num_discovered.get(), 3);
464		assert_eq!(metrics.num_banned_peers.get(), 0);
465
466		// Report 2 peers with a negative reputation.
467		handle.report_peer(
468			peer_a,
469			sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
470		);
471		handle.report_peer(
472			peer_b,
473			sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
474		);
475
476		// Advance time to propagate peers.
477		handle.progress_time(1);
478		assert_eq!(metrics.num_discovered.get(), 3);
479		assert_eq!(metrics.num_banned_peers.get(), 2);
480	}
481}