1use crate::{
44 log,
45 platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49 borrow::ToOwned as _,
50 boxed::Box,
51 collections::BTreeMap,
52 format,
53 string::{String, ToString as _},
54 sync::Arc,
55 vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, num::NonZeroUsize, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65 header,
66 informant::{BytesDisplay, HashDisplay},
67 libp2p::{
68 connection,
69 multiaddr::{self, Multiaddr},
70 peer_id,
71 },
72 network::{basic_peering_strategy, bitswap_peering_strategy, codec, service},
73};
74
75pub use codec::{AffinityFilter, CallProofRequestConfig, Role};
76use service::SendTopicAffinityError;
77pub use service::{
78 ChainId, EncodedMerkleProof, PeerId, QueueNotificationError, SendBitswapMessageError,
79};
80
81#[derive(Debug, Clone)]
83pub struct StatementProtocolConfig {
84 max_seen_statements: NonZeroUsize,
86 false_positive_rate: f64,
87 bloom_seed: u128,
88 affinity_update_interval: Duration,
89}
90
91impl StatementProtocolConfig {
92 pub fn new(
93 max_seen_statements: NonZeroUsize,
94 false_positive_rate: f64,
95 bloom_seed: u128,
96 affinity_update_interval: Duration,
97 ) -> Self {
98 assert!(
99 false_positive_rate.is_finite()
100 && false_positive_rate > 0.0
101 && false_positive_rate < 1.0
102 );
103 assert!(!affinity_update_interval.is_zero());
104 StatementProtocolConfig {
105 max_seen_statements,
106 false_positive_rate,
107 bloom_seed,
108 affinity_update_interval,
109 }
110 }
111
112 pub fn max_seen_statements(&self) -> NonZeroUsize {
113 self.max_seen_statements
114 }
115
116 pub fn false_positive_rate(&self) -> f64 {
117 self.false_positive_rate
118 }
119
120 pub fn bloom_seed(&self) -> u128 {
121 self.bloom_seed
122 }
123
124 pub fn affinity_update_interval(&self) -> Duration {
125 self.affinity_update_interval
126 }
127}
128
129mod tasks;
130
131pub struct Config<TPlat> {
133 pub platform: TPlat,
135
136 pub identify_agent_version: String,
138
139 pub chains_capacity: usize,
141
142 pub connections_open_pool_size: u32,
146
147 pub connections_open_pool_restore_delay: Duration,
152}
153
154pub struct ConfigChain {
160 pub log_name: String,
162
163 pub num_out_slots: usize,
166
167 pub genesis_block_hash: [u8; 32],
173
174 pub best_block: (u64, [u8; 32]),
177
178 pub fork_id: Option<String>,
181
182 pub block_number_bytes: usize,
184
185 pub grandpa_protocol_finalized_block_height: Option<u64>,
188
189 pub statement_protocol_config: Option<StatementProtocolConfig>,
191}
192
193pub struct NetworkService<TPlat: PlatformRef> {
194 messages_tx: async_channel::Sender<ToBackground<TPlat>>,
196
197 platform: TPlat,
199}
200
201impl<TPlat: PlatformRef> NetworkService<TPlat> {
202 pub fn new(config: Config<TPlat>) -> Arc<Self> {
204 let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
205
206 let network = service::ChainNetwork::new(service::Config {
207 chains_capacity: config.chains_capacity,
208 connections_capacity: 32,
209 handshake_timeout: Duration::from_secs(4),
211 randomness_seed: {
212 let mut seed = [0; 32];
213 config.platform.fill_random_bytes(&mut seed);
214 seed
215 },
216 });
217
218 let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
220 let task = Box::pin(background_task(BackgroundTask {
221 randomness: rand_chacha::ChaCha20Rng::from_seed({
222 let mut seed = [0; 32];
223 config.platform.fill_random_bytes(&mut seed);
224 seed
225 }),
226 identify_agent_version: config.identify_agent_version,
227 tasks_messages_tx,
228 tasks_messages_rx: Box::pin(tasks_messages_rx),
229 peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
230 basic_peering_strategy::Config {
231 randomness_seed: {
232 let mut seed = [0; 32];
233 config.platform.fill_random_bytes(&mut seed);
234 seed
235 },
236 peers_capacity: 50, chains_capacity: config.chains_capacity,
238 },
239 ),
240 bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy::new(
241 bitswap_peering_strategy::Config {
242 randomness_seed: {
243 let mut seed = [0; 32];
244 config.platform.fill_random_bytes(&mut seed);
245 seed
246 },
247 peers_capacity: 50, },
249 ),
250 network,
251 connections_open_pool_size: config.connections_open_pool_size,
252 connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
253 num_recent_connection_opening: 0,
254 next_recent_connection_restore: None,
255 platform: config.platform.clone(),
256 open_gossip_links: BTreeMap::new(),
257 chains_ever_gossip_connected: HashSet::with_capacity_and_hasher(4, Default::default()),
258 v2_statement_peers: HashMap::with_capacity_and_hasher(4, Default::default()),
259 current_affinity_filter: HashMap::with_capacity_and_hasher(4, Default::default()),
260 event_pending_send: None,
261 event_senders: either::Left(Vec::new()),
262 pending_new_subscriptions: Vec::new(),
263 bitswap_event_pending_send: None,
264 bitswap_connected_peers: 0,
265 bitswap_event_senders: either::Left(Vec::new()),
266 pending_new_bitswap_subscriptions: Vec::new(),
267 important_nodes: HashMap::with_capacity_and_hasher(16, Default::default()),
268 main_messages_rx: Box::pin(main_messages_rx),
269 messages_rx: stream::SelectAll::new(),
270 blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
271 grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
272 storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
273 call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
274 child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
275 chains_by_next_discovery: BTreeMap::new(),
276 }));
277
278 config.platform.spawn_task("network-service".into(), {
279 let platform = config.platform.clone();
280 async move {
281 task.await;
282 log!(&platform, Debug, "network", "shutdown");
283 }
284 });
285
286 Arc::new(NetworkService {
287 messages_tx: main_messages_tx,
288 platform: config.platform,
289 })
290 }
291
292 pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
298 let (messages_tx, messages_rx) = async_channel::bounded(32);
299
300 self.platform.spawn_task("add-chain-message-send".into(), {
302 let config = service::ChainConfig {
303 grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
304 |commit_finalized_height| service::GrandpaState {
305 commit_finalized_height,
306 round_number: 1,
307 set_id: 0,
308 },
309 ),
310 enable_statement_protocol: config.statement_protocol_config.is_some(),
311 fork_id: config.fork_id.clone(),
312 block_number_bytes: config.block_number_bytes,
313 best_hash: config.best_block.1,
314 best_number: config.best_block.0,
315 genesis_hash: config.genesis_block_hash,
316 role: Role::Light,
317 allow_inbound_block_requests: false,
318 user_data: Chain {
319 log_name: config.log_name,
320 block_number_bytes: config.block_number_bytes,
321 num_out_slots: config.num_out_slots,
322 num_references: NonZero::<usize>::new(1).unwrap(),
323 next_discovery_period: Duration::from_secs(2),
324 next_discovery_when: self.platform.now(),
325 },
326 };
327
328 let messages_tx = self.messages_tx.clone();
329 async move {
330 let _ = messages_tx
331 .send(ToBackground::AddChain {
332 messages_rx,
333 config,
334 })
335 .await;
336 }
337 });
338
339 Arc::new(NetworkServiceChain {
340 _keep_alive_messages_tx: self.messages_tx.clone(),
341 messages_tx,
342 marker: core::marker::PhantomData,
343 })
344 }
345}
346
347pub struct NetworkServiceChain<TPlat: PlatformRef> {
348 _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,
351
352 messages_tx: async_channel::Sender<ToBackgroundChain>,
354
355 marker: core::marker::PhantomData<TPlat>,
357}
358
359#[derive(Debug, Copy, Clone, PartialEq, Eq)]
361pub enum BanSeverity {
362 Low,
363 High,
364}
365
366impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
367 pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
384 let (tx, rx) = async_channel::bounded(128);
385
386 self.messages_tx
387 .send(ToBackgroundChain::Subscribe { sender: tx })
388 .await
389 .unwrap();
390
391 rx
392 }
393
394 pub async fn subscribe_bitswap(&self) -> async_channel::Receiver<BitswapEvent> {
409 let (tx, rx) = async_channel::bounded(128);
410
411 self.messages_tx
412 .send(ToBackgroundChain::SubscribeBitswap { sender: tx })
413 .await
414 .unwrap();
415
416 rx
417 }
418
419 pub async fn ban_and_disconnect(
431 &self,
432 peer_id: PeerId,
433 severity: BanSeverity,
434 reason: &'static str,
435 ) {
436 let _ = self
437 .messages_tx
438 .send(ToBackgroundChain::DisconnectAndBan {
439 peer_id,
440 severity,
441 reason,
442 })
443 .await;
444 }
445
446 pub async fn blocks_request(
449 self: Arc<Self>,
450 target: PeerId,
451 config: codec::BlocksRequestConfig,
452 timeout: Duration,
453 ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
454 let (tx, rx) = oneshot::channel();
455
456 self.messages_tx
457 .send(ToBackgroundChain::StartBlocksRequest {
458 target: target.clone(),
459 config,
460 timeout,
461 result: tx,
462 })
463 .await
464 .unwrap();
465
466 rx.await.unwrap()
467 }
468
469 pub async fn grandpa_warp_sync_request(
472 self: Arc<Self>,
473 target: PeerId,
474 begin_hash: [u8; 32],
475 timeout: Duration,
476 ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
477 let (tx, rx) = oneshot::channel();
478
479 self.messages_tx
480 .send(ToBackgroundChain::StartWarpSyncRequest {
481 target: target.clone(),
482 begin_hash,
483 timeout,
484 result: tx,
485 })
486 .await
487 .unwrap();
488
489 rx.await.unwrap()
490 }
491
492 pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
493 self.messages_tx
494 .send(ToBackgroundChain::SetLocalBestBlock {
495 best_hash,
496 best_number,
497 })
498 .await
499 .unwrap();
500 }
501
502 pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
503 self.messages_tx
504 .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
505 .await
506 .unwrap();
507 }
508
509 pub async fn storage_proof_request(
512 self: Arc<Self>,
513 target: PeerId, config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
515 timeout: Duration,
516 ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
517 let (tx, rx) = oneshot::channel();
518
519 self.messages_tx
520 .send(ToBackgroundChain::StartStorageProofRequest {
521 target: target.clone(),
522 config: codec::StorageProofRequestConfig {
523 block_hash: config.block_hash,
524 keys: config
525 .keys
526 .map(|key| key.as_ref().to_vec()) .collect::<Vec<_>>()
528 .into_iter(),
529 },
530 timeout,
531 result: tx,
532 })
533 .await
534 .unwrap();
535
536 rx.await.unwrap()
537 }
538
539 pub async fn call_proof_request(
544 self: Arc<Self>,
545 target: PeerId, config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
547 timeout: Duration,
548 ) -> Result<EncodedMerkleProof, CallProofRequestError> {
549 let (tx, rx) = oneshot::channel();
550
551 self.messages_tx
552 .send(ToBackgroundChain::StartCallProofRequest {
553 target: target.clone(),
554 config: codec::CallProofRequestConfig {
555 block_hash: config.block_hash,
556 method: config.method.into_owned().into(),
557 parameter_vectored: config
558 .parameter_vectored
559 .map(|v| v.as_ref().to_vec()) .collect::<Vec<_>>()
561 .into_iter(),
562 },
563 timeout,
564 result: tx,
565 })
566 .await
567 .unwrap();
568
569 rx.await.unwrap()
570 }
571
572 pub async fn child_storage_proof_request(
574 self: Arc<Self>,
575 target: PeerId,
576 config: codec::ChildStorageProofRequestConfig<
577 impl AsRef<[u8]> + Clone,
578 impl Iterator<Item = impl AsRef<[u8]> + Clone>,
579 >,
580 timeout: Duration,
581 ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
582 let (tx, rx) = oneshot::channel();
583
584 self.messages_tx
585 .send(ToBackgroundChain::StartChildStorageProofRequest {
586 target: target.clone(),
587 config: ChildStorageProofRequestConfigOwned {
588 block_hash: config.block_hash,
589 child_trie: config.child_trie.as_ref().to_vec(),
590 keys: config
591 .keys
592 .map(|key| key.as_ref().to_vec())
593 .collect::<Vec<_>>(),
594 },
595 timeout,
596 result: tx,
597 })
598 .await
599 .unwrap();
600
601 rx.await.unwrap()
602 }
603
604 pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
614 let (tx, rx) = oneshot::channel();
615
616 self.messages_tx
617 .send(ToBackgroundChain::AnnounceTransaction {
618 transaction: transaction.to_vec(), result: tx,
620 })
621 .await
622 .unwrap();
623
624 rx.await.unwrap()
625 }
626
627 pub async fn send_block_announce(
629 self: Arc<Self>,
630 target: &PeerId,
631 scale_encoded_header: &[u8],
632 is_best: bool,
633 ) -> Result<(), QueueNotificationError> {
634 let (tx, rx) = oneshot::channel();
635
636 self.messages_tx
637 .send(ToBackgroundChain::SendBlockAnnounce {
638 target: target.clone(), scale_encoded_header: scale_encoded_header.to_vec(), is_best,
641 result: tx,
642 })
643 .await
644 .unwrap();
645
646 rx.await.unwrap()
647 }
648
649 pub async fn send_bitswap_message(
651 &self,
652 target: PeerId,
653 message: Vec<u8>,
654 ) -> Result<(), SendBitswapMessageError> {
655 let (tx, rx) = oneshot::channel();
656
657 self.messages_tx
658 .send(ToBackgroundChain::SendBitswapMessage {
659 target,
660 message,
661 result: tx,
662 })
663 .await
664 .unwrap();
665
666 rx.await.unwrap()
667 }
668
669 pub async fn broadcast_bitswap_message(
677 &self,
678 message: Vec<u8>,
679 ) -> Result<Vec<PeerId>, SendBitswapMessageError> {
680 let (tx, rx) = oneshot::channel();
681
682 self.messages_tx
683 .send(ToBackgroundChain::BroadcastBitswapMessage {
684 message,
685 result: tx,
686 })
687 .await
688 .unwrap();
689
690 rx.await.unwrap()
691 }
692
693 pub async fn broadcast_statement(
695 self: Arc<Self>,
696 statement: Vec<u8>,
697 ) -> BroadcastStatementResult {
698 let (tx, rx) = oneshot::channel();
699
700 self.messages_tx
701 .send(ToBackgroundChain::BroadcastStatement {
702 statement,
703 result: tx,
704 })
705 .await
706 .unwrap();
707
708 rx.await.unwrap()
709 }
710
711 pub async fn update_topic_affinity(&self, filter: AffinityFilter) {
712 self.messages_tx
713 .send(ToBackgroundChain::UpdateTopicAffinity { filter })
714 .await
715 .unwrap();
716 }
717
718 pub async fn discover(
724 &self,
725 list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
726 important_nodes: bool,
727 ) {
728 self.messages_tx
729 .send(ToBackgroundChain::Discover {
730 list: list
732 .into_iter()
733 .map(|(peer_id, addrs)| {
734 (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
735 })
736 .collect::<Vec<_>>()
737 .into_iter(),
738 important_nodes,
739 })
740 .await
741 .unwrap();
742 }
743
744 pub async fn discovered_nodes(
751 &self,
752 ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
753 let (tx, rx) = oneshot::channel();
754
755 self.messages_tx
756 .send(ToBackgroundChain::DiscoveredNodes { result: tx })
757 .await
758 .unwrap();
759
760 rx.await
761 .unwrap()
762 .into_iter()
763 .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
764 }
765
766 pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
769 let (tx, rx) = oneshot::channel();
770 self.messages_tx
771 .send(ToBackgroundChain::PeersList { result: tx })
772 .await
773 .unwrap();
774 rx.await.unwrap().into_iter()
775 }
776}
777
778#[derive(Debug, Clone)]
779pub struct BroadcastStatementResult {
780 pub sent: usize,
781 pub total: usize,
782}
783
784#[derive(Debug, Clone)]
786pub enum Event {
787 Connected {
788 peer_id: PeerId,
789 role: Role,
790 best_block_number: u64,
791 best_block_hash: [u8; 32],
792 },
793 Disconnected {
794 peer_id: PeerId,
795 },
796 BlockAnnounce {
797 peer_id: PeerId,
798 announce: service::EncodedBlockAnnounce,
799 },
800 GrandpaNeighborPacket {
801 peer_id: PeerId,
802 finalized_block_height: u64,
803 },
804 GrandpaCommitMessage {
806 peer_id: PeerId,
807 message: service::EncodedGrandpaCommitMessage,
808 },
809 StatementsNotification {
811 peer_id: PeerId,
812 statements: Vec<([u8; 32], codec::Statement)>,
813 },
814}
815
816#[derive(Debug, Clone)]
820pub enum BitswapEvent {
821 BitswapMessage {
822 peer_id: PeerId,
823 message: service::EncodedBitswapMessage,
824 },
825}
826
827#[derive(Debug, derive_more::Display, derive_more::Error)]
829pub enum BlocksRequestError {
830 NoConnection,
832 #[display("{_0}")]
834 Request(service::BlocksRequestError),
835}
836
837#[derive(Debug, derive_more::Display, derive_more::Error)]
839pub enum WarpSyncRequestError {
840 NoConnection,
842 #[display("{_0}")]
844 Request(service::GrandpaWarpSyncRequestError),
845}
846
847#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
849pub enum StorageProofRequestError {
850 NoConnection,
852 RequestTooLarge,
854 #[display("{_0}")]
856 Request(service::StorageProofRequestError),
857}
858
859#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
861pub enum CallProofRequestError {
862 NoConnection,
864 RequestTooLarge,
866 #[display("{_0}")]
868 Request(service::CallProofRequestError),
869}
870
871impl CallProofRequestError {
872 pub fn is_network_problem(&self) -> bool {
875 match self {
876 CallProofRequestError::Request(err) => err.is_network_problem(),
877 CallProofRequestError::RequestTooLarge => false,
878 CallProofRequestError::NoConnection => true,
879 }
880 }
881}
882
883#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
885pub enum ChildStorageProofRequestError {
886 NoConnection,
888 RequestTooLarge,
890 #[display("{_0}")]
892 Request(service::StorageProofRequestError),
893}
894
895impl ChildStorageProofRequestError {
896 pub fn is_network_problem(&self) -> bool {
899 match self {
900 ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
901 ChildStorageProofRequestError::RequestTooLarge => false,
902 ChildStorageProofRequestError::NoConnection => true,
903 }
904 }
905}
906
907struct ChildStorageProofRequestConfigOwned {
909 block_hash: [u8; 32],
910 child_trie: Vec<u8>,
911 keys: Vec<Vec<u8>>,
912}
913
914enum ToBackground<TPlat: PlatformRef> {
915 AddChain {
916 messages_rx: async_channel::Receiver<ToBackgroundChain>,
917 config: service::ChainConfig<Chain<TPlat>>,
918 },
919}
920
921enum ToBackgroundChain {
922 RemoveChain,
923 Subscribe {
924 sender: async_channel::Sender<Event>,
925 },
926 SubscribeBitswap {
927 sender: async_channel::Sender<BitswapEvent>,
928 },
929 DisconnectAndBan {
930 peer_id: PeerId,
931 severity: BanSeverity,
932 reason: &'static str,
933 },
934 StartBlocksRequest {
936 target: PeerId, config: codec::BlocksRequestConfig,
938 timeout: Duration,
939 result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
940 },
941 StartWarpSyncRequest {
943 target: PeerId,
944 begin_hash: [u8; 32],
945 timeout: Duration,
946 result:
947 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
948 },
949 StartStorageProofRequest {
951 target: PeerId,
952 config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
953 timeout: Duration,
954 result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
955 },
956 StartCallProofRequest {
958 target: PeerId, config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
960 timeout: Duration,
961 result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
962 },
963 StartChildStorageProofRequest {
965 target: PeerId,
966 config: ChildStorageProofRequestConfigOwned,
967 timeout: Duration,
968 result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
969 },
970 SetLocalBestBlock {
971 best_hash: [u8; 32],
972 best_number: u64,
973 },
974 SetLocalGrandpaState {
975 grandpa_state: service::GrandpaState,
976 },
977 AnnounceTransaction {
978 transaction: Vec<u8>,
979 result: oneshot::Sender<Vec<PeerId>>,
980 },
981 SendBlockAnnounce {
982 target: PeerId,
983 scale_encoded_header: Vec<u8>,
984 is_best: bool,
985 result: oneshot::Sender<Result<(), QueueNotificationError>>,
986 },
987 SendBitswapMessage {
988 target: PeerId,
989 message: Vec<u8>,
990 result: oneshot::Sender<Result<(), SendBitswapMessageError>>,
991 },
992 BroadcastBitswapMessage {
993 message: Vec<u8>,
994 result: oneshot::Sender<Result<Vec<PeerId>, SendBitswapMessageError>>,
995 },
996 BroadcastStatement {
997 statement: Vec<u8>,
998 result: oneshot::Sender<BroadcastStatementResult>,
999 },
1000 UpdateTopicAffinity {
1001 filter: AffinityFilter,
1002 },
1003 Discover {
1004 list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
1005 important_nodes: bool,
1006 },
1007 DiscoveredNodes {
1008 result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
1009 },
1010 PeersList {
1011 result: oneshot::Sender<Vec<PeerId>>,
1012 },
1013}
1014
1015struct BackgroundTask<TPlat: PlatformRef> {
1016 platform: TPlat,
1018
1019 randomness: rand_chacha::ChaCha20Rng,
1021
1022 identify_agent_version: String,
1024
1025 tasks_messages_tx:
1027 async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
1028
1029 tasks_messages_rx: Pin<
1031 Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
1032 >,
1033
1034 network: service::ChainNetwork<
1036 Chain<TPlat>,
1037 async_channel::Sender<service::CoordinatorToConnection>,
1038 TPlat::Instant,
1039 >,
1040
1041 peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
1043
1044 bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy<TPlat::Instant>,
1046
1047 connections_open_pool_size: u32,
1049
1050 connections_open_pool_restore_delay: Duration,
1052
1053 num_recent_connection_opening: u32,
1057
1058 next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
1060
1061 open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
1064
1065 chains_ever_gossip_connected: HashSet<ChainId, fnv::FnvBuildHasher>,
1068
1069 v2_statement_peers: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1071
1072 current_affinity_filter: HashMap<ChainId, AffinityFilter, fnv::FnvBuildHasher>,
1074
1075 important_nodes: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1079
1080 event_pending_send: Option<(ChainId, Event)>,
1082
1083 bitswap_event_pending_send: Option<BitswapEvent>,
1085
1086 bitswap_connected_peers: usize,
1091
1092 event_senders: either::Either<
1098 Vec<(ChainId, async_channel::Sender<Event>)>,
1099 Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
1100 >,
1101
1102 pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
1105
1106 bitswap_event_senders: either::Either<
1116 Vec<async_channel::Sender<BitswapEvent>>,
1117 Pin<Box<dyn Future<Output = Vec<async_channel::Sender<BitswapEvent>>> + Send>>,
1118 >,
1119
1120 pending_new_bitswap_subscriptions: Vec<async_channel::Sender<BitswapEvent>>,
1124
1125 main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
1126
1127 messages_rx:
1128 stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
1129
1130 blocks_requests: HashMap<
1131 service::SubstreamId,
1132 oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
1133 fnv::FnvBuildHasher,
1134 >,
1135
1136 grandpa_warp_sync_requests: HashMap<
1137 service::SubstreamId,
1138 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
1139 fnv::FnvBuildHasher,
1140 >,
1141
1142 storage_proof_requests: HashMap<
1143 service::SubstreamId,
1144 oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
1145 fnv::FnvBuildHasher,
1146 >,
1147
1148 call_proof_requests: HashMap<
1149 service::SubstreamId,
1150 oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
1151 fnv::FnvBuildHasher,
1152 >,
1153
1154 child_storage_proof_requests: HashMap<
1155 service::SubstreamId,
1156 oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
1157 fnv::FnvBuildHasher,
1158 >,
1159
1160 chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
1162}
1163
1164struct Chain<TPlat: PlatformRef> {
1165 log_name: String,
1166
1167 num_references: NonZero<usize>,
1169
1170 block_number_bytes: usize,
1173
1174 num_out_slots: usize,
1176
1177 next_discovery_when: TPlat::Instant,
1179
1180 next_discovery_period: Duration,
1183}
1184
1185#[derive(Clone)]
1186struct OpenGossipLinkState {
1187 role: Role,
1188 best_block_number: u64,
1189 best_block_hash: [u8; 32],
1190 finalized_block_height: Option<u64>,
1192}
1193
1194async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
1195 loop {
1196 futures_lite::future::yield_now().await;
1198
1199 enum WakeUpReason<TPlat: PlatformRef> {
1200 ForegroundClosed,
1201 Message(ToBackground<TPlat>),
1202 MessageForChain(ChainId, ToBackgroundChain),
1203 NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
1204 CanAssignSlot(PeerId, ChainId),
1205 CanAssignBitswapSlot(PeerId),
1206 NextRecentConnectionRestore,
1207 CanStartConnect(PeerId),
1208 CanOpenGossip(PeerId, ChainId),
1209 CanOpenBitswap(PeerId),
1210 MessageFromConnection {
1211 connection_id: service::ConnectionId,
1212 message: service::ConnectionToCoordinator,
1213 },
1214 MessageToConnection {
1215 connection_id: service::ConnectionId,
1216 message: service::CoordinatorToConnection,
1217 },
1218 EventSendersReady,
1219 BitswapEventSendersReady,
1220 StartDiscovery(ChainId),
1221 }
1222
1223 let wake_up_reason = {
1224 let message_received = async {
1225 task.main_messages_rx
1226 .next()
1227 .await
1228 .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
1229 };
1230 let message_for_chain_received = async {
1231 let Some((chain_id, message)) = task.messages_rx.next().await else {
1236 future::pending().await
1237 };
1238 WakeUpReason::MessageForChain(chain_id, message)
1239 };
1240 let message_from_task_received = async {
1241 let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
1242 WakeUpReason::MessageFromConnection {
1243 connection_id,
1244 message,
1245 }
1246 };
1247 let service_event = async {
1248 if let Some(event) = (task.event_pending_send.is_none()
1249 && task.bitswap_event_pending_send.is_none()
1250 && task.pending_new_subscriptions.is_empty()
1251 && task.pending_new_bitswap_subscriptions.is_empty())
1252 .then(|| task.network.next_event())
1253 .flatten()
1254 {
1255 WakeUpReason::NetworkEvent(event)
1256 } else if let Some(start_connect) = {
1257 let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1258 .then(|| {
1259 task.network
1260 .unconnected_desired()
1261 .choose(&mut task.randomness)
1262 .cloned()
1263 })
1264 .flatten();
1265 x
1266 } {
1267 WakeUpReason::CanStartConnect(start_connect)
1268 } else if let Some((peer_id, chain_id)) = {
1269 let x = task
1270 .network
1271 .connected_unopened_gossip_desired()
1272 .choose(&mut task.randomness)
1273 .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1274 x
1275 } {
1276 WakeUpReason::CanOpenGossip(peer_id, chain_id)
1277 } else if let Some(peer_id) = {
1278 let x = task
1279 .network
1280 .connected_unopened_bitswap_desired()
1281 .choose(&mut task.randomness)
1282 .cloned();
1283 x
1284 } {
1285 WakeUpReason::CanOpenBitswap(peer_id)
1286 } else if let Some((connection_id, message)) =
1287 task.network.pull_message_to_connection()
1288 {
1289 WakeUpReason::MessageToConnection {
1290 connection_id,
1291 message,
1292 }
1293 } else {
1294 'search: loop {
1295 let mut earlier_unban = None;
1296
1297 for chain_id in task.network.chains().collect::<Vec<_>>() {
1298 if task.network.gossip_desired_num(
1299 chain_id,
1300 service::GossipKind::ConsensusTransactions,
1301 ) >= task.network[chain_id].num_out_slots
1302 {
1303 continue;
1304 }
1305
1306 let now = task.platform.now();
1307
1308 if !task.chains_ever_gossip_connected.contains(&chain_id) {
1311 if let basic_peering_strategy::AssignablePeer::Assignable(peer_id) =
1312 task.peering_strategy.pick_assignable_peer_filtered(
1313 &chain_id,
1314 &now,
1315 |peer_id| {
1316 task.important_nodes
1317 .get(&chain_id)
1318 .map_or(false, |nodes| nodes.contains(peer_id))
1319 },
1320 )
1321 {
1322 break 'search WakeUpReason::CanAssignSlot(
1323 peer_id.clone(),
1324 chain_id,
1325 );
1326 }
1327 }
1328
1329 match task.peering_strategy.pick_assignable_peer(&chain_id, &now) {
1330 basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1331 break 'search WakeUpReason::CanAssignSlot(
1332 peer_id.clone(),
1333 chain_id,
1334 );
1335 }
1336 basic_peering_strategy::AssignablePeer::AllPeersBanned {
1337 next_unban,
1338 } => {
1339 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1340 earlier_unban = Some(next_unban.clone());
1341 }
1342 }
1343 basic_peering_strategy::AssignablePeer::NoPeer => continue,
1344 }
1345 }
1346
1347 match task
1348 .bitswap_peering_strategy
1349 .pick_assignable_peer(&task.platform.now())
1350 {
1351 bitswap_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1352 break 'search WakeUpReason::CanAssignBitswapSlot(peer_id.clone());
1353 }
1354 bitswap_peering_strategy::AssignablePeer::AllPeersBanned {
1355 next_unban,
1356 } => {
1357 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1358 earlier_unban = Some(next_unban.clone());
1359 }
1360 }
1361 bitswap_peering_strategy::AssignablePeer::NoPeer => {}
1362 }
1363
1364 if let Some(earlier_unban) = earlier_unban {
1365 task.platform.sleep_until(earlier_unban).await;
1366 } else {
1367 future::pending::<()>().await;
1368 }
1369 }
1370 }
1371 };
1372 let next_recent_connection_restore = async {
1373 if task.num_recent_connection_opening != 0
1374 && task.next_recent_connection_restore.is_none()
1375 {
1376 task.next_recent_connection_restore = Some(Box::pin(
1377 task.platform
1378 .sleep(task.connections_open_pool_restore_delay),
1379 ));
1380 }
1381 if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1382 delay.await;
1383 task.next_recent_connection_restore = None;
1384 WakeUpReason::NextRecentConnectionRestore
1385 } else {
1386 future::pending().await
1387 }
1388 };
1389 let finished_sending_event = async {
1390 if let either::Right(event_sending_future) = &mut task.event_senders {
1391 let event_senders = event_sending_future.await;
1392 task.event_senders = either::Left(event_senders);
1393 WakeUpReason::EventSendersReady
1394 } else if task.event_pending_send.is_some()
1395 || !task.pending_new_subscriptions.is_empty()
1396 {
1397 WakeUpReason::EventSendersReady
1398 } else {
1399 future::pending().await
1400 }
1401 };
1402 let finished_sending_bitswap_event = async {
1403 if let either::Right(bitswap_event_sending_future) = &mut task.bitswap_event_senders
1404 {
1405 let bitswap_event_senders = bitswap_event_sending_future.await;
1406 task.bitswap_event_senders = either::Left(bitswap_event_senders);
1407 WakeUpReason::BitswapEventSendersReady
1408 } else if task.bitswap_event_pending_send.is_some()
1409 || !task.pending_new_bitswap_subscriptions.is_empty()
1410 {
1411 WakeUpReason::BitswapEventSendersReady
1412 } else {
1413 future::pending().await
1414 }
1415 };
1416 let start_discovery = async {
1417 let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1418 future::pending().await
1419 };
1420 next_discovery.get_mut().await;
1421 let ((_, chain_id), _) = next_discovery.remove_entry();
1422 WakeUpReason::StartDiscovery(chain_id)
1423 };
1424
1425 message_for_chain_received
1426 .or(message_received)
1427 .or(message_from_task_received)
1428 .or(service_event)
1429 .or(next_recent_connection_restore)
1430 .or(finished_sending_event)
1431 .or(finished_sending_bitswap_event)
1432 .or(start_discovery)
1433 .await
1434 };
1435
1436 match wake_up_reason {
1437 WakeUpReason::ForegroundClosed => {
1438 return;
1440 }
1441 WakeUpReason::Message(ToBackground::AddChain {
1442 messages_rx,
1443 config,
1444 }) => {
1445 let chain_id = match task.network.add_chain(config) {
1447 Ok(id) => id,
1448 Err(service::AddChainError::Duplicate { existing_identical }) => {
1449 task.network[existing_identical].num_references = task.network
1450 [existing_identical]
1451 .num_references
1452 .checked_add(1)
1453 .unwrap();
1454 existing_identical
1455 }
1456 };
1457
1458 task.chains_by_next_discovery.insert(
1459 (task.network[chain_id].next_discovery_when.clone(), chain_id),
1460 Box::pin(
1461 task.platform
1462 .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1463 ),
1464 );
1465
1466 task.messages_rx
1467 .push(Box::pin(
1468 messages_rx
1469 .map(move |msg| (chain_id, msg))
1470 .chain(stream::once(future::ready((
1471 chain_id,
1472 ToBackgroundChain::RemoveChain,
1473 )))),
1474 ) as Pin<Box<_>>);
1475
1476 log!(
1477 &task.platform,
1478 Debug,
1479 "network",
1480 "chain-added",
1481 id = task.network[chain_id].log_name
1482 );
1483 }
1484 WakeUpReason::EventSendersReady => {
1485 let either::Left(event_senders) = &mut task.event_senders else {
1489 unreachable!()
1490 };
1491
1492 if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1493 task.event_pending_send.take()
1494 {
1495 let mut event_senders = mem::take(event_senders);
1496 task.event_senders = either::Right(Box::pin(async move {
1497 for index in (0..event_senders.len()).rev() {
1500 let (event_sender_chain_id, event_sender) =
1501 event_senders.swap_remove(index);
1502 if event_sender_chain_id == event_to_dispatch_chain_id {
1503 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1504 continue;
1505 }
1506 }
1507 event_senders.push((event_sender_chain_id, event_sender));
1508 }
1509 event_senders
1510 }));
1511 } else if !task.pending_new_subscriptions.is_empty() {
1512 let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1513 let mut event_senders = mem::take(event_senders);
1514 let open_gossip_links = task.open_gossip_links.clone();
1516 task.event_senders = either::Right(Box::pin(async move {
1517 for (chain_id, new_subscription) in pending_new_subscriptions {
1518 for ((link_chain_id, peer_id), state) in &open_gossip_links {
1519 if *link_chain_id != chain_id {
1521 continue;
1522 }
1523
1524 let _ = new_subscription
1525 .send(Event::Connected {
1526 peer_id: peer_id.clone(),
1527 role: state.role,
1528 best_block_number: state.best_block_number,
1529 best_block_hash: state.best_block_hash,
1530 })
1531 .await;
1532
1533 if let Some(finalized_block_height) = state.finalized_block_height {
1534 let _ = new_subscription
1535 .send(Event::GrandpaNeighborPacket {
1536 peer_id: peer_id.clone(),
1537 finalized_block_height,
1538 })
1539 .await;
1540 }
1541 }
1542
1543 event_senders.push((chain_id, new_subscription));
1544 }
1545
1546 event_senders
1547 }));
1548 }
1549 }
1550 WakeUpReason::BitswapEventSendersReady => {
1551 let either::Left(bitswap_event_senders) = &mut task.bitswap_event_senders else {
1553 unreachable!()
1554 };
1555
1556 if let Some(event_to_dispatch) = task.bitswap_event_pending_send.take() {
1557 let mut bitswap_event_senders = mem::take(bitswap_event_senders);
1558 task.bitswap_event_senders = either::Right(Box::pin(async move {
1559 for index in (0..bitswap_event_senders.len()).rev() {
1562 let event_sender = bitswap_event_senders.swap_remove(index);
1563 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1564 continue;
1565 }
1566 bitswap_event_senders.push(event_sender);
1567 }
1568 bitswap_event_senders
1569 }));
1570 } else if !task.pending_new_bitswap_subscriptions.is_empty() {
1571 bitswap_event_senders.append(&mut task.pending_new_bitswap_subscriptions);
1572 }
1573 }
1574 WakeUpReason::MessageFromConnection {
1575 connection_id,
1576 message,
1577 } => {
1578 task.network
1579 .inject_connection_message(connection_id, message);
1580 }
1581 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1582 if let Some(new_ref) =
1583 NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1584 {
1585 task.network[chain_id].num_references = new_ref;
1586 continue;
1587 }
1588
1589 for peer_id in task
1590 .network
1591 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1592 .cloned()
1593 .collect::<Vec<_>>()
1594 {
1595 task.network
1596 .gossip_close(
1597 chain_id,
1598 &peer_id,
1599 service::GossipKind::ConsensusTransactions,
1600 )
1601 .unwrap();
1602
1603 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1604 debug_assert!(_was_in.is_some());
1605 }
1606
1607 let _was_in = task
1608 .chains_by_next_discovery
1609 .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1610 debug_assert!(_was_in.is_some());
1611
1612 log!(
1613 &task.platform,
1614 Debug,
1615 "network",
1616 "chain-removed",
1617 id = task.network[chain_id].log_name
1618 );
1619 task.v2_statement_peers.remove(&chain_id);
1620 task.current_affinity_filter.remove(&chain_id);
1621 task.important_nodes.remove(&chain_id);
1622 task.chains_ever_gossip_connected.remove(&chain_id);
1623 task.network.remove_chain(chain_id).unwrap();
1624 task.peering_strategy.remove_chain_peers(&chain_id);
1625 }
1626 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1627 task.pending_new_subscriptions.push((chain_id, sender));
1628 }
1629 WakeUpReason::MessageForChain(
1630 _chain_id,
1631 ToBackgroundChain::SubscribeBitswap { sender },
1632 ) => {
1633 task.pending_new_bitswap_subscriptions.push(sender);
1634 }
1635 WakeUpReason::MessageForChain(
1636 chain_id,
1637 ToBackgroundChain::DisconnectAndBan {
1638 peer_id,
1639 severity,
1640 reason,
1641 },
1642 ) => {
1643 let ban_duration = Duration::from_secs(match severity {
1644 BanSeverity::Low => 10,
1645 BanSeverity::High => 40,
1646 });
1647
1648 let had_slot = matches!(
1649 task.peering_strategy.unassign_slot_and_ban(
1650 &chain_id,
1651 &peer_id,
1652 task.platform.now() + ban_duration,
1653 ),
1654 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1655 );
1656
1657 if had_slot {
1658 log!(
1659 &task.platform,
1660 Debug,
1661 "network",
1662 "slot-unassigned",
1663 chain = &task.network[chain_id].log_name,
1664 peer_id,
1665 ?ban_duration,
1666 reason = "user-ban",
1667 user_reason = reason
1668 );
1669 task.network.gossip_remove_desired(
1670 chain_id,
1671 &peer_id,
1672 service::GossipKind::ConsensusTransactions,
1673 );
1674 }
1675
1676 if task.network.gossip_is_connected(
1677 chain_id,
1678 &peer_id,
1679 service::GossipKind::ConsensusTransactions,
1680 ) {
1681 let _closed_result = task.network.gossip_close(
1682 chain_id,
1683 &peer_id,
1684 service::GossipKind::ConsensusTransactions,
1685 );
1686 debug_assert!(_closed_result.is_ok());
1687
1688 log!(
1689 &task.platform,
1690 Debug,
1691 "network",
1692 "gossip-closed",
1693 chain = &task.network[chain_id].log_name,
1694 peer_id,
1695 );
1696
1697 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1698 debug_assert!(_was_in.is_some());
1699
1700 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
1701 peers.remove(&peer_id);
1702 }
1703
1704 debug_assert!(task.event_pending_send.is_none());
1705 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1706 }
1707 }
1708 WakeUpReason::MessageForChain(
1709 chain_id,
1710 ToBackgroundChain::StartBlocksRequest {
1711 target,
1712 config,
1713 timeout,
1714 result,
1715 },
1716 ) => {
1717 match &config.start {
1718 codec::BlocksRequestConfigStart::Hash(hash) => {
1719 log!(
1720 &task.platform,
1721 Debug,
1722 "network",
1723 "blocks-request-started",
1724 chain = task.network[chain_id].log_name, target,
1725 start = HashDisplay(hash),
1726 num = config.desired_count.get(),
1727 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1728 header = ?config.fields.header, body = ?config.fields.body,
1729 justifications = ?config.fields.justifications
1730 );
1731 }
1732 codec::BlocksRequestConfigStart::Number(number) => {
1733 log!(
1734 &task.platform,
1735 Debug,
1736 "network",
1737 "blocks-request-started",
1738 chain = task.network[chain_id].log_name, target, start = number,
1739 num = config.desired_count.get(),
1740 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1741 header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1742 );
1743 }
1744 }
1745
1746 match task
1747 .network
1748 .start_blocks_request(&target, chain_id, config.clone(), timeout)
1749 {
1750 Ok(substream_id) => {
1751 task.blocks_requests.insert(substream_id, result);
1752 }
1753 Err(service::StartRequestError::NoConnection) => {
1754 log!(
1755 &task.platform,
1756 Debug,
1757 "network",
1758 "blocks-request-error",
1759 chain = task.network[chain_id].log_name,
1760 target,
1761 error = "NoConnection"
1762 );
1763 let _ = result.send(Err(BlocksRequestError::NoConnection));
1764 }
1765 }
1766 }
1767 WakeUpReason::MessageForChain(
1768 chain_id,
1769 ToBackgroundChain::StartWarpSyncRequest {
1770 target,
1771 begin_hash,
1772 timeout,
1773 result,
1774 },
1775 ) => {
1776 log!(
1777 &task.platform,
1778 Debug,
1779 "network",
1780 "warp-sync-request-started",
1781 chain = task.network[chain_id].log_name,
1782 target,
1783 start = HashDisplay(&begin_hash)
1784 );
1785
1786 match task
1787 .network
1788 .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1789 {
1790 Ok(substream_id) => {
1791 task.grandpa_warp_sync_requests.insert(substream_id, result);
1792 }
1793 Err(service::StartRequestError::NoConnection) => {
1794 log!(
1795 &task.platform,
1796 Debug,
1797 "network",
1798 "warp-sync-request-error",
1799 chain = task.network[chain_id].log_name,
1800 target,
1801 error = "NoConnection"
1802 );
1803 let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1804 }
1805 }
1806 }
1807 WakeUpReason::MessageForChain(
1808 chain_id,
1809 ToBackgroundChain::StartStorageProofRequest {
1810 target,
1811 config,
1812 timeout,
1813 result,
1814 },
1815 ) => {
1816 log!(
1817 &task.platform,
1818 Debug,
1819 "network",
1820 "storage-proof-request-started",
1821 chain = task.network[chain_id].log_name,
1822 target,
1823 block_hash = HashDisplay(&config.block_hash)
1824 );
1825
1826 match task.network.start_storage_proof_request(
1827 &target,
1828 chain_id,
1829 config.clone(),
1830 timeout,
1831 ) {
1832 Ok(substream_id) => {
1833 task.storage_proof_requests.insert(substream_id, result);
1834 }
1835 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1836 log!(
1837 &task.platform,
1838 Debug,
1839 "network",
1840 "storage-proof-request-error",
1841 chain = task.network[chain_id].log_name,
1842 target,
1843 error = "NoConnection"
1844 );
1845 let _ = result.send(Err(StorageProofRequestError::NoConnection));
1846 }
1847 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1848 log!(
1849 &task.platform,
1850 Debug,
1851 "network",
1852 "storage-proof-request-error",
1853 chain = task.network[chain_id].log_name,
1854 target,
1855 error = "RequestTooLarge"
1856 );
1857 let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1858 }
1859 };
1860 }
1861 WakeUpReason::MessageForChain(
1862 chain_id,
1863 ToBackgroundChain::StartCallProofRequest {
1864 target,
1865 config,
1866 timeout,
1867 result,
1868 },
1869 ) => {
1870 log!(
1871 &task.platform,
1872 Debug,
1873 "network",
1874 "call-proof-request-started",
1875 chain = task.network[chain_id].log_name,
1876 target,
1877 block_hash = HashDisplay(&config.block_hash),
1878 function = config.method
1879 );
1880 match task.network.start_call_proof_request(
1883 &target,
1884 chain_id,
1885 config.clone(),
1886 timeout,
1887 ) {
1888 Ok(substream_id) => {
1889 task.call_proof_requests.insert(substream_id, result);
1890 }
1891 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1892 log!(
1893 &task.platform,
1894 Debug,
1895 "network",
1896 "call-proof-request-error",
1897 chain = task.network[chain_id].log_name,
1898 target,
1899 error = "NoConnection"
1900 );
1901 let _ = result.send(Err(CallProofRequestError::NoConnection));
1902 }
1903 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1904 log!(
1905 &task.platform,
1906 Debug,
1907 "network",
1908 "call-proof-request-error",
1909 chain = task.network[chain_id].log_name,
1910 target,
1911 error = "RequestTooLarge"
1912 );
1913 let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1914 }
1915 };
1916 }
1917 WakeUpReason::MessageForChain(
1918 chain_id,
1919 ToBackgroundChain::StartChildStorageProofRequest {
1920 target,
1921 config,
1922 timeout,
1923 result,
1924 },
1925 ) => {
1926 log!(
1927 &task.platform,
1928 Debug,
1929 "network",
1930 "child-storage-proof-request-started",
1931 chain = task.network[chain_id].log_name,
1932 target,
1933 block_hash = HashDisplay(&config.block_hash)
1934 );
1935
1936 match task.network.start_child_storage_proof_request(
1937 &target,
1938 chain_id,
1939 codec::ChildStorageProofRequestConfig {
1940 block_hash: config.block_hash,
1941 child_trie: &config.child_trie,
1942 keys: config.keys.iter().map(|k| k.as_slice()),
1943 },
1944 timeout,
1945 ) {
1946 Ok(substream_id) => {
1947 task.child_storage_proof_requests
1948 .insert(substream_id, result);
1949 }
1950 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1951 log!(
1952 &task.platform,
1953 Debug,
1954 "network",
1955 "child-storage-proof-request-error",
1956 chain = task.network[chain_id].log_name,
1957 target,
1958 error = "NoConnection"
1959 );
1960 let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
1961 }
1962 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1963 log!(
1964 &task.platform,
1965 Debug,
1966 "network",
1967 "child-storage-proof-request-error",
1968 chain = task.network[chain_id].log_name,
1969 target,
1970 error = "RequestTooLarge"
1971 );
1972 let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
1973 }
1974 };
1975 }
1976 WakeUpReason::MessageForChain(
1977 chain_id,
1978 ToBackgroundChain::SetLocalBestBlock {
1979 best_hash,
1980 best_number,
1981 },
1982 ) => {
1983 task.network
1984 .set_chain_local_best_block(chain_id, best_hash, best_number);
1985 }
1986 WakeUpReason::MessageForChain(
1987 chain_id,
1988 ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1989 ) => {
1990 log!(
1991 &task.platform,
1992 Debug,
1993 "network",
1994 "local-grandpa-state-announced",
1995 chain = task.network[chain_id].log_name,
1996 set_id = grandpa_state.set_id,
1997 commit_finalized_height = grandpa_state.commit_finalized_height,
1998 );
1999
2000 task.network
2003 .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
2004 }
2005 WakeUpReason::MessageForChain(
2006 chain_id,
2007 ToBackgroundChain::AnnounceTransaction {
2008 transaction,
2009 result,
2010 },
2011 ) => {
2012 let peers_to_send = task
2015 .network
2016 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2017 .cloned()
2018 .collect::<Vec<_>>();
2019
2020 let mut peers_sent = Vec::with_capacity(peers_to_send.len());
2021 let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
2022 for peer in &peers_to_send {
2023 match task
2024 .network
2025 .gossip_send_transaction(peer, chain_id, &transaction)
2026 {
2027 Ok(()) => peers_sent.push(peer.to_base58()),
2028 Err(QueueNotificationError::QueueFull) => {
2029 peers_queue_full.push(peer.to_base58())
2030 }
2031 Err(QueueNotificationError::NoConnection) => unreachable!(),
2032 }
2033 }
2034
2035 log!(
2036 &task.platform,
2037 Debug,
2038 "network",
2039 "transaction-announced",
2040 chain = task.network[chain_id].log_name,
2041 transaction =
2042 hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
2043 size = transaction.len(),
2044 peers_sent = peers_sent.join(", "),
2045 peers_queue_full = peers_queue_full.join(", "),
2046 );
2047
2048 let _ = result.send(peers_to_send);
2049 }
2050 WakeUpReason::MessageForChain(
2051 chain_id,
2052 ToBackgroundChain::SendBlockAnnounce {
2053 target,
2054 scale_encoded_header,
2055 is_best,
2056 result,
2057 },
2058 ) => {
2059 let _ = result.send(task.network.gossip_send_block_announce(
2061 &target,
2062 chain_id,
2063 &scale_encoded_header,
2064 is_best,
2065 ));
2066 }
2067 WakeUpReason::MessageForChain(
2068 _chain_id,
2069 ToBackgroundChain::SendBitswapMessage {
2070 target,
2071 message,
2072 result,
2073 },
2074 ) => {
2075 let _ = result.send(task.network.bitswap_send_message(&target, message));
2076 }
2077 WakeUpReason::MessageForChain(
2078 _chain_id,
2079 ToBackgroundChain::BroadcastBitswapMessage { message, result },
2080 ) => {
2081 let peers = task
2082 .network
2083 .established_bitswap_desired()
2084 .cloned()
2085 .collect::<Vec<_>>();
2086 let results = peers
2087 .iter()
2088 .map(|peer| {
2089 (
2090 peer,
2091 task.network.bitswap_send_message(peer, message.clone()),
2092 )
2093 })
2094 .collect::<Vec<_>>(); let succeeded_peers = results
2097 .iter()
2098 .filter_map(|(peer, r)| r.is_ok().then(|| (*peer).clone()))
2099 .collect::<Vec<_>>();
2100
2101 let r = if !succeeded_peers.is_empty() {
2103 Ok(succeeded_peers)
2104 } else if results
2105 .iter()
2106 .any(|(_peer, r)| matches!(r, Err(SendBitswapMessageError::QueueFull)))
2107 {
2108 Err(SendBitswapMessageError::QueueFull)
2111 } else {
2112 Err(SendBitswapMessageError::NoConnection)
2115 };
2116
2117 let _ = result.send(r);
2118 }
2119 WakeUpReason::MessageForChain(
2120 chain_id,
2121 ToBackgroundChain::BroadcastStatement { statement, result },
2122 ) => {
2123 let peers_to_send = task
2124 .network
2125 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2126 .cloned()
2127 .collect::<Vec<_>>();
2128
2129 let total = peers_to_send.len();
2130 let mut sent = 0;
2131 for peer in &peers_to_send {
2132 if task
2133 .network
2134 .gossip_send_statement(peer, chain_id, statement.clone())
2135 .is_ok()
2136 {
2137 sent += 1;
2138 }
2139 }
2140
2141 log!(
2142 &task.platform,
2143 Debug,
2144 "network",
2145 "statement-broadcast",
2146 chain = task.network[chain_id].log_name,
2147 sent,
2148 total,
2149 );
2150
2151 let _ = result.send(BroadcastStatementResult { sent, total });
2152 }
2153 WakeUpReason::MessageForChain(
2154 chain_id,
2155 ToBackgroundChain::UpdateTopicAffinity { filter },
2156 ) => {
2157 task.current_affinity_filter
2158 .insert(chain_id, filter.clone());
2159 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2160 let mut to_remove = Vec::new();
2161 for peer_id in peers.iter() {
2162 if let Err(
2163 SendTopicAffinityError::NoConnection
2164 | SendTopicAffinityError::ProtocolV1,
2165 ) = task.network.send_topic_affinity(peer_id, chain_id, &filter)
2166 {
2167 to_remove.push(peer_id.clone());
2168 }
2169 }
2170 for peer_id in &to_remove {
2171 peers.remove(peer_id);
2172 }
2173 }
2174 }
2175 WakeUpReason::MessageForChain(
2176 chain_id,
2177 ToBackgroundChain::Discover {
2178 list,
2179 important_nodes,
2180 },
2181 ) => {
2182 for (peer_id, addrs) in list {
2183 if important_nodes {
2184 task.important_nodes
2185 .entry(chain_id)
2186 .or_default()
2187 .insert(peer_id.clone());
2188 }
2189
2190 task.peering_strategy
2193 .insert_chain_peer(chain_id, peer_id.clone(), 30); for addr in addrs {
2196 let _ =
2197 task.peering_strategy
2198 .insert_address(&peer_id, addr.into_bytes(), 10);
2199 }
2201 }
2202 }
2203 WakeUpReason::MessageForChain(
2204 chain_id,
2205 ToBackgroundChain::DiscoveredNodes { result },
2206 ) => {
2207 let _ = result.send(
2209 task.peering_strategy
2210 .chain_peers_unordered(&chain_id)
2211 .map(|peer_id| {
2212 let addrs = task
2213 .peering_strategy
2214 .peer_addresses(peer_id)
2215 .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
2216 .collect::<Vec<_>>();
2217 (peer_id.clone(), addrs)
2218 })
2219 .collect::<Vec<_>>(),
2220 );
2221 }
2222 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
2223 let _ = result.send(
2224 task.network
2225 .gossip_connected_peers(
2226 chain_id,
2227 service::GossipKind::ConsensusTransactions,
2228 )
2229 .cloned()
2230 .collect(),
2231 );
2232 }
2233 WakeUpReason::StartDiscovery(chain_id) => {
2234 let chain = &mut task.network[chain_id];
2236 chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
2237 chain.next_discovery_period =
2238 cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
2239 task.chains_by_next_discovery.insert(
2240 (chain.next_discovery_when.clone(), chain_id),
2241 Box::pin(
2242 task.platform
2243 .sleep(task.network[chain_id].next_discovery_period),
2244 ),
2245 );
2246
2247 const PARALLEL_FIND_NODE_PER_ROUND: usize = 3;
2259
2260 let mut candidates: Vec<PeerId> = task
2261 .network
2262 .kademlia_capable_peers(chain_id)
2263 .cloned()
2264 .collect();
2265 for p in task
2266 .network
2267 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2268 .cloned()
2269 {
2270 if !candidates.contains(&p) {
2271 candidates.push(p);
2272 }
2273 }
2274
2275 let started = dispatch_find_node_requests(
2276 &mut task.network,
2277 &mut task.randomness,
2278 chain_id,
2279 &candidates,
2280 PARALLEL_FIND_NODE_PER_ROUND,
2281 );
2282
2283 let chain_log_name = &task.network[chain_id].log_name;
2284 for (request_target, requested_peer_id) in &started {
2285 log!(
2286 &task.platform,
2287 Debug,
2288 "network",
2289 "discovery-find-node-started",
2290 chain = chain_log_name,
2291 request_target,
2292 requested_peer_id
2293 );
2294 }
2295 if started.is_empty() {
2296 log!(
2297 &task.platform,
2298 Debug,
2299 "network",
2300 "discovery-skipped-no-peer",
2301 chain = chain_log_name
2302 );
2303 }
2304 }
2305 WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
2306 peer_id,
2307 expected_peer_id,
2308 id,
2309 }) => {
2310 let remote_addr =
2311 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
2313 {
2314 log!(
2315 &task.platform,
2316 Debug,
2317 "network",
2318 "handshake-finished-peer-id-mismatch",
2319 remote_addr,
2320 expected_peer_id,
2321 actual_peer_id = peer_id
2322 );
2323
2324 let _was_in = task
2325 .peering_strategy
2326 .decrease_address_connections_and_remove_if_zero(
2327 expected_peer_id,
2328 remote_addr.as_ref(),
2329 );
2330 debug_assert!(_was_in.is_ok());
2331 let _ = task.peering_strategy.increase_address_connections(
2332 &peer_id,
2333 remote_addr.into_bytes().to_vec(),
2334 10,
2335 );
2336 } else {
2337 log!(
2338 &task.platform,
2339 Debug,
2340 "network",
2341 "handshake-finished",
2342 remote_addr,
2343 peer_id
2344 );
2345 }
2346
2347 task.bitswap_peering_strategy
2348 .increase_peer_connections(&peer_id);
2349 }
2350 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2351 expected_peer_id: Some(_),
2352 ..
2353 })
2354 | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
2355 let (address, peer_id, handshake_finished) = match wake_up_reason {
2356 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2357 address,
2358 expected_peer_id: Some(peer_id),
2359 ..
2360 }) => (address, peer_id, false),
2361 WakeUpReason::NetworkEvent(service::Event::Disconnected {
2362 address,
2363 peer_id,
2364 ..
2365 }) => (address, peer_id, true),
2366 _ => unreachable!(),
2367 };
2368
2369 task.peering_strategy
2370 .decrease_address_connections(&peer_id, &address)
2371 .unwrap();
2372 let address = Multiaddr::from_bytes(address).unwrap();
2373 log!(
2374 &task.platform,
2375 Debug,
2376 "network",
2377 "connection-shutdown",
2378 peer_id,
2379 address,
2380 ?handshake_finished
2381 );
2382
2383 let ban_duration = if handshake_finished {
2394 Duration::from_secs(5)
2395 } else {
2396 Duration::from_secs(2)
2397 };
2398 task.network.gossip_remove_desired_all(
2399 &peer_id,
2400 service::GossipKind::ConsensusTransactions,
2401 );
2402 for (&chain_id, what_happened) in task
2403 .peering_strategy
2404 .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
2405 {
2406 if matches!(
2407 what_happened,
2408 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2409 ) {
2410 log!(
2411 &task.platform,
2412 Debug,
2413 "network",
2414 "slot-unassigned",
2415 chain = &task.network[chain_id].log_name,
2416 peer_id,
2417 ?ban_duration,
2418 reason = "pre-handshake-disconnect"
2420 );
2421 }
2422 }
2423
2424 if handshake_finished {
2425 task.network.bitswap_remove_desired(&peer_id);
2426 let what_happened = task
2427 .bitswap_peering_strategy
2428 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration);
2429 if matches!(
2430 what_happened,
2431 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true },
2432 ) {
2433 log!(
2434 &task.platform,
2435 Debug,
2436 "network",
2437 "bitswap-slot-unassigned",
2438 peer_id,
2439 ?ban_duration,
2440 reason = "disconnect",
2441 );
2442 }
2443 let _ = task
2444 .bitswap_peering_strategy
2445 .decrease_peer_connections(&peer_id);
2446 }
2447 }
2448 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2449 expected_peer_id: None,
2450 ..
2451 }) => {
2452 debug_assert!(false);
2455 }
2456 WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
2457 id,
2458 peer_id,
2459 ping_time,
2460 }) => {
2461 let remote_addr =
2462 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); log!(
2464 &task.platform,
2465 Debug,
2466 "network",
2467 "pong",
2468 peer_id,
2469 remote_addr,
2470 ?ping_time
2471 );
2472 }
2473 WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
2474 chain_id,
2475 peer_id,
2476 announce,
2477 }) => {
2478 log!(
2479 &task.platform,
2480 Debug,
2481 "network",
2482 "block-announce-received",
2483 chain = &task.network[chain_id].log_name,
2484 peer_id,
2485 block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2486 announce.decode().scale_encoded_header
2487 )),
2488 is_best = announce.decode().is_best
2489 );
2490
2491 let decoded_announce = announce.decode();
2492 if decoded_announce.is_best {
2493 let link = task
2494 .open_gossip_links
2495 .get_mut(&(chain_id, peer_id.clone()))
2496 .unwrap();
2497 if let Ok(decoded) = header::decode(
2498 decoded_announce.scale_encoded_header,
2499 task.network[chain_id].block_number_bytes,
2500 ) {
2501 link.best_block_hash = header::hash_from_scale_encoded_header(
2502 decoded_announce.scale_encoded_header,
2503 );
2504 link.best_block_number = decoded.number;
2505 }
2506 }
2507
2508 debug_assert!(task.event_pending_send.is_none());
2509 task.event_pending_send =
2510 Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
2511 }
2512 WakeUpReason::NetworkEvent(service::Event::GossipConnected {
2513 peer_id,
2514 chain_id,
2515 role,
2516 best_number,
2517 best_hash,
2518 kind: service::GossipKind::ConsensusTransactions,
2519 }) => {
2520 log!(
2521 &task.platform,
2522 Debug,
2523 "network",
2524 "gossip-open-success",
2525 chain = &task.network[chain_id].log_name,
2526 peer_id,
2527 best_number,
2528 best_hash = HashDisplay(&best_hash)
2529 );
2530
2531 let _prev_value = task.open_gossip_links.insert(
2532 (chain_id, peer_id.clone()),
2533 OpenGossipLinkState {
2534 best_block_number: best_number,
2535 best_block_hash: best_hash,
2536 role,
2537 finalized_block_height: None,
2538 },
2539 );
2540 debug_assert!(_prev_value.is_none());
2541
2542 task.chains_ever_gossip_connected.insert(chain_id);
2543
2544 debug_assert!(task.event_pending_send.is_none());
2545 task.event_pending_send = Some((
2546 chain_id,
2547 Event::Connected {
2548 peer_id,
2549 role,
2550 best_block_number: best_number,
2551 best_block_hash: best_hash,
2552 },
2553 ));
2554 }
2555 WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
2556 peer_id,
2557 chain_id,
2558 error,
2559 kind: service::GossipKind::ConsensusTransactions,
2560 }) => {
2561 log!(
2562 &task.platform,
2563 Debug,
2564 "network",
2565 "gossip-open-error",
2566 chain = &task.network[chain_id].log_name,
2567 peer_id,
2568 ?error,
2569 );
2570 let ban_duration = Duration::from_millis(5500);
2574
2575 let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
2578 matches!(
2579 task.peering_strategy
2580 .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
2581 basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
2582 )
2583 } else {
2584 matches!(
2585 task.peering_strategy.unassign_slot_and_ban(
2586 &chain_id,
2587 &peer_id,
2588 task.platform.now() + ban_duration,
2589 ),
2590 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2591 )
2592 };
2593
2594 if had_slot {
2595 log!(
2596 &task.platform,
2597 Debug,
2598 "network",
2599 "slot-unassigned",
2600 chain = &task.network[chain_id].log_name,
2601 peer_id,
2602 ?ban_duration,
2603 reason = "gossip-open-failed"
2604 );
2605 task.network.gossip_remove_desired(
2606 chain_id,
2607 &peer_id,
2608 service::GossipKind::ConsensusTransactions,
2609 );
2610 }
2611 }
2612 WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
2613 peer_id,
2614 chain_id,
2615 kind: service::GossipKind::ConsensusTransactions,
2616 }) => {
2617 log!(
2618 &task.platform,
2619 Debug,
2620 "network",
2621 "gossip-closed",
2622 chain = &task.network[chain_id].log_name,
2623 peer_id,
2624 );
2625 let ban_duration = Duration::from_secs(10);
2626
2627 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
2628 debug_assert!(_was_in.is_some());
2629
2630 if matches!(
2633 task.peering_strategy.unassign_slot_and_ban(
2634 &chain_id,
2635 &peer_id,
2636 task.platform.now() + ban_duration,
2637 ),
2638 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2639 ) {
2640 log!(
2641 &task.platform,
2642 Debug,
2643 "network",
2644 "slot-unassigned",
2645 chain = &task.network[chain_id].log_name,
2646 peer_id,
2647 ?ban_duration,
2648 reason = "gossip-closed"
2649 );
2650 task.network.gossip_remove_desired(
2651 chain_id,
2652 &peer_id,
2653 service::GossipKind::ConsensusTransactions,
2654 );
2655 }
2656
2657 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2658 peers.remove(&peer_id);
2659 }
2660
2661 debug_assert!(task.event_pending_send.is_none());
2662 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2663 }
2664 WakeUpReason::NetworkEvent(service::Event::BitswapConnected { peer_id }) => {
2665 task.bitswap_connected_peers = task.bitswap_connected_peers.saturating_add(1);
2666 log!(
2667 &task.platform,
2668 Debug,
2669 "network",
2670 "bitswap-open-success",
2671 peer_id,
2672 total = task.bitswap_connected_peers
2673 );
2674 }
2675 WakeUpReason::NetworkEvent(service::Event::BitswapOpenFailed { peer_id, error }) => {
2676 log!(
2677 &task.platform,
2678 Debug,
2679 "network",
2680 "bitswap-open-error",
2681 peer_id,
2682 ?error
2683 );
2684 let ban_duration = if error.is_protocol_not_available() {
2685 Duration::from_secs(600)
2686 } else {
2687 Duration::from_secs(15)
2688 };
2689 if matches!(
2690 task.bitswap_peering_strategy
2691 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2692 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2693 ) {
2694 log!(
2695 &task.platform,
2696 Debug,
2697 "network",
2698 "bitswap-slot-unassigned",
2699 peer_id,
2700 ?ban_duration,
2701 reason = "bitswap-open-failed"
2702 );
2703 task.network.bitswap_remove_desired(&peer_id);
2704 }
2705 }
2706 WakeUpReason::NetworkEvent(service::Event::BitswapMessage { peer_id, message }) => {
2707 log!(
2708 &task.platform,
2709 Debug,
2710 "network",
2711 "bitswap-message-received",
2712 peer_id
2713 );
2714 debug_assert!(task.bitswap_event_pending_send.is_none());
2715 task.bitswap_event_pending_send =
2716 Some(BitswapEvent::BitswapMessage { peer_id, message });
2717 }
2718 WakeUpReason::NetworkEvent(service::Event::BitswapDisconnected { peer_id }) => {
2719 debug_assert!(task.bitswap_connected_peers > 0);
2720 task.bitswap_connected_peers = task.bitswap_connected_peers.saturating_sub(1);
2721 log!(
2722 &task.platform,
2723 Debug,
2724 "network",
2725 "bitswap-closed",
2726 peer_id,
2727 total = task.bitswap_connected_peers
2728 );
2729 let ban_duration = Duration::from_secs(10);
2730 if matches!(
2731 task.bitswap_peering_strategy
2732 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2733 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2734 ) {
2735 log!(
2736 &task.platform,
2737 Debug,
2738 "network",
2739 "bitswap-slot-unassigned",
2740 peer_id,
2741 ?ban_duration,
2742 reason = "bitswap-closed"
2743 );
2744 task.network.bitswap_remove_desired(&peer_id);
2745 }
2746 }
2747 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2748 substream_id,
2749 peer_id,
2750 chain_id,
2751 response: service::RequestResult::Blocks(response),
2752 }) => {
2753 match &response {
2754 Ok(blocks) => {
2755 log!(
2756 &task.platform,
2757 Debug,
2758 "network",
2759 "blocks-request-success",
2760 chain = task.network[chain_id].log_name,
2761 target = peer_id,
2762 num_blocks = blocks.len(),
2763 block_data_total_size =
2764 BytesDisplay(blocks.iter().fold(0, |sum, block| {
2765 let block_size = block.header.as_ref().map_or(0, |h| h.len())
2766 + block
2767 .body
2768 .as_ref()
2769 .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2770 + block
2771 .justifications
2772 .as_ref()
2773 .into_iter()
2774 .flat_map(|l| l.iter())
2775 .fold(0, |s, j| s + j.justification.len());
2776 sum + u64::try_from(block_size).unwrap()
2777 }))
2778 );
2779 }
2780 Err(error) => {
2781 log!(
2782 &task.platform,
2783 Debug,
2784 "network",
2785 "blocks-request-error",
2786 chain = task.network[chain_id].log_name,
2787 target = peer_id,
2788 ?error
2789 );
2790 }
2791 }
2792
2793 match &response {
2794 Ok(_) => {}
2795 Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2796 Err(err) => {
2797 log!(
2798 &task.platform,
2799 Debug,
2800 "network",
2801 format!(
2802 "Error in block request with {}. This might indicate an \
2803 incompatibility. Error: {}",
2804 peer_id, err
2805 )
2806 );
2807 }
2808 }
2809
2810 let _ = task
2811 .blocks_requests
2812 .remove(&substream_id)
2813 .unwrap()
2814 .send(response.map_err(BlocksRequestError::Request));
2815 }
2816 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2817 substream_id,
2818 peer_id,
2819 chain_id,
2820 response: service::RequestResult::GrandpaWarpSync(response),
2821 }) => {
2822 match &response {
2823 Ok(response) => {
2824 let decoded = response.decode();
2826 log!(
2827 &task.platform,
2828 Debug,
2829 "network",
2830 "warp-sync-request-success",
2831 chain = task.network[chain_id].log_name,
2832 target = peer_id,
2833 num_fragments = decoded.fragments.len(),
2834 is_finished = ?decoded.is_finished,
2835 );
2836 }
2837 Err(error) => {
2838 log!(
2839 &task.platform,
2840 Debug,
2841 "network",
2842 "warp-sync-request-error",
2843 chain = task.network[chain_id].log_name,
2844 target = peer_id,
2845 ?error,
2846 );
2847 }
2848 }
2849
2850 let _ = task
2851 .grandpa_warp_sync_requests
2852 .remove(&substream_id)
2853 .unwrap()
2854 .send(response.map_err(WarpSyncRequestError::Request));
2855 }
2856 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2857 substream_id,
2858 peer_id,
2859 chain_id,
2860 response: service::RequestResult::StorageProof(response),
2861 }) => {
2862 match &response {
2863 Ok(items) => {
2864 let decoded = items.decode();
2865 log!(
2866 &task.platform,
2867 Debug,
2868 "network",
2869 "storage-proof-request-success",
2870 chain = task.network[chain_id].log_name,
2871 target = peer_id,
2872 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
2873 );
2874 }
2875 Err(error) => {
2876 log!(
2877 &task.platform,
2878 Debug,
2879 "network",
2880 "storage-proof-request-error",
2881 chain = task.network[chain_id].log_name,
2882 target = peer_id,
2883 ?error
2884 );
2885 }
2886 }
2887
2888 if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
2891 let _ = sender.send(response.map_err(StorageProofRequestError::Request));
2892 } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
2893 {
2894 let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
2895 } else {
2896 unreachable!()
2897 }
2898 }
2899 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2900 substream_id,
2901 peer_id,
2902 chain_id,
2903 response: service::RequestResult::CallProof(response),
2904 }) => {
2905 match &response {
2906 Ok(items) => {
2907 let decoded = items.decode();
2908 log!(
2909 &task.platform,
2910 Debug,
2911 "network",
2912 "call-proof-request-success",
2913 chain = task.network[chain_id].log_name,
2914 target = peer_id,
2915 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2916 );
2917 }
2918 Err(error) => {
2919 log!(
2920 &task.platform,
2921 Debug,
2922 "network",
2923 "call-proof-request-error",
2924 chain = task.network[chain_id].log_name,
2925 target = peer_id,
2926 ?error
2927 );
2928 }
2929 }
2930
2931 let _ = task
2932 .call_proof_requests
2933 .remove(&substream_id)
2934 .unwrap()
2935 .send(response.map_err(CallProofRequestError::Request));
2936 }
2937 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2938 peer_id: requestee_peer_id,
2939 chain_id,
2940 response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2941 ..
2942 }) => {
2943 let mut any_new_peer = false;
2948 for (peer_id, mut addrs) in nodes {
2949 if addrs.len() >= 10 {
2952 addrs.truncate(10);
2953 }
2954
2955 let mut valid_addrs = Vec::with_capacity(addrs.len());
2956 for addr in addrs {
2957 match Multiaddr::from_bytes(addr) {
2958 Ok(mut a) => {
2959 if !pop_p2p_if_matches(&mut a, &peer_id) {
2960 log!(
2961 &task.platform,
2962 Debug,
2963 "network",
2964 "discovered-address-peer-id-mismatch",
2965 chain = &task.network[chain_id].log_name,
2966 announced_peer_id = peer_id,
2967 addr = &a,
2968 obtained_from = requestee_peer_id
2969 );
2970 continue;
2971 }
2972 if platform::address_parse::multiaddr_to_address(&a)
2973 .ok()
2974 .map_or(false, |addr| {
2975 task.platform.supports_connection_type((&addr).into())
2976 })
2977 {
2978 valid_addrs.push(a)
2979 } else {
2980 log!(
2981 &task.platform,
2982 Debug,
2983 "network",
2984 "discovered-address-not-supported",
2985 chain = &task.network[chain_id].log_name,
2986 peer_id,
2987 addr = &a,
2988 obtained_from = requestee_peer_id
2989 );
2990 }
2991 }
2992 Err((error, addr)) => {
2993 log!(
2994 &task.platform,
2995 Debug,
2996 "network",
2997 "discovered-address-invalid",
2998 chain = &task.network[chain_id].log_name,
2999 peer_id,
3000 error,
3001 addr = hex::encode(&addr),
3002 obtained_from = requestee_peer_id
3003 );
3004 }
3005 }
3006 }
3007
3008 if !valid_addrs.is_empty() {
3009 let insert_outcome =
3012 task.peering_strategy
3013 .insert_chain_peer(chain_id, peer_id.clone(), 30); if let basic_peering_strategy::InsertChainPeerResult::Inserted {
3016 peer_removed,
3017 } = insert_outcome
3018 {
3019 any_new_peer = true;
3020 if let Some(peer_removed) = peer_removed {
3021 log!(
3022 &task.platform,
3023 Debug,
3024 "network",
3025 "peer-purged-from-address-book",
3026 chain = &task.network[chain_id].log_name,
3027 peer_id = peer_removed,
3028 );
3029 }
3030
3031 log!(
3032 &task.platform,
3033 Debug,
3034 "network",
3035 "peer-discovered",
3036 chain = &task.network[chain_id].log_name,
3037 peer_id,
3038 addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), obtained_from = requestee_peer_id
3040 );
3041 }
3042 }
3043
3044 for addr in valid_addrs {
3045 let _insert_result =
3046 task.peering_strategy
3047 .insert_address(&peer_id, addr.into_bytes(), 10); debug_assert!(!matches!(
3049 _insert_result,
3050 basic_peering_strategy::InsertAddressResult::UnknownPeer
3051 ));
3052 }
3053 }
3054
3055 if any_new_peer {
3056 task.network[chain_id].next_discovery_period = Duration::from_secs(2);
3057 }
3058 }
3059 WakeUpReason::NetworkEvent(service::Event::RequestResult {
3060 peer_id,
3061 chain_id,
3062 response: service::RequestResult::KademliaFindNode(Err(error)),
3063 ..
3064 }) => {
3065 log!(
3066 &task.platform,
3067 Debug,
3068 "network",
3069 "discovery-find-node-error",
3070 chain = &task.network[chain_id].log_name,
3071 ?error,
3072 find_node_target = peer_id,
3073 );
3074
3075 match error {
3078 service::KademliaFindNodeError::RequestFailed(err)
3079 if !err.is_protocol_error() => {}
3080
3081 service::KademliaFindNodeError::RequestFailed(
3082 service::RequestError::Substream(
3083 connection::established::RequestError::ProtocolNotAvailable,
3084 ),
3085 ) => {
3086 log!(
3088 &task.platform,
3089 Warn,
3090 "network",
3091 format!(
3092 "Problem during discovery on {}: protocol not available. \
3093 This might indicate that the version of Substrate used by \
3094 the chain doesn't include \
3095 <https://github.com/paritytech/substrate/pull/12545>.",
3096 &task.network[chain_id].log_name
3097 )
3098 );
3099 }
3100 _ => {
3101 log!(
3102 &task.platform,
3103 Debug,
3104 "network",
3105 format!(
3106 "Problem during discovery on {}: {}",
3107 &task.network[chain_id].log_name, error
3108 )
3109 );
3110 }
3111 }
3112 }
3113 WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
3114 unreachable!()
3116 }
3117 WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
3118 peer_id,
3119 chain_id,
3120 kind: service::GossipKind::ConsensusTransactions,
3121 }) => {
3122 if task
3127 .network
3128 .opened_gossip_undesired_by_chain(chain_id)
3129 .count()
3130 < 4
3131 {
3132 log!(
3133 &task.platform,
3134 Debug,
3135 "network",
3136 "gossip-in-request",
3137 chain = &task.network[chain_id].log_name,
3138 peer_id,
3139 outcome = "accepted"
3140 );
3141 task.network
3142 .gossip_open(
3143 chain_id,
3144 &peer_id,
3145 service::GossipKind::ConsensusTransactions,
3146 )
3147 .unwrap();
3148 } else {
3149 log!(
3150 &task.platform,
3151 Debug,
3152 "network",
3153 "gossip-in-request",
3154 chain = &task.network[chain_id].log_name,
3155 peer_id,
3156 outcome = "rejected",
3157 );
3158 task.network
3159 .gossip_close(
3160 chain_id,
3161 &peer_id,
3162 service::GossipKind::ConsensusTransactions,
3163 )
3164 .unwrap();
3165 }
3166 }
3167 WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
3168 unreachable!()
3170 }
3171 WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
3172 peer_id,
3173 substream_id,
3174 }) => {
3175 log!(
3176 &task.platform,
3177 Debug,
3178 "network",
3179 "identify-request-received",
3180 peer_id,
3181 );
3182 task.network
3183 .respond_identify(substream_id, &task.identify_agent_version);
3184 }
3185 WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
3186 WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
3187 unreachable!()
3189 }
3190 WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
3191 chain_id,
3192 peer_id,
3193 state,
3194 }) => {
3195 log!(
3196 &task.platform,
3197 Debug,
3198 "network",
3199 "grandpa-neighbor-packet-received",
3200 chain = &task.network[chain_id].log_name,
3201 peer_id,
3202 round_number = state.round_number,
3203 set_id = state.set_id,
3204 commit_finalized_height = state.commit_finalized_height,
3205 );
3206
3207 task.open_gossip_links
3208 .get_mut(&(chain_id, peer_id.clone()))
3209 .unwrap()
3210 .finalized_block_height = Some(state.commit_finalized_height);
3211
3212 debug_assert!(task.event_pending_send.is_none());
3213 task.event_pending_send = Some((
3214 chain_id,
3215 Event::GrandpaNeighborPacket {
3216 peer_id,
3217 finalized_block_height: state.commit_finalized_height,
3218 },
3219 ));
3220 }
3221 WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
3222 chain_id,
3223 peer_id,
3224 message,
3225 }) => {
3226 log!(
3227 &task.platform,
3228 Debug,
3229 "network",
3230 "grandpa-commit-message-received",
3231 chain = &task.network[chain_id].log_name,
3232 peer_id,
3233 target_block_hash = HashDisplay(message.decode().target_hash),
3234 );
3235
3236 debug_assert!(task.event_pending_send.is_none());
3237 task.event_pending_send =
3238 Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
3239 }
3240 WakeUpReason::NetworkEvent(service::Event::StatementsNotification {
3241 chain_id,
3242 peer_id,
3243 statements,
3244 }) => {
3245 debug_assert!(task.event_pending_send.is_none());
3246
3247 if statements.is_empty() {
3248 continue;
3249 }
3250
3251 task.event_pending_send = Some((
3252 chain_id,
3253 Event::StatementsNotification {
3254 peer_id,
3255 statements,
3256 },
3257 ));
3258 }
3259 WakeUpReason::NetworkEvent(service::Event::StatementProtocolConnected {
3260 peer_id,
3261 chain_id,
3262 version,
3263 }) => {
3264 log!(
3265 &task.platform,
3266 Trace,
3267 "network",
3268 "statement-protocol-open-success",
3269 chain = &task.network[chain_id].log_name,
3270 peer_id,
3271 ?version,
3272 );
3273
3274 if matches!(version, codec::StatementProtocolVersion::V2) {
3275 task.v2_statement_peers
3276 .entry(chain_id)
3277 .or_insert_with(|| {
3278 HashSet::with_capacity_and_hasher(16, Default::default())
3279 })
3280 .insert(peer_id.clone());
3281 if let Some(filter) = task.current_affinity_filter.get(&chain_id) {
3282 if let Err(
3283 SendTopicAffinityError::NoConnection
3284 | SendTopicAffinityError::ProtocolV1,
3285 ) = task.network.send_topic_affinity(&peer_id, chain_id, filter)
3286 {
3287 task.v2_statement_peers
3288 .get_mut(&chain_id)
3289 .unwrap()
3290 .remove(&peer_id);
3291 }
3292 }
3293 }
3294 }
3295 WakeUpReason::NetworkEvent(service::Event::StatementTopicAffinityReceived {
3297 ..
3298 }) => {}
3299 WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
3300 log!(
3302 &task.platform,
3303 Warn,
3304 "network",
3305 "protocol-error",
3306 peer_id,
3307 ?error
3308 );
3309
3310 }
3312 WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
3313 task.peering_strategy.assign_slot(&chain_id, &peer_id);
3314
3315 log!(
3316 &task.platform,
3317 Debug,
3318 "network",
3319 "slot-assigned",
3320 chain = &task.network[chain_id].log_name,
3321 peer_id
3322 );
3323
3324 task.network.gossip_insert_desired(
3325 chain_id,
3326 peer_id,
3327 service::GossipKind::ConsensusTransactions,
3328 );
3329 }
3330 WakeUpReason::CanAssignBitswapSlot(peer_id) => {
3331 task.bitswap_peering_strategy.assign_slot(&peer_id).unwrap();
3332
3333 log!(
3334 &task.platform,
3335 Debug,
3336 "network",
3337 "bitswap-slot-assigned",
3338 peer_id
3339 );
3340
3341 task.network.bitswap_insert_desired(peer_id);
3342 }
3343 WakeUpReason::NextRecentConnectionRestore => {
3344 task.num_recent_connection_opening =
3345 task.num_recent_connection_opening.saturating_sub(1);
3346 }
3347 WakeUpReason::CanStartConnect(expected_peer_id) => {
3348 let Some(multiaddr) = task
3349 .peering_strategy
3350 .pick_address_and_add_connection(&expected_peer_id)
3351 else {
3352 task.network.gossip_remove_desired_all(
3354 &expected_peer_id,
3355 service::GossipKind::ConsensusTransactions,
3356 );
3357 let ban_duration = Duration::from_secs(10);
3358 for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
3359 &expected_peer_id,
3360 task.platform.now() + ban_duration,
3361 ) {
3362 if matches!(
3363 what_happened,
3364 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
3365 ) {
3366 log!(
3367 &task.platform,
3368 Debug,
3369 "network",
3370 "slot-unassigned",
3371 chain = &task.network[chain_id].log_name,
3372 peer_id = expected_peer_id,
3373 ?ban_duration,
3374 reason = "no-address"
3375 );
3376 }
3377 }
3378 continue;
3379 };
3380
3381 let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
3382 Ok(a) => a,
3383 Err((multiaddr::FromBytesError, addr)) => {
3384 let _was_in = task
3386 .peering_strategy
3387 .decrease_address_connections_and_remove_if_zero(
3388 &expected_peer_id,
3389 &addr,
3390 );
3391 debug_assert!(_was_in.is_ok());
3392 continue;
3393 }
3394 };
3395
3396 let address = address_parse::multiaddr_to_address(&multiaddr)
3397 .ok()
3398 .filter(|addr| {
3399 task.platform.supports_connection_type(match &addr {
3400 address_parse::AddressOrMultiStreamAddress::Address(addr) => {
3401 From::from(addr)
3402 }
3403 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3404 addr,
3405 ) => From::from(addr),
3406 })
3407 });
3408
3409 let Some(address) = address else {
3410 let _was_in = task
3412 .peering_strategy
3413 .decrease_address_connections_and_remove_if_zero(
3414 &expected_peer_id,
3415 multiaddr.as_ref(),
3416 );
3417 debug_assert!(_was_in.is_ok());
3418 continue;
3419 };
3420
3421 let noise_key = {
3423 let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
3424 task.platform.fill_random_bytes(&mut *noise_static_key);
3425 let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
3426 task.platform.fill_random_bytes(&mut *libp2p_key);
3427 connection::NoiseKey::new(&libp2p_key, &noise_static_key)
3428 };
3429
3430 log!(
3431 &task.platform,
3432 Debug,
3433 "network",
3434 "connection-started",
3435 expected_peer_id,
3436 remote_addr = multiaddr,
3437 local_peer_id =
3438 peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
3439 .into_peer_id(),
3440 );
3441
3442 task.num_recent_connection_opening += 1;
3443
3444 let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
3445 async_channel::bounded(8);
3446 let task_name = format!("connection-{}", multiaddr);
3447
3448 match address {
3449 address_parse::AddressOrMultiStreamAddress::Address(address) => {
3450 let connection = task.platform.connect_stream(address).await;
3453
3454 let (connection_id, connection_task) =
3455 task.network.add_single_stream_connection(
3456 task.platform.now(),
3457 service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
3458 is_initiator: true,
3459 noise_key: &noise_key,
3460 },
3461 multiaddr.clone().into_bytes(),
3462 Some(expected_peer_id.clone()),
3463 coordinator_to_connection_tx,
3464 );
3465
3466 task.platform.spawn_task(
3467 task_name.into(),
3468 tasks::single_stream_connection_task::<TPlat>(
3469 connection,
3470 multiaddr.to_string(),
3471 task.platform.clone(),
3472 connection_id,
3473 connection_task,
3474 coordinator_to_connection_rx,
3475 task.tasks_messages_tx.clone(),
3476 ),
3477 );
3478 }
3479 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3480 platform::MultiStreamAddress::WebRtc {
3481 ip,
3482 port,
3483 remote_certificate_sha256,
3484 },
3485 ) => {
3486 let connection = task
3491 .platform
3492 .connect_multistream(platform::MultiStreamAddress::WebRtc {
3493 ip,
3494 port,
3495 remote_certificate_sha256,
3496 })
3497 .await;
3498
3499 let local_tls_certificate_multihash = [18u8, 32]
3501 .into_iter()
3502 .chain(connection.local_tls_certificate_sha256.into_iter())
3503 .collect();
3504 let remote_tls_certificate_multihash = [18u8, 32]
3505 .into_iter()
3506 .chain(remote_certificate_sha256.iter().copied())
3507 .collect();
3508
3509 let (connection_id, connection_task) =
3510 task.network.add_multi_stream_connection(
3511 task.platform.now(),
3512 service::MultiStreamHandshakeKind::WebRtc {
3513 is_initiator: true,
3514 local_tls_certificate_multihash,
3515 remote_tls_certificate_multihash,
3516 noise_key: &noise_key,
3517 },
3518 multiaddr.clone().into_bytes(),
3519 Some(expected_peer_id.clone()),
3520 coordinator_to_connection_tx,
3521 );
3522
3523 task.platform.spawn_task(
3524 task_name.into(),
3525 tasks::webrtc_multi_stream_connection_task::<TPlat>(
3526 connection.connection,
3527 multiaddr.to_string(),
3528 task.platform.clone(),
3529 connection_id,
3530 connection_task,
3531 coordinator_to_connection_rx,
3532 task.tasks_messages_tx.clone(),
3533 ),
3534 );
3535 }
3536 }
3537 }
3538 WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
3539 task.network
3540 .gossip_open(
3541 chain_id,
3542 &peer_id,
3543 service::GossipKind::ConsensusTransactions,
3544 )
3545 .unwrap();
3546
3547 log!(
3548 &task.platform,
3549 Debug,
3550 "network",
3551 "gossip-open-start",
3552 chain = &task.network[chain_id].log_name,
3553 peer_id,
3554 );
3555 }
3556 WakeUpReason::CanOpenBitswap(peer_id) => {
3557 task.network.bitswap_open(&peer_id).unwrap();
3558
3559 log!(
3560 &task.platform,
3561 Debug,
3562 "network",
3563 "bitswap-open-start",
3564 peer_id
3565 );
3566 }
3567 WakeUpReason::MessageToConnection {
3568 connection_id,
3569 message,
3570 } => {
3571 let _send_result = task.network[connection_id].send(message).await;
3579 debug_assert!(_send_result.is_ok());
3580 }
3581 }
3582 }
3583}
3584
3585fn dispatch_find_node_requests<TChain, TConn, TNow>(
3592 network: &mut service::ChainNetwork<TChain, TConn, TNow>,
3593 randomness: &mut impl rand_chacha::rand_core::RngCore,
3594 chain_id: service::ChainId,
3595 candidates: &[PeerId],
3596 max: usize,
3597) -> Vec<(PeerId, PeerId)>
3598where
3599 TNow: Clone
3600 + core::ops::Add<Duration, Output = TNow>
3601 + core::ops::Sub<TNow, Output = Duration>
3602 + Ord,
3603{
3604 let mut started = Vec::with_capacity(max);
3605
3606 for target in candidates {
3607 if started.len() >= max {
3608 break;
3609 }
3610
3611 let random_peer_id = {
3612 let mut pub_key = [0; 32];
3613 randomness.fill_bytes(&mut pub_key);
3614 PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
3615 };
3616
3617 match network.start_kademlia_find_node_request(
3618 target,
3619 chain_id,
3620 &random_peer_id,
3621 Duration::from_secs(20),
3622 ) {
3623 Ok(_) => started.push((target.clone(), random_peer_id)),
3624 Err(service::StartRequestError::NoConnection) => {}
3625 }
3626 }
3627
3628 started
3629}
3630
3631fn pop_p2p_if_matches(
3634 addr: &mut smoldot::libp2p::multiaddr::Multiaddr,
3635 expected_peer: &smoldot::libp2p::peer_id::PeerId,
3636) -> bool {
3637 use smoldot::libp2p::multiaddr::Protocol;
3638 match addr.iter().last() {
3639 Some(Protocol::P2p(mh)) => {
3640 if mh.into_bytes() == expected_peer.as_bytes() {
3641 addr.pop();
3642 true
3643 } else {
3644 false
3645 }
3646 }
3647 _ => true,
3648 }
3649}
3650
3651#[cfg(test)]
3652mod tests {
3653 use super::{Role, dispatch_find_node_requests, pop_p2p_if_matches, service};
3654 use core::time::Duration;
3655 use rand_chacha::rand_core::SeedableRng as _;
3656 use smoldot::libp2p::{multiaddr::Multiaddr, peer_id::PeerId};
3657
3658 const PEER_A: &str = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN";
3662 const PEER_B: &str = "12D3KooWQk1yQtG1YugyKjiQf6KNk8VjGGAT5xy1FWcnRKN4yXYJ";
3663
3664 fn peer(s: &str) -> PeerId {
3665 PeerId::from_bytes(bs58::decode(s).into_vec().unwrap()).unwrap()
3666 }
3667
3668 #[test]
3669 fn no_suffix_passes_through_unchanged() {
3670 let mut addr: Multiaddr = "/ip4/127.0.0.1/tcp/30333/ws".parse().unwrap();
3671 let before = addr.clone();
3672 assert!(pop_p2p_if_matches(&mut addr, &peer(PEER_A)));
3673 assert_eq!(addr, before);
3674 }
3675
3676 #[test]
3677 fn matching_suffix_is_stripped() {
3678 let mut addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/30333/ws/p2p/{PEER_A}")
3679 .parse()
3680 .unwrap();
3681 assert!(pop_p2p_if_matches(&mut addr, &peer(PEER_A)));
3682 let expected: Multiaddr = "/ip4/127.0.0.1/tcp/30333/ws".parse().unwrap();
3683 assert_eq!(addr, expected);
3684 }
3685
3686 #[test]
3687 fn mismatched_suffix_rejects_and_keeps_addr() {
3688 let original: Multiaddr = format!("/ip4/127.0.0.1/tcp/30333/ws/p2p/{PEER_A}")
3689 .parse()
3690 .unwrap();
3691 let mut addr = original.clone();
3692 assert!(!pop_p2p_if_matches(&mut addr, &peer(PEER_B)));
3693 assert_eq!(addr, original);
3694 }
3695
3696 fn empty_network() -> (service::ChainNetwork<(), (), Duration>, service::ChainId) {
3697 let mut network = service::ChainNetwork::new(service::Config {
3698 connections_capacity: 8,
3699 chains_capacity: 1,
3700 randomness_seed: [0; 32],
3701 handshake_timeout: Duration::from_secs(10),
3702 });
3703 let chain_id = network
3704 .add_chain(service::ChainConfig {
3705 user_data: (),
3706 genesis_hash: [0; 32],
3707 fork_id: None,
3708 block_number_bytes: 4,
3709 grandpa_protocol_config: None,
3710 allow_inbound_block_requests: false,
3711 best_hash: [0; 32],
3712 best_number: 0,
3713 role: Role::Light,
3714 enable_statement_protocol: false,
3715 })
3716 .unwrap();
3717 (network, chain_id)
3718 }
3719
3720 #[test]
3723 fn dispatch_skips_unreachable_candidates() {
3724 let (mut network, chain_id) = empty_network();
3725 let mut randomness = rand_chacha::ChaCha20Rng::from_seed([7; 32]);
3726
3727 let candidates = [peer(PEER_A), peer(PEER_B)];
3728 let started =
3729 dispatch_find_node_requests(&mut network, &mut randomness, chain_id, &candidates, 3);
3730
3731 assert!(started.is_empty());
3732 }
3733}