
polkadot_collator_protocol/validator_side_experimental/peer_manager/mod.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
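
//! Peer manager for the experimental validator side of the collator protocol.
//!
//! Keeps track of connected collator peers and their per-para reputation scores,
//! persisting reputations through a [`Backend`] and maintaining an in-memory view of
//! the currently connected peers.
//!
//! A rough sketch of the intended call flow (hypothetical variable names; `ignore`d
//! since it needs a full subsystem context):
//!
//! ```ignore
//! // On subsystem startup, once the node has synced to the tip of the chain:
//! let mut peer_manager = PeerManager::startup(db, &mut sender, scheduled_paras).await?;
//!
//! // On every block finality notification:
//! peer_manager
//! 	.update_reputations_on_new_finalized_block(&mut sender, (finalized_hash, finalized_number))
//! 	.await?;
//! ```
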
mod backend;
mod connected;
mod db;

use futures::channel::oneshot;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use crate::{
	validator_side_experimental::{
		common::{
			PeerInfo, PeerState, Score, CONNECTED_PEERS_LIMIT, CONNECTED_PEERS_PARA_LIMIT,
			INACTIVITY_DECAY, MAX_STARTUP_ANCESTRY_LOOKBACK, MAX_STORED_SCORES_PER_PARA,
			VALID_INCLUDED_CANDIDATE_BUMP,
		},
		error::{Error, Result},
	},
	LOG_TARGET,
};
pub use backend::Backend;
use connected::ConnectedPeers;
pub use db::Db;
use polkadot_node_network_protocol::{
	peer_set::{CollationVersion, PeerSet},
	PeerId,
};
use polkadot_node_subsystem::{
	messages::{ChainApiMessage, NetworkBridgeTxMessage},
	ActivatedLeaf, CollatorProtocolSenderTrait,
};
use polkadot_node_subsystem_util::{
	request_candidate_events, request_candidates_pending_availability, runtime::recv_runtime,
};
use polkadot_primitives::{
	BlockNumber, CandidateDescriptorVersion, CandidateEvent, CandidateHash, Hash, Id as ParaId,
};

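/// A reputation change to be applied for a peer on a specific para.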
#[derive(Debug, PartialEq, Clone)]
pub struct ReputationUpdate {
	pub peer_id: PeerId,
	pub para_id: ParaId,
	pub value: Score,
	pub kind: ReputationUpdateKind,
}

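/// The direction of a reputation change.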
#[derive(Debug, PartialEq, Clone)]
pub enum ReputationUpdateKind {
	Bump,
	Slash,
}

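/// Outcome of trying to accept an incoming peer connection.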
#[derive(Debug, PartialEq)]
enum TryAcceptOutcome {
	Added,
	// This can hold more than one `PeerId` because, before receiving the `Declare` message,
	// one peer can hold connection slots for multiple paraids.
	// The set can also be empty if this peer replaced some other peer's slot but that other peer
	// maintained a connection slot for another para (and was therefore not disconnected).
	// The number of peers in the set is bounded by the number of scheduled paras.
	Replaced(HashSet<PeerId>),
	Rejected,
}

impl TryAcceptOutcome {
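	/// Combine two outcomes: `Replaced` takes precedence over `Added`, which takes precedence
	/// over `Rejected`; replaced peer sets are merged.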
	fn combine(self, other: Self) -> Self {
		use TryAcceptOutcome::*;
		match (self, other) {
			(Added, Added) => Added,
			(Rejected, Rejected) => Rejected,
			(Added, Rejected) | (Rejected, Added) => Added,
			(Replaced(mut replaced_a), Replaced(replaced_b)) => {
				replaced_a.extend(replaced_b);
				Replaced(replaced_a)
			},
			(_, Replaced(replaced)) | (Replaced(replaced), _) => Replaced(replaced),
		}
	}
}

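/// Outcome of processing a peer's `Declare` message.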
#[derive(Debug, PartialEq)]
enum DeclarationOutcome {
	Rejected,
	Switched(ParaId),
	Accepted,
}

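/// Manages collator peer connections and their reputations, backed by a persistent
/// reputation store and an in-memory view of the currently connected peers.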
pub struct PeerManager<B> {
	db: B,
	connected: ConnectedPeers,
}

impl<B: Backend> PeerManager<B> {
	/// Initialize the peer manager (called on subsystem startup, after the node finished syncing to
	/// the tip of the chain).
	pub async fn startup<Sender: CollatorProtocolSenderTrait>(
		backend: B,
		sender: &mut Sender,
		scheduled_paras: BTreeSet<ParaId>,
	) -> Result<Self> {
		let mut instance = Self {
			db: backend,
			connected: ConnectedPeers::new(
				scheduled_paras,
				CONNECTED_PEERS_LIMIT,
				CONNECTED_PEERS_PARA_LIMIT,
			),
		};

		let (latest_finalized_block_number, latest_finalized_block_hash) =
			get_latest_finalized_block(sender).await?;

		let processed_finalized_block_number =
			instance.db.processed_finalized_block_number().await.unwrap_or_default();

		let bumps = extract_reputation_bumps_on_new_finalized_block(
			sender,
			processed_finalized_block_number,
			(latest_finalized_block_number, latest_finalized_block_hash),
		)
		.await?;

		instance.db.process_bumps(latest_finalized_block_number, bumps, None).await;

		Ok(instance)
	}

	/// Handle a new block finality notification by updating peer reputations.
	pub async fn update_reputations_on_new_finalized_block<Sender: CollatorProtocolSenderTrait>(
		&mut self,
		sender: &mut Sender,
		(finalized_block_hash, finalized_block_number): (Hash, BlockNumber),
	) -> Result<()> {
		let processed_finalized_block_number =
			self.db.processed_finalized_block_number().await.unwrap_or_default();

		let bumps = extract_reputation_bumps_on_new_finalized_block(
			sender,
			processed_finalized_block_number,
			(finalized_block_number, finalized_block_hash),
		)
		.await?;

		let updates = self
			.db
			.process_bumps(
				finalized_block_number,
				bumps,
				Some(Score::new(INACTIVITY_DECAY).expect("INACTIVITY_DECAY is a valid score")),
			)
			.await;
		for update in updates {
			self.connected.update_reputation(update);
		}

		Ok(())
	}

	/// Process the registered paras and clean up all data pertaining to unregistered paras, if
	/// any. Should be called every N finalized block notifications, since para deregistrations
	/// are expected to be rare.
	pub async fn registered_paras_update(&mut self, registered_paras: BTreeSet<ParaId>) {
		// Tell the DB to clean up paras that are no longer registered. No need to clean up the
		// connected peers state, since it will get automatically cleaned up as the claim queue
		// gets rid of these stale assignments.
		self.db.prune_paras(registered_paras).await;
	}

	/// Process a potential change of the scheduled paras.
	pub async fn scheduled_paras_update<Sender: CollatorProtocolSenderTrait>(
		&mut self,
		sender: &mut Sender,
		scheduled_paras: BTreeSet<ParaId>,
	) {
		let mut prev_scheduled_paras: BTreeSet<_> =
			self.connected.scheduled_paras().copied().collect();

		if prev_scheduled_paras == scheduled_paras {
			// Nothing to do if the scheduled paras didn't change.
			return
		}

		// Recreate the connected peers based on the new schedule and try populating it again based
		// on their reputations. Disconnect any peers that couldn't be kept.
		let mut new_instance =
			ConnectedPeers::new(scheduled_paras, CONNECTED_PEERS_LIMIT, CONNECTED_PEERS_PARA_LIMIT);

		std::mem::swap(&mut new_instance, &mut self.connected);
		let prev_instance = new_instance;
		let (prev_peers, cached_scores) = prev_instance.consume();

		// Build a closure that can be used to first query the in-memory past reputations of the
		// peers before reaching for the DB.

		// Borrow these for use in the closure.
		let cached_scores = &cached_scores;
		let db = &self.db;
		let reputation_query_fn = |peer_id: PeerId, para_id: ParaId| async move {
			if let Some(cached_score) =
				cached_scores.get(&para_id).and_then(|per_para| per_para.get_score(&peer_id))
			{
				cached_score
			} else {
				db.query(&peer_id, &para_id).await.unwrap_or_default()
			}
		};

		// See which of the old peers we should keep.
		let mut peers_to_disconnect = HashSet::new();
		for (peer_id, peer_info) in prev_peers {
			let outcome = self.connected.try_accept(reputation_query_fn, peer_id, peer_info).await;

			match outcome {
				TryAcceptOutcome::Rejected => {
					peers_to_disconnect.insert(peer_id);
				},
				TryAcceptOutcome::Replaced(replaced_peer_ids) => {
					peers_to_disconnect.extend(replaced_peer_ids);
				},
				TryAcceptOutcome::Added => {},
			}
		}

		// Disconnect peers that couldn't be kept.
		self.disconnect_peers(sender, peers_to_disconnect).await;
	}

	/// Process a declaration message of a peer.
	pub async fn declared<Sender: CollatorProtocolSenderTrait>(
		&mut self,
		sender: &mut Sender,
		peer_id: PeerId,
		para_id: ParaId,
	) {
		let Some(peer_info) = self.connected.peer_info(&peer_id).cloned() else { return };
		let outcome = self.connected.declared(peer_id, para_id);

		match outcome {
			DeclarationOutcome::Accepted => {
				gum::debug!(
					target: LOG_TARGET,
					?para_id,
					?peer_id,
					"Peer declared",
				);
			},
			DeclarationOutcome::Switched(old_para_id) => {
				gum::debug!(
					target: LOG_TARGET,
					?para_id,
					?old_para_id,
					?peer_id,
					"Peer switched collating paraid. Trying to accept it on the new one.",
				);

				self.try_accept_connection(sender, peer_id, peer_info).await;
			},
			DeclarationOutcome::Rejected => {
				gum::debug!(
					target: LOG_TARGET,
					?para_id,
					?peer_id,
					"Peer declared but rejected. Going to disconnect.",
				);

				self.disconnect_peers(sender, [peer_id].into_iter().collect()).await;
			},
		}
	}

	/// Slash a peer's reputation for this paraid.
	pub async fn slash_reputation(&mut self, peer_id: &PeerId, para_id: &ParaId, value: Score) {
		gum::debug!(
			target: LOG_TARGET,
			?peer_id,
			?para_id,
			?value,
			"Slashing peer's reputation",
		);

		self.db.slash(peer_id, para_id, value).await;
		self.connected.update_reputation(ReputationUpdate {
			peer_id: *peer_id,
			para_id: *para_id,
			value,
			kind: ReputationUpdateKind::Slash,
		});
	}

	/// Process a peer disconnected event coming from the network.
	pub fn disconnected(&mut self, peer_id: &PeerId) {
		self.connected.remove(peer_id);
	}

	/// A connection was made; triage it. Return whether or not it was kept.
	pub async fn try_accept_connection<Sender: CollatorProtocolSenderTrait>(
		&mut self,
		sender: &mut Sender,
		peer_id: PeerId,
		peer_info: PeerInfo,
	) -> bool {
		let db = &self.db;
		let reputation_query_fn = |peer_id: PeerId, para_id: ParaId| async move {
			// Go straight to the DB. We only store in-memory the reputations of connected peers.
			db.query(&peer_id, &para_id).await.unwrap_or_default()
		};

		let outcome = self.connected.try_accept(reputation_query_fn, peer_id, peer_info).await;

		match outcome {
			TryAcceptOutcome::Added => true,
			TryAcceptOutcome::Replaced(other_peers) => {
				gum::trace!(
					target: LOG_TARGET,
					"Peer {:?} replaced the connection slots of other peers: {:?}",
					peer_id,
					&other_peers
				);
				self.disconnect_peers(sender, other_peers).await;
				true
			},
			TryAcceptOutcome::Rejected => {
				gum::debug!(
					target: LOG_TARGET,
					?peer_id,
					"Peer connection was rejected",
				);
				self.disconnect_peers(sender, [peer_id].into_iter().collect()).await;
				false
			},
		}
	}

	/// Retrieve the score of the connected peer. We assume the peer is declared for this paraid.
	pub fn connected_peer_score(&self, peer_id: &PeerId, para_id: &ParaId) -> Option<Score> {
		self.connected.peer_score(peer_id, para_id)
	}

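	/// Ask the network bridge to disconnect the given peers on the collation peer set.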
	async fn disconnect_peers<Sender: CollatorProtocolSenderTrait>(
		&self,
		sender: &mut Sender,
		peers: HashSet<PeerId>,
	) {
		gum::trace!(
			target: LOG_TARGET,
			?peers,
			"Disconnecting peers",
		);

		sender
			.send_message(NetworkBridgeTxMessage::DisconnectPeers(
				peers.into_iter().collect(),
				PeerSet::Collation,
			))
			.await;
	}
}

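/// Fetch the `k` ancestors of `hash` from the Chain API subsystem.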
async fn get_ancestors<Sender: CollatorProtocolSenderTrait>(
	sender: &mut Sender,
	k: usize,
	hash: Hash,
) -> Result<Vec<Hash>> {
	let (tx, rx) = oneshot::channel();
	sender
		.send_message(ChainApiMessage::Ancestors { hash, k, response_channel: tx })
		.await;

	Ok(rx.await.map_err(|_| Error::CanceledAncestors)??)
}

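/// Query the Chain API for the number and hash of the latest finalized block.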
async fn get_latest_finalized_block<Sender: CollatorProtocolSenderTrait>(
	sender: &mut Sender,
) -> Result<(BlockNumber, Hash)> {
	let (tx, rx) = oneshot::channel();
	sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await;

	let block_number = rx.await.map_err(|_| Error::CanceledFinalizedBlockNumber)??;

	let (tx, rx) = oneshot::channel();
	sender.send_message(ChainApiMessage::FinalizedBlockHash(block_number, tx)).await;

	let block_hash = rx
		.await
		.map_err(|_| Error::CanceledFinalizedBlockHash)??
		.ok_or_else(|| Error::FinalizedBlockNotFound(block_number))?;

	Ok((block_number, block_hash))
}

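/// Walk the finalized chain between the last processed block and the latest finalized block
/// (bounded by `MAX_STARTUP_ANCESTRY_LOOKBACK`) and collect reputation bumps for collators of
/// included candidates, as advertised through the `ApprovedPeer` UMP signal.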
async fn extract_reputation_bumps_on_new_finalized_block<Sender: CollatorProtocolSenderTrait>(
	sender: &mut Sender,
	processed_finalized_block_number: BlockNumber,
	(latest_finalized_block_number, latest_finalized_block_hash): (BlockNumber, Hash),
) -> Result<BTreeMap<ParaId, HashMap<PeerId, Score>>> {
	if latest_finalized_block_number < processed_finalized_block_number {
		// Shouldn't be possible, but in this case there is no other initialisation needed.
		gum::warn!(
			target: LOG_TARGET,
			latest_finalized_block_number,
			?latest_finalized_block_hash,
			"Peer manager stored finalized block number {} is higher than the latest finalized block.",
			processed_finalized_block_number,
		);
		return Ok(BTreeMap::new())
	}

	let ancestry_len = std::cmp::min(
		latest_finalized_block_number.saturating_sub(processed_finalized_block_number),
		MAX_STARTUP_ANCESTRY_LOOKBACK,
	);

	if ancestry_len == 0 {
		return Ok(BTreeMap::new())
	}

	let mut ancestors =
		get_ancestors(sender, ancestry_len as usize, latest_finalized_block_hash).await?;
	ancestors.push(latest_finalized_block_hash);
	ancestors.reverse();

	gum::trace!(
		target: LOG_TARGET,
		?latest_finalized_block_hash,
		processed_finalized_block_number,
		"Processing reputation bumps for finalized relay parent {} and its {} ancestors",
		latest_finalized_block_number,
		ancestry_len
	);

	let mut v2_candidates_per_rp: HashMap<Hash, BTreeMap<ParaId, HashSet<CandidateHash>>> =
		HashMap::with_capacity(ancestors.len());

	for i in 1..ancestors.len() {
		let rp = ancestors[i];
		let parent_rp = ancestors[i - 1];
		let candidate_events = recv_runtime(request_candidate_events(rp, sender).await).await?;

		for event in candidate_events {
			if let CandidateEvent::CandidateIncluded(receipt, _, _, _) = event {
				// Only v2 receipts can contain UMP signals.
				if receipt.descriptor.version() == CandidateDescriptorVersion::V2 {
					v2_candidates_per_rp
						.entry(parent_rp)
						.or_default()
						.entry(receipt.descriptor.para_id())
						.or_default()
						.insert(receipt.hash());
				}
			}
		}
	}

	// This could be removed if we implemented https://github.com/paritytech/polkadot-sdk/issues/7732.
	let mut updates: BTreeMap<ParaId, HashMap<PeerId, Score>> = BTreeMap::new();
	for (rp, per_para) in v2_candidates_per_rp {
		for (para_id, included_candidates) in per_para {
			let candidates_pending_availability =
				recv_runtime(request_candidates_pending_availability(rp, para_id, sender).await)
					.await?;

			for candidate in candidates_pending_availability {
				let candidate_hash = candidate.hash();
				if included_candidates.contains(&candidate_hash) {
					match candidate.commitments.ump_signals() {
						Ok(ump_signals) => {
							if let Some(approved_peer) = ump_signals.approved_peer() {
								match PeerId::from_bytes(approved_peer) {
									Ok(peer_id) => updates
										.entry(para_id)
										.or_default()
										.entry(peer_id)
										.or_default()
										.saturating_add(VALID_INCLUDED_CANDIDATE_BUMP),
									Err(err) => {
										// Collator sent an invalid peerid. It's only harming
										// itself.
										gum::debug!(
											target: LOG_TARGET,
											?candidate_hash,
											"UMP signal contains invalid ApprovedPeer id: {}",
											err
										);
									},
								}
							}
						},
						Err(err) => {
							// This should never happen, as the ump signals are checked during
							// on-chain backing.
							gum::warn!(
								target: LOG_TARGET,
								?candidate_hash,
								"Failed to parse UMP signals for included candidate: {}",
								err
							);
						},
					}
				}
			}
		}
	}

	Ok(updates)
}