1mod backend;
17mod connected;
18mod db;
19
20use futures::channel::oneshot;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
22
23use crate::{
24 validator_side_experimental::{
25 common::{
26 PeerInfo, PeerState, Score, CONNECTED_PEERS_LIMIT, CONNECTED_PEERS_PARA_LIMIT,
27 INACTIVITY_DECAY, MAX_STARTUP_ANCESTRY_LOOKBACK, MAX_STORED_SCORES_PER_PARA,
28 VALID_INCLUDED_CANDIDATE_BUMP,
29 },
30 error::{Error, Result},
31 },
32 LOG_TARGET,
33};
34pub use backend::Backend;
35use connected::ConnectedPeers;
36pub use db::Db;
37use polkadot_node_network_protocol::{
38 peer_set::{CollationVersion, PeerSet},
39 PeerId,
40};
41use polkadot_node_subsystem::{
42 messages::{ChainApiMessage, NetworkBridgeTxMessage},
43 ActivatedLeaf, CollatorProtocolSenderTrait,
44};
45use polkadot_node_subsystem_util::{
46 request_candidate_events, request_candidates_pending_availability, runtime::recv_runtime,
47};
48use polkadot_primitives::{
49 BlockNumber, CandidateDescriptorVersion, CandidateEvent, CandidateHash, Hash, Id as ParaId,
50};
51
/// A single change to the reputation that a peer holds for one para.
///
/// Built by `PeerManager::slash_reputation` and handed to
/// `ConnectedPeers::update_reputation`; the backend's `process_bumps` also yields values that
/// are passed to the same method.
#[derive(Debug, PartialEq, Clone)]
pub struct ReputationUpdate {
    /// The peer whose reputation changes.
    pub peer_id: PeerId,
    /// The para for which the peer's reputation changes.
    pub para_id: ParaId,
    /// The amount of the change; its direction is given by `kind`.
    pub value: Score,
    /// Whether `value` is a bump or a slash.
    pub kind: ReputationUpdateKind,
}
59
/// The direction of a [`ReputationUpdate`].
#[derive(Debug, PartialEq, Clone)]
pub enum ReputationUpdateKind {
    /// The reputation is bumped.
    Bump,
    /// The reputation is slashed (see `PeerManager::slash_reputation`).
    Slash,
}
65
/// The result of trying to give a peer a connection slot via `ConnectedPeers::try_accept`.
#[derive(Debug, PartialEq)]
enum TryAcceptOutcome {
    /// The peer was accepted.
    Added,
    /// The peer was accepted and took over the connection slots of the contained peers.
    /// Callers in this file disconnect those peers.
    Replaced(HashSet<PeerId>),
    /// The peer was not accepted. Callers in this file disconnect it.
    Rejected,
}
77
78impl TryAcceptOutcome {
79 fn combine(self, other: Self) -> Self {
80 use TryAcceptOutcome::*;
81 match (self, other) {
82 (Added, Added) => Added,
83 (Rejected, Rejected) => Rejected,
84 (Added, Rejected) | (Rejected, Added) => Added,
85 (Replaced(mut replaced_a), Replaced(replaced_b)) => {
86 replaced_a.extend(replaced_b);
87 Replaced(replaced_a)
88 },
89 (_, Replaced(replaced)) | (Replaced(replaced), _) => Replaced(replaced),
90 }
91 }
92}
93
/// The result of `ConnectedPeers::declared`, i.e. of a peer declaring the para it collates for.
#[derive(Debug, PartialEq)]
enum DeclarationOutcome {
    /// The declaration was rejected. `PeerManager::declared` disconnects the peer.
    Rejected,
    /// The peer switched to a new para; the contained id is the para it was on before.
    /// `PeerManager::declared` then tries to accept the peer again.
    Switched(ParaId),
    /// The declaration was accepted.
    Accepted,
}
100
/// Tracks collator peer reputations and decides which peers keep a connection slot.
///
/// `B` is the reputation storage; the methods are implemented for `B: Backend`.
pub struct PeerManager<B> {
    /// Reputation storage, queried and updated through the `Backend` trait.
    db: B,
    /// The peers currently holding a connection slot, for the currently scheduled paras.
    connected: ConnectedPeers,
}
105
106impl<B: Backend> PeerManager<B> {
107 pub async fn startup<Sender: CollatorProtocolSenderTrait>(
110 backend: B,
111 sender: &mut Sender,
112 scheduled_paras: BTreeSet<ParaId>,
113 ) -> Result<Self> {
114 let mut instance = Self {
115 db: backend,
116 connected: ConnectedPeers::new(
117 scheduled_paras,
118 CONNECTED_PEERS_LIMIT,
119 CONNECTED_PEERS_PARA_LIMIT,
120 ),
121 };
122
123 let (latest_finalized_block_number, latest_finalized_block_hash) =
124 get_latest_finalized_block(sender).await?;
125
126 let processed_finalized_block_number =
127 instance.db.processed_finalized_block_number().await.unwrap_or_default();
128
129 let bumps = extract_reputation_bumps_on_new_finalized_block(
130 sender,
131 processed_finalized_block_number,
132 (latest_finalized_block_number, latest_finalized_block_hash),
133 )
134 .await?;
135
136 instance.db.process_bumps(latest_finalized_block_number, bumps, None).await;
137
138 Ok(instance)
139 }
140
141 pub async fn update_reputations_on_new_finalized_block<Sender: CollatorProtocolSenderTrait>(
143 &mut self,
144 sender: &mut Sender,
145 (finalized_block_hash, finalized_block_number): (Hash, BlockNumber),
146 ) -> Result<()> {
147 let processed_finalized_block_number =
148 self.db.processed_finalized_block_number().await.unwrap_or_default();
149
150 let bumps = extract_reputation_bumps_on_new_finalized_block(
151 sender,
152 processed_finalized_block_number,
153 (finalized_block_number, finalized_block_hash),
154 )
155 .await?;
156
157 let updates = self
158 .db
159 .process_bumps(
160 finalized_block_number,
161 bumps,
162 Some(Score::new(INACTIVITY_DECAY).expect("INACTIVITY_DECAY is a valid score")),
163 )
164 .await;
165 for update in updates {
166 self.connected.update_reputation(update);
167 }
168
169 Ok(())
170 }
171
172 pub async fn registered_paras_update(&mut self, registered_paras: BTreeSet<ParaId>) {
176 self.db.prune_paras(registered_paras).await;
180 }
181
182 pub async fn scheduled_paras_update<Sender: CollatorProtocolSenderTrait>(
184 &mut self,
185 sender: &mut Sender,
186 scheduled_paras: BTreeSet<ParaId>,
187 ) {
188 let mut prev_scheduled_paras: BTreeSet<_> =
189 self.connected.scheduled_paras().copied().collect();
190
191 if prev_scheduled_paras == scheduled_paras {
192 return
194 }
195
196 let mut new_instance =
199 ConnectedPeers::new(scheduled_paras, CONNECTED_PEERS_LIMIT, CONNECTED_PEERS_PARA_LIMIT);
200
201 std::mem::swap(&mut new_instance, &mut self.connected);
202 let prev_instance = new_instance;
203 let (prev_peers, cached_scores) = prev_instance.consume();
204
205 let cached_scores = &cached_scores;
210 let db = &self.db;
211 let reputation_query_fn = |peer_id: PeerId, para_id: ParaId| async move {
212 if let Some(cached_score) =
213 cached_scores.get(¶_id).and_then(|per_para| per_para.get_score(&peer_id))
214 {
215 cached_score
216 } else {
217 db.query(&peer_id, ¶_id).await.unwrap_or_default()
218 }
219 };
220
221 let mut peers_to_disconnect = HashSet::new();
223 for (peer_id, peer_info) in prev_peers {
224 let outcome = self.connected.try_accept(reputation_query_fn, peer_id, peer_info).await;
225
226 match outcome {
227 TryAcceptOutcome::Rejected => {
228 peers_to_disconnect.insert(peer_id);
229 },
230 TryAcceptOutcome::Replaced(replaced_peer_ids) => {
231 peers_to_disconnect.extend(replaced_peer_ids);
232 },
233 TryAcceptOutcome::Added => {},
234 }
235 }
236
237 self.disconnect_peers(sender, peers_to_disconnect).await;
239 }
240
241 pub async fn declared<Sender: CollatorProtocolSenderTrait>(
243 &mut self,
244 sender: &mut Sender,
245 peer_id: PeerId,
246 para_id: ParaId,
247 ) {
248 let Some(peer_info) = self.connected.peer_info(&peer_id).cloned() else { return };
249 let outcome = self.connected.declared(peer_id, para_id);
250
251 match outcome {
252 DeclarationOutcome::Accepted => {
253 gum::debug!(
254 target: LOG_TARGET,
255 ?para_id,
256 ?peer_id,
257 "Peer declared",
258 );
259 },
260 DeclarationOutcome::Switched(old_para_id) => {
261 gum::debug!(
262 target: LOG_TARGET,
263 ?para_id,
264 ?old_para_id,
265 ?peer_id,
266 "Peer switched collating paraid. Trying to accept it on the new one.",
267 );
268
269 self.try_accept_connection(sender, peer_id, peer_info).await;
270 },
271 DeclarationOutcome::Rejected => {
272 gum::debug!(
273 target: LOG_TARGET,
274 ?para_id,
275 ?peer_id,
276 "Peer declared but rejected. Going to disconnect.",
277 );
278
279 self.disconnect_peers(sender, [peer_id].into_iter().collect()).await;
280 },
281 }
282 }
283
284 pub async fn slash_reputation(&mut self, peer_id: &PeerId, para_id: &ParaId, value: Score) {
286 gum::debug!(
287 target: LOG_TARGET,
288 ?peer_id,
289 ?para_id,
290 ?value,
291 "Slashing peer's reputation",
292 );
293
294 self.db.slash(peer_id, para_id, value).await;
295 self.connected.update_reputation(ReputationUpdate {
296 peer_id: *peer_id,
297 para_id: *para_id,
298 value,
299 kind: ReputationUpdateKind::Slash,
300 });
301 }
302
303 pub fn disconnected(&mut self, peer_id: &PeerId) {
305 self.connected.remove(peer_id);
306 }
307
308 pub async fn try_accept_connection<Sender: CollatorProtocolSenderTrait>(
310 &mut self,
311 sender: &mut Sender,
312 peer_id: PeerId,
313 peer_info: PeerInfo,
314 ) -> bool {
315 let db = &self.db;
316 let reputation_query_fn = |peer_id: PeerId, para_id: ParaId| async move {
317 db.query(&peer_id, ¶_id).await.unwrap_or_default()
319 };
320
321 let outcome = self.connected.try_accept(reputation_query_fn, peer_id, peer_info).await;
322
323 match outcome {
324 TryAcceptOutcome::Added => true,
325 TryAcceptOutcome::Replaced(other_peers) => {
326 gum::trace!(
327 target: LOG_TARGET,
328 "Peer {:?} replaced the connection slots of other peers: {:?}",
329 peer_id,
330 &other_peers
331 );
332 self.disconnect_peers(sender, other_peers).await;
333 true
334 },
335 TryAcceptOutcome::Rejected => {
336 gum::debug!(
337 target: LOG_TARGET,
338 ?peer_id,
339 "Peer connection was rejected",
340 );
341 self.disconnect_peers(sender, [peer_id].into_iter().collect()).await;
342 false
343 },
344 }
345 }
346
347 pub fn connected_peer_score(&self, peer_id: &PeerId, para_id: &ParaId) -> Option<Score> {
349 self.connected.peer_score(peer_id, para_id)
350 }
351
352 async fn disconnect_peers<Sender: CollatorProtocolSenderTrait>(
353 &self,
354 sender: &mut Sender,
355 peers: HashSet<PeerId>,
356 ) {
357 gum::trace!(
358 target: LOG_TARGET,
359 ?peers,
360 "Disconnecting peers",
361 );
362
363 sender
364 .send_message(NetworkBridgeTxMessage::DisconnectPeers(
365 peers.into_iter().collect(),
366 PeerSet::Collation,
367 ))
368 .await;
369 }
370}
371
372async fn get_ancestors<Sender: CollatorProtocolSenderTrait>(
373 sender: &mut Sender,
374 k: usize,
375 hash: Hash,
376) -> Result<Vec<Hash>> {
377 let (tx, rx) = oneshot::channel();
378 sender
379 .send_message(ChainApiMessage::Ancestors { hash, k, response_channel: tx })
380 .await;
381
382 Ok(rx.await.map_err(|_| Error::CanceledAncestors)??)
383}
384
385async fn get_latest_finalized_block<Sender: CollatorProtocolSenderTrait>(
386 sender: &mut Sender,
387) -> Result<(BlockNumber, Hash)> {
388 let (tx, rx) = oneshot::channel();
389 sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await;
390
391 let block_number = rx.await.map_err(|_| Error::CanceledFinalizedBlockNumber)??;
392
393 let (tx, rx) = oneshot::channel();
394 sender.send_message(ChainApiMessage::FinalizedBlockHash(block_number, tx)).await;
395
396 let block_hash = rx
397 .await
398 .map_err(|_| Error::CanceledFinalizedBlockHash)??
399 .ok_or_else(|| Error::FinalizedBlockNotFound(block_number))?;
400
401 Ok((block_number, block_hash))
402}
403
404async fn extract_reputation_bumps_on_new_finalized_block<Sender: CollatorProtocolSenderTrait>(
405 sender: &mut Sender,
406 processed_finalized_block_number: BlockNumber,
407 (latest_finalized_block_number, latest_finalized_block_hash): (BlockNumber, Hash),
408) -> Result<BTreeMap<ParaId, HashMap<PeerId, Score>>> {
409 if latest_finalized_block_number < processed_finalized_block_number {
410 gum::warn!(
412 target: LOG_TARGET,
413 latest_finalized_block_number,
414 ?latest_finalized_block_hash,
415 "Peer manager stored finalized block number {} is higher than the latest finalized block.",
416 processed_finalized_block_number,
417 );
418 return Ok(BTreeMap::new())
419 }
420
421 let ancestry_len = std::cmp::min(
422 latest_finalized_block_number.saturating_sub(processed_finalized_block_number),
423 MAX_STARTUP_ANCESTRY_LOOKBACK,
424 );
425
426 if ancestry_len == 0 {
427 return Ok(BTreeMap::new())
428 }
429
430 let mut ancestors =
431 get_ancestors(sender, ancestry_len as usize, latest_finalized_block_hash).await?;
432 ancestors.push(latest_finalized_block_hash);
433 ancestors.reverse();
434
435 gum::trace!(
436 target: LOG_TARGET,
437 ?latest_finalized_block_hash,
438 processed_finalized_block_number,
439 "Processing reputation bumps for finalized relay parent {} and its {} ancestors",
440 latest_finalized_block_number,
441 ancestry_len
442 );
443
444 let mut v2_candidates_per_rp: HashMap<Hash, BTreeMap<ParaId, HashSet<CandidateHash>>> =
445 HashMap::with_capacity(ancestors.len());
446
447 for i in 1..ancestors.len() {
448 let rp = ancestors[i];
449 let parent_rp = ancestors[i - 1];
450 let candidate_events = recv_runtime(request_candidate_events(rp, sender).await).await?;
451
452 for event in candidate_events {
453 if let CandidateEvent::CandidateIncluded(receipt, _, _, _) = event {
454 if receipt.descriptor.version() == CandidateDescriptorVersion::V2 {
456 v2_candidates_per_rp
457 .entry(parent_rp)
458 .or_default()
459 .entry(receipt.descriptor.para_id())
460 .or_default()
461 .insert(receipt.hash());
462 }
463 }
464 }
465 }
466
467 let mut updates: BTreeMap<ParaId, HashMap<PeerId, Score>> = BTreeMap::new();
469 for (rp, per_para) in v2_candidates_per_rp {
470 for (para_id, included_candidates) in per_para {
471 let candidates_pending_availability =
472 recv_runtime(request_candidates_pending_availability(rp, para_id, sender).await)
473 .await?;
474
475 for candidate in candidates_pending_availability {
476 let candidate_hash = candidate.hash();
477 if included_candidates.contains(&candidate_hash) {
478 match candidate.commitments.ump_signals() {
479 Ok(ump_signals) => {
480 if let Some(approved_peer) = ump_signals.approved_peer() {
481 match PeerId::from_bytes(approved_peer) {
482 Ok(peer_id) => updates
483 .entry(para_id)
484 .or_default()
485 .entry(peer_id)
486 .or_default()
487 .saturating_add(VALID_INCLUDED_CANDIDATE_BUMP),
488 Err(err) => {
489 gum::debug!(
492 target: LOG_TARGET,
493 ?candidate_hash,
494 "UMP signal contains invalid ApprovedPeer id: {}",
495 err
496 );
497 },
498 }
499 }
500 },
501 Err(err) => {
502 gum::warn!(
505 target: LOG_TARGET,
506 ?candidate_hash,
507 "Failed to parse UMP signals for included candidate: {}",
508 err
509 );
510 },
511 }
512 }
513 }
514 }
515 }
516
517 Ok(updates)
518}