1use crate::{
25 peer_store::{PeerStoreProvider, ProtocolHandle},
26 service::{metrics::PeerStoreMetrics, traits::PeerStore},
27 ObservedRole, ReputationChange,
28};
29
30use parking_lot::Mutex;
31use prometheus_endpoint::Registry;
32use wasm_timer::Delay;
33
34use sc_network_types::PeerId;
35
36use std::{
37 collections::{HashMap, HashSet},
38 sync::Arc,
39 time::{Duration, Instant},
40};
41
42const LOG_TARGET: &str = "sub-libp2p::peerstore";
44
45pub const BANNED_THRESHOLD: i32 = 71 * (i32::MIN / 100);
47
48const INVERSE_DECREMENT: i32 = 200;
60
61const FORGET_AFTER: Duration = Duration::from_secs(3600);
64
65#[derive(Debug, Clone, Copy)]
67struct PeerInfo {
68 reputation: i32,
70
71 last_updated: Instant,
73
74 role: Option<ObservedRole>,
76}
77
78impl Default for PeerInfo {
79 fn default() -> Self {
80 Self { reputation: 0i32, last_updated: Instant::now(), role: None }
81 }
82}
83
84impl PeerInfo {
85 fn is_banned(&self) -> bool {
86 self.reputation < BANNED_THRESHOLD
87 }
88
89 fn add_reputation(&mut self, increment: i32) {
90 self.reputation = self.reputation.saturating_add(increment);
91 self.bump_last_updated();
92 }
93
94 fn decay_reputation(&mut self, seconds_passed: u64) {
95 for _ in 0..seconds_passed {
98 let mut diff = self.reputation / INVERSE_DECREMENT;
99 if diff == 0 && self.reputation < 0 {
100 diff = -1;
101 } else if diff == 0 && self.reputation > 0 {
102 diff = 1;
103 }
104
105 self.reputation = self.reputation.saturating_sub(diff);
106
107 if self.reputation == 0 {
108 break
109 }
110 }
111 }
112
113 fn bump_last_updated(&mut self) {
114 self.last_updated = Instant::now();
115 }
116}
117
118#[derive(Debug, Default)]
119pub struct PeerstoreHandleInner {
120 peers: HashMap<PeerId, PeerInfo>,
121 protocols: Vec<Arc<dyn ProtocolHandle>>,
122 metrics: Option<PeerStoreMetrics>,
123}
124
125#[derive(Debug, Clone, Default)]
126pub struct PeerstoreHandle(Arc<Mutex<PeerstoreHandleInner>>);
127
128impl PeerstoreHandle {
129 fn new(
131 peers: HashMap<PeerId, PeerInfo>,
132 protocols: Vec<Arc<dyn ProtocolHandle>>,
133 metrics: Option<PeerStoreMetrics>,
134 ) -> Self {
135 Self(Arc::new(Mutex::new(PeerstoreHandleInner { peers, protocols, metrics })))
136 }
137
138 pub fn add_known_peer(&self, peer: PeerId) {
140 self.0
141 .lock()
142 .peers
143 .insert(peer, PeerInfo { reputation: 0i32, last_updated: Instant::now(), role: None });
144 }
145
146 pub fn peer_count(&self) -> usize {
147 self.0.lock().peers.len()
148 }
149
150 fn progress_time(&self, seconds_passed: u64) {
151 if seconds_passed == 0 {
152 return
153 }
154
155 let mut lock = self.0.lock();
156
157 lock.peers
159 .iter_mut()
160 .for_each(|(_, info)| info.decay_reputation(seconds_passed));
161
162 let now = Instant::now();
164 let mut num_banned_peers = 0;
165 lock.peers.retain(|_, info| {
166 if info.is_banned() {
167 num_banned_peers += 1;
168 }
169 info.reputation != 0 || info.last_updated + FORGET_AFTER > now
170 });
171
172 if let Some(metrics) = &lock.metrics {
173 metrics.num_discovered.set(lock.peers.len() as u64);
174 metrics.num_banned_peers.set(num_banned_peers);
175 }
176 }
177}
178
179impl PeerStoreProvider for PeerstoreHandle {
180 fn is_banned(&self, peer: &PeerId) -> bool {
181 self.0.lock().peers.get(peer).map_or(false, |info| info.is_banned())
182 }
183
184 fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>) {
186 self.0.lock().protocols.push(protocol_handle);
187 }
188
189 fn report_disconnect(&self, _peer: PeerId) {
191 unimplemented!();
192 }
193
194 fn report_peer(&self, peer_id: PeerId, change: ReputationChange) {
196 let mut lock = self.0.lock();
197 let peer_info = lock.peers.entry(peer_id).or_default();
198 let was_banned = peer_info.is_banned();
199 peer_info.add_reputation(change.value);
200 let peer_reputation = peer_info.reputation;
201
202 log::trace!(
203 target: LOG_TARGET,
204 "Report {}: {:+} to {}. Reason: {}.",
205 peer_id,
206 change.value,
207 peer_reputation,
208 change.reason,
209 );
210
211 if !peer_info.is_banned() {
212 if was_banned {
213 log::info!(
214 target: LOG_TARGET,
215 "Peer {} is now unbanned: {:+} to {}. Reason: {}.",
216 peer_id,
217 change.value,
218 peer_reputation,
219 change.reason,
220 );
221 }
222 return;
223 }
224
225 lock.protocols.iter().for_each(|handle| handle.disconnect_peer(peer_id.into()));
227
228 if !was_banned {
230 log::warn!(
231 target: LOG_TARGET,
232 "Report {}: {:+} to {}. Reason: {}. Banned, disconnecting.",
233 peer_id,
234 change.value,
235 peer_reputation,
236 change.reason,
237 );
238 return;
239 }
240
241 if change.value < 0 {
244 log::debug!(
245 target: LOG_TARGET,
246 "Report {}: {:+} to {}. Reason: {}. Misbehaved during the ban threshold.",
247 peer_id,
248 change.value,
249 peer_reputation,
250 change.reason,
251 );
252 }
253 }
254
255 fn set_peer_role(&self, peer: &PeerId, role: ObservedRole) {
257 self.0.lock().peers.entry(*peer).or_default().role = Some(role);
258 }
259
260 fn peer_reputation(&self, peer: &PeerId) -> i32 {
262 self.0.lock().peers.get(peer).map_or(0i32, |info| info.reputation)
263 }
264
265 fn peer_role(&self, peer: &PeerId) -> Option<ObservedRole> {
267 self.0.lock().peers.get(peer).and_then(|info| info.role)
268 }
269
270 fn outgoing_candidates(&self, count: usize, ignored: HashSet<PeerId>) -> Vec<PeerId> {
272 let handle = self.0.lock();
273
274 let mut candidates = handle
275 .peers
276 .iter()
277 .filter_map(|(peer, info)| {
278 (!ignored.contains(&peer) && !info.is_banned()).then_some((*peer, info.reputation))
279 })
280 .collect::<Vec<(PeerId, _)>>();
281 candidates.sort_by(|(_, a), (_, b)| b.cmp(a));
282 candidates
283 .into_iter()
284 .take(count)
285 .map(|(peer, _score)| peer)
286 .collect::<Vec<_>>()
287 }
288
289 fn add_known_peer(&self, peer: PeerId) {
291 self.0.lock().peers.entry(peer).or_default().last_updated = Instant::now();
292 }
293}
294
295#[cfg(test)]
299pub fn peerstore_handle_test() -> PeerstoreHandle {
300 PeerstoreHandle(Arc::new(Mutex::new(Default::default())))
301}
302
303pub struct Peerstore {
305 peerstore_handle: PeerstoreHandle,
307}
308
309impl Peerstore {
310 pub fn new(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self {
312 let metrics = if let Some(registry) = &metrics_registry {
313 PeerStoreMetrics::register(registry)
314 .map_err(|err| {
315 log::error!(target: LOG_TARGET, "Failed to register peer store metrics: {}", err);
316 err
317 })
318 .ok()
319 } else {
320 None
321 };
322
323 let peerstore_handle = PeerstoreHandle::new(
324 bootnodes.iter().map(|peer_id| (*peer_id, PeerInfo::default())).collect(),
325 Vec::new(),
326 metrics,
327 );
328
329 Self { peerstore_handle }
330 }
331
332 pub fn handle(&mut self) -> &mut PeerstoreHandle {
334 &mut self.peerstore_handle
335 }
336
337 pub fn add_known_peer(&mut self, peer: PeerId) {
339 self.peerstore_handle.add_known_peer(peer);
340 }
341
342 async fn run(self) {
344 let started = Instant::now();
345 let mut latest_time_update = started;
346
347 loop {
348 let now = Instant::now();
349 let seconds_passed = {
352 let elapsed_latest = latest_time_update - started;
353 let elapsed_now = now - started;
354 latest_time_update = now;
355 elapsed_now.as_secs() - elapsed_latest.as_secs()
356 };
357
358 self.peerstore_handle.progress_time(seconds_passed);
359 let _ = Delay::new(Duration::from_secs(1)).await;
360 }
361 }
362}
363
364#[async_trait::async_trait]
365impl PeerStore for Peerstore {
366 fn handle(&self) -> Arc<dyn PeerStoreProvider> {
368 Arc::new(self.peerstore_handle.clone())
369 }
370
371 async fn run(self) {
373 self.run().await;
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use super::{PeerInfo, PeerStoreProvider, Peerstore};
380
381 #[test]
382 fn decaying_zero_reputation_yields_zero() {
383 let mut peer_info = PeerInfo::default();
384 assert_eq!(peer_info.reputation, 0);
385
386 peer_info.decay_reputation(1);
387 assert_eq!(peer_info.reputation, 0);
388
389 peer_info.decay_reputation(100_000);
390 assert_eq!(peer_info.reputation, 0);
391 }
392
393 #[test]
394 fn decaying_positive_reputation_decreases_it() {
395 const INITIAL_REPUTATION: i32 = 100;
396
397 let mut peer_info = PeerInfo::default();
398 peer_info.reputation = INITIAL_REPUTATION;
399
400 peer_info.decay_reputation(1);
401 assert!(peer_info.reputation >= 0);
402 assert!(peer_info.reputation < INITIAL_REPUTATION);
403 }
404
405 #[test]
406 fn decaying_negative_reputation_increases_it() {
407 const INITIAL_REPUTATION: i32 = -100;
408
409 let mut peer_info = PeerInfo::default();
410 peer_info.reputation = INITIAL_REPUTATION;
411
412 peer_info.decay_reputation(1);
413 assert!(peer_info.reputation <= 0);
414 assert!(peer_info.reputation > INITIAL_REPUTATION);
415 }
416
417 #[test]
418 fn decaying_max_reputation_finally_yields_zero() {
419 const INITIAL_REPUTATION: i32 = i32::MAX;
420 const SECONDS: u64 = 3544;
421
422 let mut peer_info = PeerInfo::default();
423 peer_info.reputation = INITIAL_REPUTATION;
424
425 peer_info.decay_reputation(SECONDS / 2);
426 assert!(peer_info.reputation > 0);
427
428 peer_info.decay_reputation(SECONDS / 2);
429 assert_eq!(peer_info.reputation, 0);
430 }
431
432 #[test]
433 fn decaying_min_reputation_finally_yields_zero() {
434 const INITIAL_REPUTATION: i32 = i32::MIN;
435 const SECONDS: u64 = 3544;
436
437 let mut peer_info = PeerInfo::default();
438 peer_info.reputation = INITIAL_REPUTATION;
439
440 peer_info.decay_reputation(SECONDS / 2);
441 assert!(peer_info.reputation < 0);
442
443 peer_info.decay_reputation(SECONDS / 2);
444 assert_eq!(peer_info.reputation, 0);
445 }
446
447 #[test]
448 fn report_banned_peers() {
449 let peer_a = sc_network_types::PeerId::random();
450 let peer_b = sc_network_types::PeerId::random();
451 let peer_c = sc_network_types::PeerId::random();
452
453 let metrics_registry = prometheus_endpoint::Registry::new();
454 let mut peerstore = Peerstore::new(
455 vec![peer_a, peer_b, peer_c].into_iter().map(Into::into).collect(),
456 Some(metrics_registry),
457 );
458 let metrics = peerstore.peerstore_handle.0.lock().metrics.as_ref().unwrap().clone();
459 let handle = peerstore.handle();
460
461 handle.progress_time(1);
463 assert_eq!(metrics.num_discovered.get(), 3);
464 assert_eq!(metrics.num_banned_peers.get(), 0);
465
466 handle.report_peer(
468 peer_a,
469 sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
470 );
471 handle.report_peer(
472 peer_b,
473 sc_network_common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
474 );
475
476 handle.progress_time(1);
478 assert_eq!(metrics.num_discovered.get(), 3);
479 assert_eq!(metrics.num_banned_peers.get(), 2);
480 }
481}