1use crate::{
23 block_announce_validator::{
24 BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25 },
26 pending_responses::{PendingResponses, ResponseEvent},
27 service::{
28 self,
29 syncing_service::{SyncingService, ToServiceCommand},
30 },
31 strategy::{SyncingAction, SyncingStrategy},
32 types::{BadPeer, ExtendedPeerInfo, SyncEvent},
33 LOG_TARGET,
34};
35
36use codec::{Decode, DecodeAll, Encode};
37use futures::{channel::oneshot, StreamExt};
38use log::{debug, error, trace, warn};
39use prometheus_endpoint::{
40 register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
41};
42use schnellru::{ByLength, LruMap};
43use tokio::time::{Interval, MissedTickBehavior};
44
45use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
46use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
47use sc_network::{
48 config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49 peer_store::PeerStoreProvider,
50 request_responses::{OutboundFailure, RequestFailure},
51 service::{
52 traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53 NotificationMetrics,
54 },
55 types::ProtocolName,
56 utils::LruHashSet,
57 NetworkBackend, NotificationService, ReputationChange,
58};
59use sc_network_common::{
60 role::Roles,
61 sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
62};
63use sc_network_types::PeerId;
64use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
65use sp_blockchain::{Error as ClientError, HeaderMetadata};
66use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
67use sp_runtime::{
68 traits::{Block as BlockT, Header, NumberFor, Zero},
69 Justifications,
70};
71
72use std::{
73 collections::{HashMap, HashSet},
74 iter,
75 num::NonZeroUsize,
76 sync::{
77 atomic::{AtomicBool, AtomicUsize, Ordering},
78 Arc,
79 },
80};
81
/// Period of the interval timer stored in `SyncingEngine::tick_timeout`.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_100);

/// Capacity of the per-peer LRU set of block hashes (`Peer::known_blocks`).
const MAX_KNOWN_BLOCKS: usize = 1 << 10;

/// Size limit handed to the block-announce notification protocol configuration.
// NOTE(review): presumably the maximum notification size in bytes (1 MiB) — confirm against
// `NetworkBackend::notification_config`.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1 << 20;
90
91pub fn block_announces_protocol_name<Hash: AsRef<[u8]>>(
93 genesis_hash: Hash,
94 fork_id: Option<&str>,
95) -> String {
96 let genesis_hash = genesis_hash.as_ref();
97 if let Some(fork_id) = fork_id {
98 format!("/{}/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
99 } else {
100 format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
101 }
102}
103
104pub fn block_announces_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
106 format!("/{}/block-announces/1", protocol_id.as_ref())
107}
108
109mod rep {
110 use sc_network::ReputationChange as Rep;
111 pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
113 pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
115 pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
117 pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
119 pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
121 pub const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
123}
124
125struct Metrics {
126 peers: Gauge<U64>,
127 import_queue_blocks_submitted: Counter<U64>,
128 import_queue_justifications_submitted: Counter<U64>,
129}
130
131impl Metrics {
132 fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
133 MajorSyncingGauge::register(r, major_syncing)?;
134 Ok(Self {
135 peers: {
136 let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
137 register(g, r)?
138 },
139 import_queue_blocks_submitted: {
140 let c = Counter::new(
141 "substrate_sync_import_queue_blocks_submitted",
142 "Number of blocks submitted to the import queue.",
143 )?;
144 register(c, r)?
145 },
146 import_queue_justifications_submitted: {
147 let c = Counter::new(
148 "substrate_sync_import_queue_justifications_submitted",
149 "Number of justifications submitted to the import queue.",
150 )?;
151 register(c, r)?
152 },
153 })
154 }
155}
156
157#[derive(Clone)]
159pub struct MajorSyncingGauge(Arc<AtomicBool>);
160
161impl MajorSyncingGauge {
162 fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
165 prometheus_endpoint::register(
166 SourcedGauge::new(
167 &Opts::new(
168 "substrate_sub_libp2p_is_major_syncing",
169 "Whether the node is performing a major sync or not.",
170 ),
171 MajorSyncingGauge(value),
172 )?,
173 registry,
174 )?;
175
176 Ok(())
177 }
178}
179
180impl MetricSource for MajorSyncingGauge {
181 type N = u64;
182
183 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
184 set(&[], self.0.load(Ordering::Relaxed) as u64);
185 }
186}
187
/// State the syncing engine keeps for every connected peer.
#[derive(Debug)]
pub struct Peer<B: BlockT> {
    /// Roles, best hash and best number last reported by (or recorded for) the peer.
    pub info: ExtendedPeerInfo<B>,
    /// LRU set of block hashes the peer is known to have: hashes it announced to us and
    /// hashes we announced to it.
    pub known_blocks: LruHashSet<B::Hash>,
    /// Whether the substream to this peer was opened in the inbound direction.
    inbound: bool,
}
197
/// Event loop that connects the syncing strategy with the network, the import queue and
/// the `SyncingService` command channel.
pub struct SyncingEngine<B: BlockT, Client> {
    /// Syncing strategy that produces the actions executed by the engine.
    strategy: Box<dyn SyncingStrategy<B>>,

    /// Blockchain client, used to read headers and chain info.
    client: Arc<Client>,

    /// Number of peers currently tracked in `peers`; shared with `SyncingService`.
    num_connected: Arc<AtomicUsize>,

    /// Flag refreshed from the strategy on every loop iteration; shared with
    /// `SyncingService` and the metrics gauge.
    is_major_syncing: Arc<AtomicBool>,

    /// Handle used to report and disconnect peers.
    network_service: service::network::NetworkServiceHandle,

    /// Receiver for commands sent through `SyncingService`.
    service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,

    /// Local roles, advertised in the block-announces handshake.
    roles: Roles,

    /// Genesis hash of the local chain; peers with a different one are rejected.
    genesis_hash: B::Hash,

    /// Subscribers that receive `SyncEvent`s about peers connecting and disconnecting.
    event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

    /// Interval timer that periodically wakes up the event loop.
    tick_timeout: Interval,

    /// All peers accepted by the engine, keyed by peer id.
    peers: HashMap<PeerId, Peer<B>>,

    /// Reserved nodes of the default peer set and of every notification protocol; their
    /// disconnects and genesis mismatches are logged with higher severity.
    important_peers: HashSet<PeerId>,

    /// Connected peers that were no-slot peers at the time they were registered.
    default_peers_set_no_slot_connected_peers: HashSet<PeerId>,

    /// Reserved nodes of the default peer set; these do not consume regular peer slots.
    default_peers_set_no_slot_peers: HashSet<PeerId>,

    /// Configured number of full-node slots.
    default_peers_set_num_full: usize,

    /// Number of light-node slots: in + out peers of the default set minus the full slots.
    default_peers_set_num_light: usize,

    /// Upper bound for `num_in_peers`.
    max_in_peers: usize,

    /// Number of connected inbound full peers that are not no-slot peers.
    num_in_peers: usize,

    /// No-slot peers supplied at runtime through `ToServiceCommand::SetNoSlotPeers`.
    dynamic_no_slot_peers: HashSet<PeerId>,

    /// Stream that validates block announcements and yields the validation results.
    block_announce_validator: BlockAnnounceValidatorStream<B>,

    /// LRU cache of non-empty announce data of validated announcements, keyed by block
    /// hash; reused when the block is announced by us without explicit data.
    block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,

    /// Peer ids of the configured boot nodes.
    boot_node_ids: HashSet<PeerId>,

    /// Name of the block-announces protocol, used when disconnecting peers.
    block_announce_protocol_name: ProtocolName,

    /// Prometheus metrics; `None` when no registry was given or registration failed.
    metrics: Option<Metrics>,

    /// Notification service of the block-announces protocol.
    notification_service: Box<dyn NotificationService>,

    /// Handle to the peer store, used to record peer roles and genesis mismatches.
    peer_store_handle: Arc<dyn PeerStoreProvider>,

    /// Responses to requests started on behalf of the strategy that are still pending.
    pending_responses: PendingResponses,

    /// Handle to the import queue that receives blocks and justifications.
    import_queue: Box<dyn ImportQueueService<B>>,
}
286
287impl<B: BlockT, Client> SyncingEngine<B, Client>
288where
289 B: BlockT,
290 Client: HeaderBackend<B>
291 + BlockBackend<B>
292 + HeaderMetadata<B, Error = sp_blockchain::Error>
293 + ProofProvider<B>
294 + Send
295 + Sync
296 + 'static,
297{
298 pub fn new<N>(
299 roles: Roles,
300 client: Arc<Client>,
301 metrics_registry: Option<&Registry>,
302 network_metrics: NotificationMetrics,
303 net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
304 protocol_id: ProtocolId,
305 fork_id: Option<&str>,
306 block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
307 syncing_strategy: Box<dyn SyncingStrategy<B>>,
308 network_service: service::network::NetworkServiceHandle,
309 import_queue: Box<dyn ImportQueueService<B>>,
310 peer_store_handle: Arc<dyn PeerStoreProvider>,
311 ) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
312 where
313 N: NetworkBackend<B, <B as BlockT>::Hash>,
314 {
315 let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
316 net_config.network_config.default_peers_set.out_peers)
317 .max(1);
318 let important_peers = {
319 let mut imp_p = HashSet::new();
320 for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
321 imp_p.insert(reserved.peer_id);
322 }
323 for config in net_config.notification_protocols() {
324 let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
325 imp_p.extend(peer_ids);
326 }
327
328 imp_p.shrink_to_fit();
329 imp_p
330 };
331 let boot_node_ids = {
332 let mut list = HashSet::new();
333 for node in &net_config.network_config.boot_nodes {
334 list.insert(node.peer_id);
335 }
336 list.shrink_to_fit();
337 list
338 };
339 let default_peers_set_no_slot_peers = {
340 let mut no_slot_p: HashSet<PeerId> = net_config
341 .network_config
342 .default_peers_set
343 .reserved_nodes
344 .iter()
345 .map(|reserved| reserved.peer_id)
346 .collect();
347 no_slot_p.shrink_to_fit();
348 no_slot_p
349 };
350 let default_peers_set_num_full =
351 net_config.network_config.default_peers_set_num_full as usize;
352 let default_peers_set_num_light = {
353 let total = net_config.network_config.default_peers_set.out_peers +
354 net_config.network_config.default_peers_set.in_peers;
355 total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
356 };
357
358 let info = client.info();
359
360 let (block_announce_config, notification_service) =
361 Self::get_block_announce_proto_config::<N>(
362 protocol_id,
363 fork_id,
364 roles,
365 info.best_number,
366 info.best_hash,
367 info.genesis_hash,
368 &net_config.network_config.default_peers_set,
369 network_metrics,
370 Arc::clone(&peer_store_handle),
371 );
372
373 let block_announce_protocol_name = block_announce_config.protocol_name().clone();
374 let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
375 let num_connected = Arc::new(AtomicUsize::new(0));
376 let is_major_syncing = Arc::new(AtomicBool::new(false));
377
378 let max_full_peers = net_config.network_config.default_peers_set_num_full;
381 let max_out_peers = net_config.network_config.default_peers_set.out_peers;
382 let max_in_peers = (max_full_peers - max_out_peers) as usize;
383
384 let tick_timeout = {
385 let mut interval = tokio::time::interval(TICK_TIMEOUT);
386 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
387 interval
388 };
389
390 Ok((
391 Self {
392 roles,
393 client,
394 strategy: syncing_strategy,
395 network_service,
396 peers: HashMap::new(),
397 block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
398 block_announce_protocol_name,
399 block_announce_validator: BlockAnnounceValidatorStream::new(
400 block_announce_validator,
401 ),
402 num_connected: num_connected.clone(),
403 is_major_syncing: is_major_syncing.clone(),
404 service_rx,
405 genesis_hash: info.genesis_hash,
406 important_peers,
407 default_peers_set_no_slot_connected_peers: HashSet::new(),
408 boot_node_ids,
409 default_peers_set_no_slot_peers,
410 default_peers_set_num_full,
411 default_peers_set_num_light,
412 num_in_peers: 0usize,
413 max_in_peers,
414 dynamic_no_slot_peers: HashSet::new(),
415 event_streams: Vec::new(),
416 notification_service,
417 tick_timeout,
418 peer_store_handle,
419 metrics: if let Some(r) = metrics_registry {
420 match Metrics::register(r, is_major_syncing.clone()) {
421 Ok(metrics) => Some(metrics),
422 Err(err) => {
423 log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
424 None
425 },
426 }
427 } else {
428 None
429 },
430 pending_responses: PendingResponses::new(),
431 import_queue,
432 },
433 SyncingService::new(tx, num_connected, is_major_syncing),
434 block_announce_config,
435 ))
436 }
437
438 fn update_peer_info(
439 &mut self,
440 peer_id: &PeerId,
441 best_hash: B::Hash,
442 best_number: NumberFor<B>,
443 ) {
444 if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
445 peer.info.best_hash = best_hash;
446 peer.info.best_number = best_number;
447 }
448 }
449
450 fn process_block_announce_validation_result(
452 &mut self,
453 validation_result: BlockAnnounceValidationResult<B::Header>,
454 ) {
455 match validation_result {
456 BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
457 BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
458 if let Some((best_hash, best_number)) =
459 self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
460 {
461 self.update_peer_info(&peer_id, best_hash, best_number);
462 }
463
464 if let Some(data) = announce.data {
465 if !data.is_empty() {
466 self.block_announce_data_cache.insert(announce.header.hash(), data);
467 }
468 }
469 },
470 BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
471 if disconnect {
472 log::debug!(
473 target: LOG_TARGET,
474 "Disconnecting peer {peer_id} due to block announce validation failure",
475 );
476 self.network_service
477 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
478 }
479
480 self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
481 },
482 }
483 }
484
485 pub fn push_block_announce_validation(
487 &mut self,
488 peer_id: PeerId,
489 announce: BlockAnnounce<B::Header>,
490 ) {
491 let hash = announce.header.hash();
492
493 let peer = match self.peers.get_mut(&peer_id) {
494 Some(p) => p,
495 None => {
496 log::error!(
497 target: LOG_TARGET,
498 "Received block announce from disconnected peer {peer_id}",
499 );
500 debug_assert!(false);
501 return;
502 },
503 };
504 peer.known_blocks.insert(hash);
505
506 if peer.info.roles.is_full() {
507 let is_best = match announce.state.unwrap_or(BlockState::Best) {
508 BlockState::Best => true,
509 BlockState::Normal => false,
510 };
511
512 self.block_announce_validator
513 .push_block_announce_validation(peer_id, hash, announce, is_best);
514 }
515 }
516
517 pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
522 let header = match self.client.header(hash) {
523 Ok(Some(header)) => header,
524 Ok(None) => {
525 log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
526 return;
527 },
528 Err(e) => {
529 log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
530 return;
531 },
532 };
533
534 if header.number().is_zero() {
536 return;
537 }
538
539 let is_best = self.client.info().best_hash == hash;
540 log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
541
542 let data = data
543 .or_else(|| self.block_announce_data_cache.get(&hash).cloned())
544 .unwrap_or_default();
545
546 for (peer_id, ref mut peer) in self.peers.iter_mut() {
547 let inserted = peer.known_blocks.insert(hash);
548 if inserted {
549 log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
550 let message = BlockAnnounce {
551 header: header.clone(),
552 state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
553 data: Some(data.clone()),
554 };
555
556 let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
557 }
558 }
559 }
560
561 pub async fn run(mut self) {
562 loop {
563 tokio::select! {
564 _ = self.tick_timeout.tick() => {
565 },
569 command = self.service_rx.select_next_some() =>
570 self.process_service_command(command),
571 notification_event = self.notification_service.next_event() => match notification_event {
572 Some(event) => self.process_notification_event(event),
573 None => {
574 error!(
575 target: LOG_TARGET,
576 "Terminating `SyncingEngine` because `NotificationService` has terminated.",
577 );
578
579 return;
580 }
581 },
582 response_event = self.pending_responses.select_next_some() =>
583 self.process_response_event(response_event),
584 validation_result = self.block_announce_validator.select_next_some() =>
585 self.process_block_announce_validation_result(validation_result),
586 }
587
588 self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
590
591 if let Err(e) = self.process_strategy_actions() {
593 error!(
594 target: LOG_TARGET,
595 "Terminating `SyncingEngine` due to fatal error: {e:?}.",
596 );
597 return;
598 }
599 }
600 }
601
602 fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
603 for action in self.strategy.actions(&self.network_service)? {
604 match action {
605 SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
606 if !self.peers.contains_key(&peer_id) {
607 trace!(
608 target: LOG_TARGET,
609 "Cannot start request with strategy key {key:?} to unknown peer \
610 {peer_id}",
611 );
612 debug_assert!(false);
613 continue;
614 }
615 if remove_obsolete {
616 if self.pending_responses.remove(peer_id, key) {
617 warn!(
618 target: LOG_TARGET,
619 "Processed `SyncingAction::StartRequest` to {peer_id} with \
620 strategy key {key:?}. Stale response removed!",
621 )
622 } else {
623 trace!(
624 target: LOG_TARGET,
625 "Processed `SyncingAction::StartRequest` to {peer_id} with \
626 strategy key {key:?}.",
627 )
628 }
629 }
630
631 self.pending_responses.insert(peer_id, key, request);
632 },
633 SyncingAction::CancelRequest { peer_id, key } => {
634 let removed = self.pending_responses.remove(peer_id, key);
635
636 trace!(
637 target: LOG_TARGET,
638 "Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
639 );
640 },
641 SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
642 self.pending_responses.remove_all(&peer_id);
643 self.network_service
644 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
645 self.network_service.report_peer(peer_id, rep);
646
647 trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
648 },
649 SyncingAction::ImportBlocks { origin, blocks } => {
650 let count = blocks.len();
651 self.import_blocks(origin, blocks);
652
653 trace!(
654 target: LOG_TARGET,
655 "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
656 );
657 },
658 SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
659 self.import_justifications(peer_id, hash, number, justifications);
660
661 trace!(
662 target: LOG_TARGET,
663 "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
664 peer_id,
665 hash,
666 number,
667 )
668 },
669 SyncingAction::Finished => {},
671 }
672 }
673
674 Ok(())
675 }
676
677 fn apply_no_slot_set(&mut self, new_dynamic_no_slot: HashSet<PeerId>) {
680 let connected_peers = &self.peers;
681 apply_no_slot_set_inner(
682 |peer_id| {
683 connected_peers
684 .get(peer_id)
685 .map(|peer| peer.inbound && peer.info.roles.is_full())
686 },
687 &self.default_peers_set_no_slot_peers,
688 &self.dynamic_no_slot_peers,
689 &new_dynamic_no_slot,
690 &mut self.default_peers_set_no_slot_connected_peers,
691 &mut self.num_in_peers,
692 self.max_in_peers,
693 &self.network_service,
694 &self.block_announce_protocol_name,
695 );
696 self.dynamic_no_slot_peers = new_dynamic_no_slot;
697 }
698
699 fn process_service_command(&mut self, command: ToServiceCommand<B>) {
700 match command {
701 ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
702 self.strategy.set_sync_fork_request(peers, &hash, number);
703 },
704 ToServiceCommand::EventStream(tx) => {
705 for peer_id in self.peers.keys() {
707 let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
708 }
709 self.event_streams.push(tx);
710 },
711 ToServiceCommand::RequestJustification(hash, number) => {
712 self.strategy.request_justification(&hash, number)
713 },
714 ToServiceCommand::ClearJustificationRequests => {
715 self.strategy.clear_justification_requests()
716 },
717 ToServiceCommand::BlocksProcessed(imported, count, results) => {
718 self.strategy.on_blocks_processed(imported, count, results);
719 },
720 ToServiceCommand::JustificationImported(peer_id, hash, number, import_result) => {
721 let success =
722 matches!(import_result, sc_consensus::JustificationImportResult::Success);
723 self.strategy.on_justification_import(hash, number, success);
724
725 match import_result {
726 sc_consensus::JustificationImportResult::OutdatedJustification => {
727 log::info!(
728 target: LOG_TARGET,
729 "๐ Outdated justification provided by {peer_id} for #{hash}",
730 );
731 },
732 sc_consensus::JustificationImportResult::Failure => {
733 log::info!(
734 target: LOG_TARGET,
735 "๐ Invalid justification provided by {peer_id} for #{hash}",
736 );
737 self.network_service
738 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
739 self.network_service.report_peer(
740 peer_id,
741 ReputationChange::new_fatal("Invalid justification"),
742 );
743 },
744 sc_consensus::JustificationImportResult::Success => {
745 log::debug!(
746 target: LOG_TARGET,
747 "Justification for block #{hash} ({number}) imported from {peer_id} successfully",
748 );
749 },
750 }
751 },
752 ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
753 ToServiceCommand::NewBestBlockImported(hash, number) => {
754 log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
755
756 self.strategy.update_chain_info(&hash, number);
757 let _ = self.notification_service.try_set_handshake(
758 BlockAnnouncesHandshake::<B>::build(
759 self.roles,
760 number,
761 hash,
762 self.genesis_hash,
763 )
764 .encode(),
765 );
766 },
767 ToServiceCommand::Status(tx) => {
768 let _ = tx.send(self.strategy.status());
769 },
770 ToServiceCommand::NumActivePeers(tx) => {
771 let _ = tx.send(self.num_active_peers());
772 },
773 ToServiceCommand::NumDownloadedBlocks(tx) => {
774 let _ = tx.send(self.strategy.num_downloaded_blocks());
775 },
776 ToServiceCommand::NumSyncRequests(tx) => {
777 let _ = tx.send(self.strategy.num_sync_requests());
778 },
779 ToServiceCommand::PeersInfo(tx) => {
780 let peers_info =
781 self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
782 let _ = tx.send(peers_info);
783 },
784 ToServiceCommand::SetNoSlotPeers(peers) => self.apply_no_slot_set(peers),
785 ToServiceCommand::OnBlockFinalized(hash, header) => {
786 self.strategy.on_block_finalized(&hash, *header.number())
787 },
788 }
789 }
790
791 fn process_notification_event(&mut self, event: NotificationEvent) {
792 match event {
793 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
794 let validation_result = self
795 .validate_connection(&peer, handshake, Direction::Inbound)
796 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
797
798 let _ = result_tx.send(validation_result);
799 },
800 NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
801 log::debug!(
802 target: LOG_TARGET,
803 "Substream opened for {peer}, handshake {handshake:?}"
804 );
805
806 match self.validate_connection(&peer, handshake, direction) {
807 Ok(handshake) => {
808 if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
809 log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
810 self.network_service
811 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
812 }
813 },
814 Err(wrong_genesis) => {
815 log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
816
817 if wrong_genesis {
818 self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
819 }
820
821 self.network_service
822 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
823 },
824 }
825 },
826 NotificationEvent::NotificationStreamClosed { peer } => {
827 self.on_sync_peer_disconnected(peer);
828 },
829 NotificationEvent::NotificationReceived { peer, notification } => {
830 if !self.peers.contains_key(&peer) {
831 log::error!(
832 target: LOG_TARGET,
833 "received notification from {peer} who had been earlier refused by `SyncingEngine`",
834 );
835 return;
836 }
837
838 let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
839 log::warn!(target: LOG_TARGET, "failed to decode block announce");
840 return;
841 };
842
843 self.push_block_announce_validation(peer, announce);
844 },
845 }
846 }
847
848 fn is_no_slot_peer(&self, peer_id: &PeerId) -> bool {
849 self.default_peers_set_no_slot_peers.contains(peer_id) ||
850 self.dynamic_no_slot_peers.contains(peer_id)
851 }
852
853 fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
857 let Some(info) = self.peers.remove(&peer_id) else {
858 log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
859 return;
860 };
861 if let Some(metrics) = &self.metrics {
862 metrics.peers.dec();
863 }
864 self.num_connected.fetch_sub(1, Ordering::AcqRel);
865
866 if self.important_peers.contains(&peer_id) {
867 log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
868 } else {
869 log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
870 }
871
872 if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
873 info.inbound &&
874 info.info.roles.is_full()
875 {
876 match self.num_in_peers.checked_sub(1) {
877 Some(value) => {
878 self.num_in_peers = value;
879 },
880 None => {
881 log::error!(
882 target: LOG_TARGET,
883 "trying to disconnect an inbound node which is not counted as inbound"
884 );
885 debug_assert!(false);
886 },
887 }
888 }
889
890 self.strategy.remove_peer(&peer_id);
891 self.pending_responses.remove_all(&peer_id);
892 self.event_streams
893 .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
894 }
895
896 fn validate_handshake(
898 &mut self,
899 peer_id: &PeerId,
900 handshake: Vec<u8>,
901 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
902 log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
903
904 let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
905 .map_err(|error| {
906 log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
907 false
908 })?;
909
910 if handshake.genesis_hash != self.genesis_hash {
911 if self.important_peers.contains(&peer_id) {
912 log::error!(
913 target: LOG_TARGET,
914 "Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
915 self.genesis_hash,
916 handshake.genesis_hash,
917 );
918 } else if self.boot_node_ids.contains(&peer_id) {
919 log::error!(
920 target: LOG_TARGET,
921 "Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
922 self.genesis_hash,
923 handshake.genesis_hash,
924 );
925 } else {
926 log::debug!(
927 target: LOG_TARGET,
928 "Peer is on different chain (our genesis: {} theirs: {})",
929 self.genesis_hash,
930 handshake.genesis_hash
931 );
932 }
933
934 return Err(true);
935 }
936
937 Ok(handshake)
938 }
939
940 fn validate_connection(
954 &mut self,
955 peer_id: &PeerId,
956 handshake: Vec<u8>,
957 direction: Direction,
958 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
959 log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
960
961 let handshake = self.validate_handshake(peer_id, handshake)?;
962
963 if self.peers.contains_key(&peer_id) {
964 log::error!(
965 target: LOG_TARGET,
966 "Called `validate_connection()` with already connected peer {peer_id}",
967 );
968 debug_assert!(false);
969 return Err(false);
970 }
971
972 let no_slot_peer = self.is_no_slot_peer(&peer_id);
973 let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
974
975 if handshake.roles.is_full() &&
976 self.strategy.num_peers() >=
977 self.default_peers_set_num_full +
978 self.default_peers_set_no_slot_connected_peers.len() +
979 this_peer_reserved_slot
980 {
981 log::debug!(
982 target: LOG_TARGET,
983 "Too many full nodes, rejecting {peer_id} (no_slot_peer={no_slot_peer}, num_peers={}, full_cap={}, no_slot_connected={}, this_reserved={})",
984 self.strategy.num_peers(),
985 self.default_peers_set_num_full,
986 self.default_peers_set_no_slot_connected_peers.len(),
987 this_peer_reserved_slot,
988 );
989 return Err(false);
990 }
991
992 if !no_slot_peer &&
994 handshake.roles.is_full() &&
995 direction.is_inbound() &&
996 self.num_in_peers >= self.max_in_peers
997 {
998 if self.num_in_peers > self.max_in_peers {
999 log::warn!(
1000 target: LOG_TARGET,
1001 "num_in_peers ({}) exceeds max_in_peers ({}), this is a slot accounting bug ",
1002 self.num_in_peers,
1003 self.max_in_peers,
1004 );
1005 debug_assert!(false);
1006 }
1007 log::debug!(
1008 target: LOG_TARGET,
1009 "All inbound slots have been consumed, rejecting {peer_id} (no_slot_peer={no_slot_peer}, num_in_peers={}, max_in_peers={})",
1010 self.num_in_peers,
1011 self.max_in_peers,
1012 );
1013 return Err(false);
1014 }
1015
1016 if handshake.roles.is_light() &&
1021 (self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
1022 {
1023 log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
1024 return Err(false);
1025 }
1026
1027 Ok(handshake)
1028 }
1029
1030 fn on_sync_peer_connected(
1036 &mut self,
1037 peer_id: PeerId,
1038 status: &BlockAnnouncesHandshake<B>,
1039 direction: Direction,
1040 ) -> Result<(), ()> {
1041 log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
1042
1043 let peer = Peer {
1044 info: ExtendedPeerInfo {
1045 roles: status.roles,
1046 best_hash: status.best_hash,
1047 best_number: status.best_number,
1048 },
1049 known_blocks: LruHashSet::new(
1050 NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
1051 ),
1052 inbound: direction.is_inbound(),
1053 };
1054
1055 if status.roles.is_full() {
1057 self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
1058 }
1059
1060 log::debug!(target: LOG_TARGET, "Connected {peer_id}");
1061
1062 if self.peers.insert(peer_id, peer).is_none() {
1063 if let Some(metrics) = &self.metrics {
1064 metrics.peers.inc();
1065 }
1066 self.num_connected.fetch_add(1, Ordering::AcqRel);
1067 }
1068 self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
1069
1070 if self.is_no_slot_peer(&peer_id) {
1071 self.default_peers_set_no_slot_connected_peers.insert(peer_id);
1072 } else if direction.is_inbound() && status.roles.is_full() {
1073 self.num_in_peers += 1;
1074 }
1075
1076 self.event_streams
1077 .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
1078
1079 Ok(())
1080 }
1081
1082 fn process_response_event(&mut self, response_event: ResponseEvent) {
1083 let ResponseEvent { peer_id, key, response: response_result } = response_event;
1084
1085 match response_result {
1086 Ok(Ok((response, protocol_name))) => {
1087 self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
1088 },
1089 Ok(Err(e)) => {
1090 debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1091
1092 match e {
1093 RequestFailure::Network(OutboundFailure::Timeout) => {
1094 self.network_service.report_peer(peer_id, rep::TIMEOUT);
1095 self.network_service
1096 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1097 },
1098 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1099 self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1100 self.network_service
1101 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1102 },
1103 RequestFailure::Network(OutboundFailure::DialFailure) => {
1104 self.network_service
1105 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1106 },
1107 RequestFailure::Refused => {
1108 self.network_service.report_peer(peer_id, rep::REFUSED);
1109 self.network_service
1110 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1111 },
1112 RequestFailure::Network(OutboundFailure::ConnectionClosed) |
1113 RequestFailure::NotConnected => {
1114 self.network_service
1115 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1116 },
1117 RequestFailure::UnknownProtocol => {
1118 debug_assert!(false, "Block request protocol should always be known.");
1119 },
1120 RequestFailure::InvalidRequest => {
1121 debug_assert!(false, "Block request payload should always be valid.");
1122 },
1123 RequestFailure::Obsolete => {
1124 debug_assert!(
1125 false,
1126 "Can not receive `RequestFailure::Obsolete` after dropping the \
1127 response receiver.",
1128 );
1129 },
1130 RequestFailure::Network(OutboundFailure::Io(_)) => {
1131 self.network_service.report_peer(peer_id, rep::IO);
1132 self.network_service
1133 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1134 },
1135 }
1136 },
1137 Err(oneshot::Canceled) => {
1138 trace!(
1139 target: LOG_TARGET,
1140 "Request to peer {peer_id:?} failed due to oneshot being canceled.",
1141 );
1142 self.network_service
1143 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1144 },
1145 }
1146 }
1147
/// Returns the number of entries currently tracked in `self.pending_responses`.
///
/// NOTE(review): the method name implies each pending response corresponds to one
/// peer that is being queried — confirm against `PendingResponses`.
fn num_active_peers(&self) -> usize {
    self.pending_responses.len()
}
1152
1153 fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1155 protocol_id: ProtocolId,
1156 fork_id: Option<&str>,
1157 roles: Roles,
1158 best_number: NumberFor<B>,
1159 best_hash: B::Hash,
1160 genesis_hash: B::Hash,
1161 set_config: &SetConfig,
1162 metrics: NotificationMetrics,
1163 peer_store_handle: Arc<dyn PeerStoreProvider>,
1164 ) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1165 let block_announces_protocol = block_announces_protocol_name(genesis_hash, fork_id);
1166
1167 N::notification_config(
1168 block_announces_protocol.into(),
1169 iter::once(block_announces_legacy_protocol_name(&protocol_id).into()).collect(),
1170 MAX_BLOCK_ANNOUNCE_SIZE,
1171 Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1172 roles,
1173 best_number,
1174 best_hash,
1175 genesis_hash,
1176 ))),
1177 set_config.clone(),
1178 metrics,
1179 peer_store_handle,
1180 )
1181 }
1182
1183 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1185 if let Some(metrics) = &self.metrics {
1186 metrics.import_queue_blocks_submitted.inc();
1187 }
1188
1189 self.import_queue.import_blocks(origin, blocks);
1190 }
1191
1192 fn import_justifications(
1194 &mut self,
1195 peer_id: PeerId,
1196 hash: B::Hash,
1197 number: NumberFor<B>,
1198 justifications: Justifications,
1199 ) {
1200 if let Some(metrics) = &self.metrics {
1201 metrics.import_queue_justifications_submitted.inc();
1202 }
1203
1204 self.import_queue.import_justifications(peer_id, hash, number, justifications);
1205 }
1206}
1207
1208fn apply_no_slot_set_inner(
1219 peer_inbound_full: impl Fn(&PeerId) -> Option<bool>,
1220 static_no_slot: &HashSet<PeerId>,
1221 old_dynamic_no_slot: &HashSet<PeerId>,
1222 new_dynamic_no_slot: &HashSet<PeerId>,
1223 connected_no_slot: &mut HashSet<PeerId>,
1224 num_in_peers: &mut usize,
1225 max_in_peers: usize,
1226 network_service: &service::network::NetworkServiceHandle,
1227 protocol: &ProtocolName,
1228) {
1229 let slot_impact = |peer_id: &PeerId| -> Option<bool> {
1231 if static_no_slot.contains(peer_id) {
1232 return None;
1233 }
1234 peer_inbound_full(peer_id)
1235 };
1236
1237 let mut promoted = 0;
1238 let mut demoted = 0;
1239 let mut disconnected = 0;
1240
1241 for peer_id in new_dynamic_no_slot.difference(old_dynamic_no_slot) {
1242 let Some(affects_slots) = slot_impact(peer_id) else { continue };
1243 if !connected_no_slot.insert(*peer_id) {
1245 log::error!(
1246 target: LOG_TARGET,
1247 "{peer_id} promoted to no-slot but was already in connected_no_slot",
1248 );
1249 debug_assert!(false);
1250 continue;
1251 }
1252 if affects_slots {
1253 if let Some(n) = num_in_peers.checked_sub(1) {
1254 *num_in_peers = n;
1255 } else {
1256 log::error!(
1257 target: LOG_TARGET,
1258 "num_in_peers underflow promoting {peer_id} to no-slot",
1259 );
1260 debug_assert!(false);
1261 }
1262 promoted += 1;
1263 }
1264 }
1265
1266 for peer_id in old_dynamic_no_slot.difference(new_dynamic_no_slot) {
1267 let Some(affects_slots) = slot_impact(peer_id) else { continue };
1268 if !connected_no_slot.contains(peer_id) {
1269 continue;
1270 }
1271 if affects_slots && *num_in_peers >= max_in_peers {
1272 log::debug!(
1273 target: LOG_TARGET,
1274 "Demoting {peer_id} would exceed max_in_peers ({max_in_peers}); disconnecting",
1275 );
1276 network_service.disconnect_peer(*peer_id, protocol.clone());
1277 disconnected += 1;
1278 continue;
1279 }
1280 connected_no_slot.remove(peer_id);
1281 if affects_slots {
1282 *num_in_peers += 1;
1283 demoted += 1;
1284 }
1285 }
1286
1287 log::debug!(
1288 target: LOG_TARGET,
1289 "Dynamic no-slot peer set updated: {} peers: +{} in, -{} out, {} disconnected",
1290 new_dynamic_no_slot.len(),
1291 promoted,
1292 demoted,
1293 disconnected,
1294 );
1295}
1296
#[cfg(test)]
mod tests {
    use super::*;

    /// Produce an array of `N` randomly generated peer ids.
    fn fresh_peers<const N: usize>() -> [PeerId; N] {
        std::array::from_fn(|_index| PeerId::random())
    }

    /// Turn a fixed-size array of peers into a `HashSet`.
    fn set_of<const N: usize>(peers: [PeerId; N]) -> HashSet<PeerId> {
        HashSet::from(peers)
    }

    /// Run `apply_no_slot_set_inner` with `max_in_peers` set to `usize::MAX` and assert
    /// that no peer was disconnected.
    ///
    /// Returns the resulting `connected_no_slot` set and `num_in_peers` value.
    #[track_caller]
    fn run_apply(
        connected: Vec<(PeerId, bool)>,
        static_no_slot: HashSet<PeerId>,
        old_dynamic: HashSet<PeerId>,
        new_dynamic: HashSet<PeerId>,
        initial_connected_no_slot: HashSet<PeerId>,
        initial_num_in_peers: usize,
    ) -> (HashSet<PeerId>, usize) {
        let outcome = run_apply_with_cap(
            connected,
            static_no_slot,
            old_dynamic,
            new_dynamic,
            initial_connected_no_slot,
            initial_num_in_peers,
            usize::MAX,
        );
        let (final_no_slot, final_num_in, disconnects) = outcome;
        assert!(disconnects.is_empty(), "unexpected disconnects: {disconnects:?}");
        (final_no_slot, final_num_in)
    }

    /// Run `apply_no_slot_set_inner` against an explicit `max_in_peers`.
    ///
    /// `connected` lists the peers the lookup closure knows about, each paired with the
    /// value it reports for that peer; any other peer yields `None`.
    ///
    /// Returns the resulting `connected_no_slot` set, the resulting `num_in_peers`, and
    /// the peers for which a `DisconnectPeer` command was queued, in queue order.
    #[track_caller]
    fn run_apply_with_cap(
        connected: Vec<(PeerId, bool)>,
        static_no_slot: HashSet<PeerId>,
        old_dynamic: HashSet<PeerId>,
        new_dynamic: HashSet<PeerId>,
        initial_connected_no_slot: HashSet<PeerId>,
        initial_num_in_peers: usize,
        max_in_peers: usize,
    ) -> (HashSet<PeerId>, usize, Vec<PeerId>) {
        use crate::service::network::{NetworkServiceHandle, ToServiceCommand as NetCmd};

        let inbound_full_by_peer: HashMap<PeerId, bool> = connected.into_iter().collect();
        let (tx, mut rx) = tracing_unbounded::<NetCmd>("test_apply_no_slot_set_disconnects", 100);
        let network_service = NetworkServiceHandle::new(tx);
        let protocol = ProtocolName::from("/test/block-announces/1");

        let mut connected_no_slot = initial_connected_no_slot;
        let mut num_in_peers = initial_num_in_peers;
        apply_no_slot_set_inner(
            |peer_id| inbound_full_by_peer.get(peer_id).copied(),
            &static_no_slot,
            &old_dynamic,
            &new_dynamic,
            &mut connected_no_slot,
            &mut num_in_peers,
            max_in_peers,
            &network_service,
            &protocol,
        );
        // Release the handle before draining the commands it queued.
        drop(network_service);

        // Keep only the disconnect commands; stop at the first failed receive.
        let disconnects: Vec<PeerId> = std::iter::from_fn(|| rx.try_recv().ok())
            .filter_map(|cmd| match cmd {
                NetCmd::DisconnectPeer(peer, _) => Some(peer),
                _ => None,
            })
            .collect();

        (connected_no_slot, num_in_peers, disconnects)
    }

    /// Three slot-affecting peers added to the dynamic set each release one slot; a peer
    /// present in both the old and the new set is left untouched.
    #[test]
    fn apply_promotes_multiple_inbound_full_peers() {
        let [first, second, third, existing] = fresh_peers();
        let expected = set_of([first, second, third, existing]);
        let (connected_no_slot, num_in) = run_apply(
            vec![(first, true), (second, true), (third, true), (existing, true)],
            HashSet::new(),
            set_of([existing]),
            expected.clone(),
            set_of([existing]),
            10,
        );
        assert_eq!(connected_no_slot, expected);
        assert_eq!(num_in, 7);
    }

    /// Three slot-affecting peers removed from the dynamic set each take one slot; the
    /// peer that remains listed stays in `connected_no_slot`.
    #[test]
    fn apply_demotes_multiple_inbound_full_peers() {
        let [first, second, third, kept] = fresh_peers();
        let everyone = set_of([first, second, third, kept]);
        let (connected_no_slot, num_in) = run_apply(
            vec![(first, true), (second, true), (third, true), (kept, true)],
            HashSet::new(),
            everyone.clone(),
            set_of([kept]),
            everyone,
            2,
        );
        assert_eq!(connected_no_slot, set_of([kept]));
        assert_eq!(num_in, 5);
    }

    /// Peers reported with `false` join `connected_no_slot` without changing the
    /// inbound counter.
    #[test]
    fn apply_ignores_non_slot_consuming_peers() {
        let [outbound_peer, light_peer] = fresh_peers();
        let expected = set_of([outbound_peer, light_peer]);
        let (connected_no_slot, num_in) = run_apply(
            vec![(outbound_peer, false), (light_peer, false)],
            HashSet::new(),
            HashSet::new(),
            expected.clone(),
            HashSet::new(),
            5,
        );
        assert_eq!(connected_no_slot, expected);
        assert_eq!(num_in, 5);
    }

    /// Peers in the static set keep their no-slot status when dropped from the dynamic
    /// set; a non-static control peer is demoted as usual.
    #[test]
    fn apply_static_peers_stay_no_slot_when_removed_from_dynamic() {
        let [static_a, static_b, control] = fresh_peers();
        let previously_listed = set_of([static_a, static_b, control]);
        let (connected_no_slot, num_in) = run_apply(
            vec![(static_a, true), (static_b, true), (control, true)],
            set_of([static_a, static_b]),
            previously_listed.clone(),
            HashSet::new(),
            previously_listed,
            2,
        );
        assert_eq!(connected_no_slot, set_of([static_a, static_b]));
        assert_eq!(num_in, 3);
    }

    /// Adding peers to the dynamic set that are already static changes nothing.
    #[test]
    fn apply_static_peers_added_to_dynamic_are_unchanged() {
        let [static_a, static_b] = fresh_peers();
        let statics = set_of([static_a, static_b]);
        let (connected_no_slot, num_in) = run_apply(
            vec![(static_a, true), (static_b, true)],
            statics.clone(),
            HashSet::new(),
            statics.clone(),
            statics.clone(),
            4,
        );
        assert_eq!(connected_no_slot, statics);
        assert_eq!(num_in, 4);
    }

    /// Peers the lookup does not know about (it yields `None`) are skipped even when
    /// they are newly listed.
    #[test]
    fn apply_unconnected_peers_in_new_set_are_ignored() {
        let [online_a, online_b] = fresh_peers();
        let [offline_a, offline_b] = fresh_peers();
        let (connected_no_slot, num_in) = run_apply(
            vec![(online_a, true), (online_b, true)],
            HashSet::new(),
            HashSet::new(),
            set_of([offline_a, offline_b]),
            HashSet::new(),
            3,
        );
        assert!(connected_no_slot.is_empty());
        assert_eq!(num_in, 3);
    }

    /// Applying an identical old and new set is a no-op.
    #[test]
    fn apply_idempotent_same_set() {
        let [inbound_full, outbound_full, light_peer] = fresh_peers();
        let unchanged = set_of([inbound_full, outbound_full, light_peer]);
        let (connected_no_slot, num_in) = run_apply(
            vec![(inbound_full, true), (outbound_full, false), (light_peer, false)],
            HashSet::new(),
            unchanged.clone(),
            unchanged.clone(),
            unchanged.clone(),
            2,
        );
        assert_eq!(connected_no_slot, unchanged);
        assert_eq!(num_in, 2);
    }

    /// Emptying the dynamic set demotes every dynamic-only peer (only the two
    /// slot-affecting ones bump the counter) while the static peer stays.
    #[test]
    fn apply_empty_set_clears_dynamic_only_peers() {
        let [inbound_a, inbound_b, outbound_peer, light_peer] = fresh_peers();
        let [pinned] = fresh_peers();
        let previously_listed = set_of([inbound_a, inbound_b, outbound_peer, light_peer, pinned]);
        let (connected_no_slot, num_in) = run_apply(
            vec![
                (inbound_a, true),
                (inbound_b, true),
                (outbound_peer, false),
                (light_peer, false),
                (pinned, true),
            ],
            set_of([pinned]),
            previously_listed.clone(),
            HashSet::new(),
            previously_listed,
            0,
        );
        assert_eq!(connected_no_slot, set_of([pinned]));
        assert_eq!(num_in, 2);
    }

    /// Two promotions and two demotions in the same update cancel out on the counter.
    #[test]
    fn apply_mixed_promote_and_demote() {
        let [incoming_a, incoming_b] = fresh_peers();
        let [outgoing_a, outgoing_b] = fresh_peers();
        let (connected_no_slot, num_in) = run_apply(
            vec![(incoming_a, true), (incoming_b, true), (outgoing_a, true), (outgoing_b, true)],
            HashSet::new(),
            set_of([outgoing_a, outgoing_b]),
            set_of([incoming_a, incoming_b]),
            set_of([outgoing_a, outgoing_b]),
            5,
        );
        assert_eq!(connected_no_slot, set_of([incoming_a, incoming_b]));
        assert_eq!(num_in, 5);
    }

    /// With `num_in_peers` already at `max_in_peers`, a demotion disconnects the peer
    /// and leaves both the set and the counter unchanged.
    #[test]
    fn apply_demote_at_capacity_disconnects_peer() {
        let [victim] = fresh_peers();
        let (connected_no_slot, num_in, disconnects) = run_apply_with_cap(
            vec![(victim, true)],
            HashSet::new(),
            set_of([victim]),
            HashSet::new(),
            set_of([victim]),
            8,
            8,
        );
        assert_eq!(connected_no_slot, set_of([victim]));
        assert_eq!(num_in, 8);
        assert_eq!(disconnects, vec![victim]);
    }

    /// With one slot still free, a demotion takes it and nobody is disconnected.
    #[test]
    fn apply_demote_below_capacity_increments_normally() {
        let [demoted_peer] = fresh_peers();
        let (connected_no_slot, num_in, disconnects) = run_apply_with_cap(
            vec![(demoted_peer, true)],
            HashSet::new(),
            set_of([demoted_peer]),
            HashSet::new(),
            set_of([demoted_peer]),
            7,
            8,
        );
        assert!(connected_no_slot.is_empty());
        assert_eq!(num_in, 8);
        assert!(disconnects.is_empty());
    }
}