smoldot_light/
network_service.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Background network service.
19//!
20//! The [`NetworkService`] manages background tasks dedicated to connecting to other nodes.
21//! Importantly, its design is oriented towards the particular use case of the light client.
22//!
23//! The [`NetworkService`] spawns one background task (using [`PlatformRef::spawn_task`]) for
24//! each active connection.
25//!
//! The objective of the [`NetworkService`] in general is to try to stay connected as much as
//! possible to the nodes of the peer-to-peer network of the chain, and maintain open substreams
//! with them in order to send out requests (e.g. block requests) and notifications (e.g. block
//! announces).
30//!
31//! Connectivity to the network is performed in the background as an implementation detail of
32//! the service. The public API only allows emitting requests and notifications towards the
33//! already-connected nodes.
34//!
35//! After a [`NetworkService`] is created, one can add chains using [`NetworkService::add_chain`].
36//! If all references to a [`NetworkServiceChain`] are destroyed, the chain is automatically
37//! purged.
38//!
39//! An important part of the API is the list of channel receivers of [`Event`] returned by
40//! [`NetworkServiceChain::subscribe`]. These channels inform the foreground about updates to the
41//! network connectivity.
42
43use crate::{
44    log,
45    platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49    borrow::ToOwned as _,
50    boxed::Box,
51    collections::BTreeMap,
52    format,
53    string::{String, ToString as _},
54    sync::Arc,
55    vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, num::NonZeroUsize, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65    header,
66    informant::{BytesDisplay, HashDisplay},
67    libp2p::{
68        connection,
69        multiaddr::{self, Multiaddr},
70        peer_id,
71    },
72    network::{basic_peering_strategy, bitswap_peering_strategy, codec, service},
73};
74
75pub use codec::{AffinityFilter, CallProofRequestConfig, Role};
76use service::SendTopicAffinityError;
77pub use service::{
78    ChainId, EncodedMerkleProof, PeerId, QueueNotificationError, SendBitswapMessageError,
79};
80
/// Configuration for the Statement Store protocol.
///
/// Values of this type can only be built through [`StatementProtocolConfig::new`], which
/// validates its arguments. The fields are private so that the validated invariants cannot be
/// broken after construction; use the getters to read them back.
#[derive(Debug, Clone)]
pub struct StatementProtocolConfig {
    /// Per-subscription LRU cache size used for deduplicating delivered statements.
    max_seen_statements: NonZeroUsize,
    /// False-positive rate. Guaranteed to be finite and strictly between `0.0` and `1.0`.
    // NOTE(review): presumably the target false-positive rate of the bloom filter seeded with
    // `bloom_seed` — confirm against the code that consumes this configuration.
    false_positive_rate: f64,
    /// Seed stored alongside the false-positive rate.
    // NOTE(review): presumably the seed of the bloom filter's hash functions — confirm.
    bloom_seed: u128,
    /// Interval between affinity updates. Guaranteed to be non-zero.
    // NOTE(review): presumably how often the topic affinity filter is re-sent — confirm.
    affinity_update_interval: Duration,
}

impl StatementProtocolConfig {
    /// Builds a new configuration after validating the parameters.
    ///
    /// # Panics
    ///
    /// Panics if `false_positive_rate` is not a finite number strictly between `0.0` and `1.0`
    /// (which notably rejects `NaN`), or if `affinity_update_interval` is zero.
    pub fn new(
        max_seen_statements: NonZeroUsize,
        false_positive_rate: f64,
        bloom_seed: u128,
        affinity_update_interval: Duration,
    ) -> Self {
        assert!(
            false_positive_rate.is_finite()
                && false_positive_rate > 0.0
                && false_positive_rate < 1.0,
            "false_positive_rate must be finite and strictly between 0 and 1, \
             got {false_positive_rate}"
        );
        assert!(
            !affinity_update_interval.is_zero(),
            "affinity_update_interval must be non-zero"
        );

        StatementProtocolConfig {
            max_seen_statements,
            false_positive_rate,
            bloom_seed,
            affinity_update_interval,
        }
    }

    /// Returns the per-subscription LRU cache size passed to [`StatementProtocolConfig::new`].
    pub fn max_seen_statements(&self) -> NonZeroUsize {
        self.max_seen_statements
    }

    /// Returns the false-positive rate passed to [`StatementProtocolConfig::new`].
    ///
    /// The returned value is always strictly between `0.0` and `1.0`.
    pub fn false_positive_rate(&self) -> f64 {
        self.false_positive_rate
    }

    /// Returns the seed passed to [`StatementProtocolConfig::new`].
    pub fn bloom_seed(&self) -> u128 {
        self.bloom_seed
    }

    /// Returns the affinity update interval passed to [`StatementProtocolConfig::new`].
    ///
    /// The returned duration is never zero.
    pub fn affinity_update_interval(&self) -> Duration {
        self.affinity_update_interval
    }
}
128
129mod tasks;
130
/// Configuration for a [`NetworkService`].
///
/// Consumed by value by [`NetworkService::new`].
pub struct Config<TPlat> {
    /// Access to the platform's capabilities.
    ///
    /// Used by [`NetworkService::new`] to obtain random seeds and to spawn the background task.
    pub platform: TPlat,

    /// Value sent back for the agent version when receiving an identification request.
    pub identify_agent_version: String,

    /// Capacity to allocate for the list of chains.
    ///
    /// Forwarded by [`NetworkService::new`] both to the low-level networking state and to the
    /// peering strategy.
    pub chains_capacity: usize,

    /// Maximum number of connections that the service can open simultaneously. After this value
    /// has been reached, a new connection can be opened after each
    /// [`Config::connections_open_pool_restore_delay`].
    pub connections_open_pool_size: u32,

    /// Delay after which the service can open a new connection.
    /// The delay is cumulative. If no connection has been opened for example for twice this
    /// duration, then two connections can be opened at the same time, up to a maximum of
    /// [`Config::connections_open_pool_size`].
    pub connections_open_pool_restore_delay: Duration,
}
153
/// Configuration of a chain. See [`NetworkService::add_chain`].
///
/// Note that this configuration is intentionally missing a field containing the bootstrap
/// nodes of the chain. Bootstrap nodes are supposed to be added afterwards by calling
/// [`NetworkServiceChain::discover`].
pub struct ConfigChain {
    /// Name of the chain, for logging purposes.
    pub log_name: String,

    /// Number of "out slots" of this chain. We establish simultaneously gossip substreams up to
    /// this number of peers.
    pub num_out_slots: usize,

    /// Hash of the genesis block of the chain. Sent to other nodes in order to determine whether
    /// the chains match.
    ///
    /// > **Note**: Be aware that this *must* be the *genesis* block, not any block known to be
    /// >           in the chain.
    pub genesis_block_hash: [u8; 32],

    /// Number and hash of the current best block. Can later be updated with
    /// [`NetworkServiceChain::set_local_best_block`].
    ///
    /// The tuple is ordered `(number, hash)`.
    pub best_block: (u64, [u8; 32]),

    /// Optional identifier to insert into the networking protocol names. Used to differentiate
    /// between chains with the same genesis hash.
    pub fork_id: Option<String>,

    /// Number of bytes of the block number in the networking protocol.
    pub block_number_bytes: usize,

    /// Must be `Some` if and only if the chain uses the GrandPa networking protocol. Contains the
    /// number of the finalized block at the time of the initialization.
    ///
    /// [`NetworkService::add_chain`] pairs this height with an initial round number of `1` and
    /// an initial set id of `0`.
    pub grandpa_protocol_finalized_block_height: Option<u64>,

    /// If `Some`, enables the statement store protocol.
    ///
    /// NOTE(review): [`NetworkService::add_chain`] as visible here only reads whether this is
    /// `Some`; confirm where the parameters stored in the configuration are consumed.
    pub statement_protocol_config: Option<StatementProtocolConfig>,
}
192
/// Handle to the network service.
///
/// Created with [`NetworkService::new`], which also spawns the background task that this handle
/// communicates with. Chains are then added with [`NetworkService::add_chain`].
pub struct NetworkService<TPlat: PlatformRef> {
    /// Channel connected to the background service.
    messages_tx: async_channel::Sender<ToBackground<TPlat>>,

    /// See [`Config::platform`].
    platform: TPlat,
}
200
201impl<TPlat: PlatformRef> NetworkService<TPlat> {
202    /// Initializes the network service with the given configuration.
203    pub fn new(config: Config<TPlat>) -> Arc<Self> {
204        let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
205
206        let network = service::ChainNetwork::new(service::Config {
207            chains_capacity: config.chains_capacity,
208            connections_capacity: 32,
209            // Shortened from 8s: parallel dials hold slots until this fires.
210            handshake_timeout: Duration::from_secs(4),
211            randomness_seed: {
212                let mut seed = [0; 32];
213                config.platform.fill_random_bytes(&mut seed);
214                seed
215            },
216        });
217
218        // Spawn main task that processes the network service.
219        let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
220        let task = Box::pin(background_task(BackgroundTask {
221            randomness: rand_chacha::ChaCha20Rng::from_seed({
222                let mut seed = [0; 32];
223                config.platform.fill_random_bytes(&mut seed);
224                seed
225            }),
226            identify_agent_version: config.identify_agent_version,
227            tasks_messages_tx,
228            tasks_messages_rx: Box::pin(tasks_messages_rx),
229            peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
230                basic_peering_strategy::Config {
231                    randomness_seed: {
232                        let mut seed = [0; 32];
233                        config.platform.fill_random_bytes(&mut seed);
234                        seed
235                    },
236                    peers_capacity: 50, // TODO: ?
237                    chains_capacity: config.chains_capacity,
238                },
239            ),
240            bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy::new(
241                bitswap_peering_strategy::Config {
242                    randomness_seed: {
243                        let mut seed = [0; 32];
244                        config.platform.fill_random_bytes(&mut seed);
245                        seed
246                    },
247                    peers_capacity: 50, // TODO: hardcoded to the same value as `peering_strategy`.
248                },
249            ),
250            network,
251            connections_open_pool_size: config.connections_open_pool_size,
252            connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
253            num_recent_connection_opening: 0,
254            next_recent_connection_restore: None,
255            platform: config.platform.clone(),
256            open_gossip_links: BTreeMap::new(),
257            chains_ever_gossip_connected: HashSet::with_capacity_and_hasher(4, Default::default()),
258            v2_statement_peers: HashMap::with_capacity_and_hasher(4, Default::default()),
259            current_affinity_filter: HashMap::with_capacity_and_hasher(4, Default::default()),
260            event_pending_send: None,
261            event_senders: either::Left(Vec::new()),
262            pending_new_subscriptions: Vec::new(),
263            bitswap_event_pending_send: None,
264            bitswap_connected_peers: 0,
265            bitswap_event_senders: either::Left(Vec::new()),
266            pending_new_bitswap_subscriptions: Vec::new(),
267            important_nodes: HashMap::with_capacity_and_hasher(16, Default::default()),
268            main_messages_rx: Box::pin(main_messages_rx),
269            messages_rx: stream::SelectAll::new(),
270            blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
271            grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
272            storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
273            call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
274            child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
275            chains_by_next_discovery: BTreeMap::new(),
276        }));
277
278        config.platform.spawn_task("network-service".into(), {
279            let platform = config.platform.clone();
280            async move {
281                task.await;
282                log!(&platform, Debug, "network", "shutdown");
283            }
284        });
285
286        Arc::new(NetworkService {
287            messages_tx: main_messages_tx,
288            platform: config.platform,
289        })
290    }
291
292    /// Adds a chain to the list of chains that the network service connects to.
293    ///
294    /// Returns an object representing the chain and that allows interacting with it. If all
295    /// references to [`NetworkServiceChain`] are destroyed, the network service automatically
296    /// purges that chain.
297    pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
298        let (messages_tx, messages_rx) = async_channel::bounded(32);
299
300        // TODO: this code is hacky because we don't want to make `add_chain` async at the moment, because it's not convenient for lib.rs
301        self.platform.spawn_task("add-chain-message-send".into(), {
302            let config = service::ChainConfig {
303                grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
304                    |commit_finalized_height| service::GrandpaState {
305                        commit_finalized_height,
306                        round_number: 1,
307                        set_id: 0,
308                    },
309                ),
310                enable_statement_protocol: config.statement_protocol_config.is_some(),
311                fork_id: config.fork_id.clone(),
312                block_number_bytes: config.block_number_bytes,
313                best_hash: config.best_block.1,
314                best_number: config.best_block.0,
315                genesis_hash: config.genesis_block_hash,
316                role: Role::Light,
317                allow_inbound_block_requests: false,
318                user_data: Chain {
319                    log_name: config.log_name,
320                    block_number_bytes: config.block_number_bytes,
321                    num_out_slots: config.num_out_slots,
322                    num_references: NonZero::<usize>::new(1).unwrap(),
323                    next_discovery_period: Duration::from_secs(2),
324                    next_discovery_when: self.platform.now(),
325                },
326            };
327
328            let messages_tx = self.messages_tx.clone();
329            async move {
330                let _ = messages_tx
331                    .send(ToBackground::AddChain {
332                        messages_rx,
333                        config,
334                    })
335                    .await;
336            }
337        });
338
339        Arc::new(NetworkServiceChain {
340            _keep_alive_messages_tx: self.messages_tx.clone(),
341            messages_tx,
342            marker: core::marker::PhantomData,
343        })
344    }
345}
346
/// Handle to a chain of a [`NetworkService`].
///
/// Returned by [`NetworkService::add_chain`]. All the methods of this type send a message to
/// the background task of the network service through a channel dedicated to this chain.
pub struct NetworkServiceChain<TPlat: PlatformRef> {
    /// Copy of [`NetworkService::messages_tx`]. Used in order to maintain the network service
    /// background task alive.
    _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,

    /// Channel to send messages to the background task.
    messages_tx: async_channel::Sender<ToBackgroundChain>,

    /// Dummy to hold the `TPlat` type.
    marker: core::marker::PhantomData<TPlat>,
}
358
/// Severity of a ban. See [`NetworkServiceChain::ban_and_disconnect`].
///
/// The value is forwarded as-is to the background task, which decides how to act on it.
// NOTE(review): presumably a higher severity translates to a longer ban — confirm in the
// background task.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BanSeverity {
    /// Lower of the two severities.
    Low,
    /// Higher of the two severities.
    High,
}
365
366impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
367    /// Subscribes to the networking events that happen on the given chain.
368    ///
369    /// Calling this function returns a `Receiver` that receives events about the chain.
370    /// The new channel will immediately receive events about all the existing connections, so
371    /// that it is able to maintain a coherent view of the network.
372    ///
373    /// Note that this function is `async`, but it should return very quickly.
374    ///
375    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
376    /// connections will be back-pressured until the channel isn't full anymore.
377    ///
378    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
379    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
380    /// [`NetworkServiceChain::subscribe`] again to obtain a new `Receiver`.
381    ///
382    // TODO: consider not killing the background until the channel is destroyed, as that would be a more sensical behaviour
383    pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
384        let (tx, rx) = async_channel::bounded(128);
385
386        self.messages_tx
387            .send(ToBackgroundChain::Subscribe { sender: tx })
388            .await
389            .unwrap();
390
391        rx
392    }
393
394    /// Subscribes to the Bitswap events that happen on the network. Bitswap events subscription is
395    /// separate from other network service events, because Bitswap events are big and are not
396    /// needed by the most of subscribers.
397    ///
398    /// Note that this function is `async`, but it should return very quickly.
399    ///
400    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
401    /// connections will be back-pressured until the channel isn't full anymore.
402    ///
403    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
404    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
405    /// [`NetworkServiceChain::subscribe_bitswap`] again to obtain a new `Receiver`.
406    ///
407    // TODO: the last section of the doc seem to contradict itself.
408    pub async fn subscribe_bitswap(&self) -> async_channel::Receiver<BitswapEvent> {
409        let (tx, rx) = async_channel::bounded(128);
410
411        self.messages_tx
412            .send(ToBackgroundChain::SubscribeBitswap { sender: tx })
413            .await
414            .unwrap();
415
416        rx
417    }
418
419    /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
420    /// generated. Prevents a new gossip link with the same peer from being reopened for a
421    /// little while.
422    ///
423    /// `reason` is a human-readable string printed in the logs.
424    ///
425    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
426    /// reconnection was already happening as the call to this function is still being processed.
427    /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
428    /// words, this function guarantees that we will be disconnected in the future rather than
429    /// guarantees that we will disconnect.
430    pub async fn ban_and_disconnect(
431        &self,
432        peer_id: PeerId,
433        severity: BanSeverity,
434        reason: &'static str,
435    ) {
436        let _ = self
437            .messages_tx
438            .send(ToBackgroundChain::DisconnectAndBan {
439                peer_id,
440                severity,
441                reason,
442            })
443            .await;
444    }
445
446    /// Sends a blocks request to the given peer.
447    // TODO: more docs
448    pub async fn blocks_request(
449        self: Arc<Self>,
450        target: PeerId,
451        config: codec::BlocksRequestConfig,
452        timeout: Duration,
453    ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
454        let (tx, rx) = oneshot::channel();
455
456        self.messages_tx
457            .send(ToBackgroundChain::StartBlocksRequest {
458                target: target.clone(),
459                config,
460                timeout,
461                result: tx,
462            })
463            .await
464            .unwrap();
465
466        rx.await.unwrap()
467    }
468
469    /// Sends a grandpa warp sync request to the given peer.
470    // TODO: more docs
471    pub async fn grandpa_warp_sync_request(
472        self: Arc<Self>,
473        target: PeerId,
474        begin_hash: [u8; 32],
475        timeout: Duration,
476    ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
477        let (tx, rx) = oneshot::channel();
478
479        self.messages_tx
480            .send(ToBackgroundChain::StartWarpSyncRequest {
481                target: target.clone(),
482                begin_hash,
483                timeout,
484                result: tx,
485            })
486            .await
487            .unwrap();
488
489        rx.await.unwrap()
490    }
491
492    pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
493        self.messages_tx
494            .send(ToBackgroundChain::SetLocalBestBlock {
495                best_hash,
496                best_number,
497            })
498            .await
499            .unwrap();
500    }
501
502    pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
503        self.messages_tx
504            .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
505            .await
506            .unwrap();
507    }
508
509    /// Sends a storage proof request to the given peer.
510    // TODO: more docs
511    pub async fn storage_proof_request(
512        self: Arc<Self>,
513        target: PeerId, // TODO: takes by value because of futures longevity issue
514        config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
515        timeout: Duration,
516    ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
517        let (tx, rx) = oneshot::channel();
518
519        self.messages_tx
520            .send(ToBackgroundChain::StartStorageProofRequest {
521                target: target.clone(),
522                config: codec::StorageProofRequestConfig {
523                    block_hash: config.block_hash,
524                    keys: config
525                        .keys
526                        .map(|key| key.as_ref().to_vec()) // TODO: to_vec() overhead
527                        .collect::<Vec<_>>()
528                        .into_iter(),
529                },
530                timeout,
531                result: tx,
532            })
533            .await
534            .unwrap();
535
536        rx.await.unwrap()
537    }
538
539    /// Sends a call proof request to the given peer.
540    ///
541    /// See also [`NetworkServiceChain::call_proof_request`].
542    // TODO: more docs
543    pub async fn call_proof_request(
544        self: Arc<Self>,
545        target: PeerId, // TODO: takes by value because of futures longevity issue
546        config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
547        timeout: Duration,
548    ) -> Result<EncodedMerkleProof, CallProofRequestError> {
549        let (tx, rx) = oneshot::channel();
550
551        self.messages_tx
552            .send(ToBackgroundChain::StartCallProofRequest {
553                target: target.clone(),
554                config: codec::CallProofRequestConfig {
555                    block_hash: config.block_hash,
556                    method: config.method.into_owned().into(),
557                    parameter_vectored: config
558                        .parameter_vectored
559                        .map(|v| v.as_ref().to_vec()) // TODO: to_vec() overhead
560                        .collect::<Vec<_>>()
561                        .into_iter(),
562                },
563                timeout,
564                result: tx,
565            })
566            .await
567            .unwrap();
568
569        rx.await.unwrap()
570    }
571
572    /// Sends a child storage proof request to the given peer.
573    pub async fn child_storage_proof_request(
574        self: Arc<Self>,
575        target: PeerId,
576        config: codec::ChildStorageProofRequestConfig<
577            impl AsRef<[u8]> + Clone,
578            impl Iterator<Item = impl AsRef<[u8]> + Clone>,
579        >,
580        timeout: Duration,
581    ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
582        let (tx, rx) = oneshot::channel();
583
584        self.messages_tx
585            .send(ToBackgroundChain::StartChildStorageProofRequest {
586                target: target.clone(),
587                config: ChildStorageProofRequestConfigOwned {
588                    block_hash: config.block_hash,
589                    child_trie: config.child_trie.as_ref().to_vec(),
590                    keys: config
591                        .keys
592                        .map(|key| key.as_ref().to_vec())
593                        .collect::<Vec<_>>(),
594                },
595                timeout,
596                result: tx,
597            })
598            .await
599            .unwrap();
600
601        rx.await.unwrap()
602    }
603
604    /// Announces transaction to the peers we are connected to.
605    ///
606    /// Returns a list of peers that we have sent the transaction to. Can return an empty `Vec`
607    /// if we didn't send the transaction to any peer.
608    ///
609    /// Note that the remote doesn't confirm that it has received the transaction. Because
610    /// networking is inherently unreliable, successfully sending a transaction to a peer doesn't
611    /// necessarily mean that the remote has received it. In practice, however, the likelihood of
612    /// a transaction not being received are extremely low. This can be considered as known flaw.
613    pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
614        let (tx, rx) = oneshot::channel();
615
616        self.messages_tx
617            .send(ToBackgroundChain::AnnounceTransaction {
618                transaction: transaction.to_vec(), // TODO: ovheread
619                result: tx,
620            })
621            .await
622            .unwrap();
623
624        rx.await.unwrap()
625    }
626
627    /// See [`service::ChainNetwork::gossip_send_block_announce`].
628    pub async fn send_block_announce(
629        self: Arc<Self>,
630        target: &PeerId,
631        scale_encoded_header: &[u8],
632        is_best: bool,
633    ) -> Result<(), QueueNotificationError> {
634        let (tx, rx) = oneshot::channel();
635
636        self.messages_tx
637            .send(ToBackgroundChain::SendBlockAnnounce {
638                target: target.clone(),                              // TODO: overhead
639                scale_encoded_header: scale_encoded_header.to_vec(), // TODO: overhead
640                is_best,
641                result: tx,
642            })
643            .await
644            .unwrap();
645
646        rx.await.unwrap()
647    }
648
649    /// Send Bitswap message to the given peer.
650    pub async fn send_bitswap_message(
651        &self,
652        target: PeerId,
653        message: Vec<u8>,
654    ) -> Result<(), SendBitswapMessageError> {
655        let (tx, rx) = oneshot::channel();
656
657        self.messages_tx
658            .send(ToBackgroundChain::SendBitswapMessage {
659                target,
660                message,
661                result: tx,
662            })
663            .await
664            .unwrap();
665
666        rx.await.unwrap()
667    }
668
669    /// Broadcast Bitswap message to all [`service::ChainNetwork::established_bitswap_desired`]
670    /// peers.
671    ///
672    /// Returns the peers message was broadcast to or an error if the message cannot be sent
673    /// to at least one peer.
674    // TODO: better use a dedicated error type instead of reusing a lower-level
675    // `SendBitswapMessageErorr`.
676    pub async fn broadcast_bitswap_message(
677        &self,
678        message: Vec<u8>,
679    ) -> Result<Vec<PeerId>, SendBitswapMessageError> {
680        let (tx, rx) = oneshot::channel();
681
682        self.messages_tx
683            .send(ToBackgroundChain::BroadcastBitswapMessage {
684                message,
685                result: tx,
686            })
687            .await
688            .unwrap();
689
690        rx.await.unwrap()
691    }
692
693    /// Broadcast a statement notification to all gossip-connected peers.
694    pub async fn broadcast_statement(
695        self: Arc<Self>,
696        statement: Vec<u8>,
697    ) -> BroadcastStatementResult {
698        let (tx, rx) = oneshot::channel();
699
700        self.messages_tx
701            .send(ToBackgroundChain::BroadcastStatement {
702                statement,
703                result: tx,
704            })
705            .await
706            .unwrap();
707
708        rx.await.unwrap()
709    }
710
711    pub async fn update_topic_affinity(&self, filter: AffinityFilter) {
712        self.messages_tx
713            .send(ToBackgroundChain::UpdateTopicAffinity { filter })
714            .await
715            .unwrap();
716    }
717
718    /// Marks the given peers as belonging to the given chain, and adds some addresses to these
719    /// peers to the address book.
720    ///
721    /// The `important_nodes` parameter indicates whether these nodes are considered note-worthy
722    /// and should have additional logging.
723    pub async fn discover(
724        &self,
725        list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
726        important_nodes: bool,
727    ) {
728        self.messages_tx
729            .send(ToBackgroundChain::Discover {
730                // TODO: overhead
731                list: list
732                    .into_iter()
733                    .map(|(peer_id, addrs)| {
734                        (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
735                    })
736                    .collect::<Vec<_>>()
737                    .into_iter(),
738                important_nodes,
739            })
740            .await
741            .unwrap();
742    }
743
744    /// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of
745    /// the network.
746    ///
747    /// Nodes that are discovered might disappear over time. In other words, there is no guarantee
748    /// that a node that has been added through [`NetworkServiceChain::discover`] will later be
749    /// returned by [`NetworkServiceChain::discovered_nodes`].
750    pub async fn discovered_nodes(
751        &self,
752    ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
753        let (tx, rx) = oneshot::channel();
754
755        self.messages_tx
756            .send(ToBackgroundChain::DiscoveredNodes { result: tx })
757            .await
758            .unwrap();
759
760        rx.await
761            .unwrap()
762            .into_iter()
763            .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
764    }
765
766    /// Returns an iterator to the list of [`PeerId`]s that we have an established connection
767    /// with.
768    pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
769        let (tx, rx) = oneshot::channel();
770        self.messages_tx
771            .send(ToBackgroundChain::PeersList { result: tx })
772            .await
773            .unwrap();
774        rx.await.unwrap().into_iter()
775    }
776}
777
/// Outcome of [`NetworkServiceChain::broadcast_statement`].
#[derive(Debug, Clone)]
pub struct BroadcastStatementResult {
    // NOTE(review): presumably the number of peers the statement was actually sent to —
    // confirm in the background task.
    pub sent: usize,
    // NOTE(review): presumably the total number of peers that were candidates for the
    // broadcast — confirm in the background task.
    pub total: usize,
}
783
/// Event that can happen on the network service.
///
/// Events are delivered through the channel returned by [`NetworkServiceChain::subscribe`].
#[derive(Debug, Clone)]
pub enum Event {
    // NOTE(review): presumably generated when a gossip link with `peer_id` is opened, the
    // remaining fields being what the peer announced at that time — confirm in the
    // background task.
    Connected {
        peer_id: PeerId,
        role: Role,
        best_block_number: u64,
        best_block_hash: [u8; 32],
    },
    // Notably generated some time after a call to `NetworkServiceChain::ban_and_disconnect`,
    // as described in the documentation of that method.
    Disconnected {
        peer_id: PeerId,
    },
    // NOTE(review): presumably a block announce received from `peer_id` — confirm.
    BlockAnnounce {
        peer_id: PeerId,
        announce: service::EncodedBlockAnnounce,
    },
    // NOTE(review): presumably a GrandPa neighbor packet received from `peer_id`, reporting
    // the height of the block it considers finalized — confirm.
    GrandpaNeighborPacket {
        peer_id: PeerId,
        finalized_block_height: u64,
    },
    /// Received a GrandPa commit message from the network.
    GrandpaCommitMessage {
        peer_id: PeerId,
        message: service::EncodedGrandpaCommitMessage,
    },
    /// Received a statement notification from the network.
    StatementsNotification {
        peer_id: PeerId,
        // NOTE(review): presumably each 32-byte array is the hash of the statement it is
        // paired with — confirm.
        statements: Vec<([u8; 32], codec::Statement)>,
    },
}
815
/// Bitswap event that can be generated by the network service. Because Bitswap messages are big
/// (up to 2 MiB) and can be delivered at high rate, we use a dedicated subscriber to not copy them
/// to all network service subscribers.
#[derive(Debug, Clone)]
pub enum BitswapEvent {
    /// A Bitswap message associated with `peer_id`. Presumably a message received from that
    /// peer (to be confirmed against the code generating this event).
    BitswapMessage {
        /// Identity of the peer.
        peer_id: PeerId,
        /// The Bitswap message, in its encoded form.
        message: service::EncodedBitswapMessage,
    },
}
826
/// Error returned by [`NetworkServiceChain::blocks_request`].
///
/// Either the request couldn't be attempted at all because of a lack of connection, or it
/// was attempted and failed.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum BlocksRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Error during the request.
    ///
    /// Displaying this variant prints the inner error.
    #[display("{_0}")]
    Request(service::BlocksRequestError),
}
836
/// Error returned by [`NetworkServiceChain::grandpa_warp_sync_request`].
///
/// Either the request couldn't be attempted at all because of a lack of connection, or it
/// was attempted and failed.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum WarpSyncRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Error during the request.
    ///
    /// Displaying this variant prints the inner error.
    #[display("{_0}")]
    Request(service::GrandpaWarpSyncRequestError),
}
846
/// Error returned by [`NetworkServiceChain::storage_proof_request`].
///
/// The first two variants describe a request that couldn't be sent at all, while the last
/// one wraps a failure of a request that has been attempted.
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum StorageProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Storage proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// Displaying this variant prints the inner error.
    #[display("{_0}")]
    Request(service::StorageProofRequestError),
}
858
/// Error returned by [`NetworkServiceChain::call_proof_request`].
///
/// The first two variants describe a request that couldn't be sent at all, while the last
/// one wraps a failure of a request that has been attempted.
///
/// See also [`CallProofRequestError::is_network_problem`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum CallProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Call proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// Displaying this variant prints the inner error.
    #[display("{_0}")]
    Request(service::CallProofRequestError),
}
870
871impl CallProofRequestError {
872    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
873    /// issue.
874    pub fn is_network_problem(&self) -> bool {
875        match self {
876            CallProofRequestError::Request(err) => err.is_network_problem(),
877            CallProofRequestError::RequestTooLarge => false,
878            CallProofRequestError::NoConnection => true,
879        }
880    }
881}
882
/// Error returned by [`NetworkServiceChain::child_storage_proof_request`].
///
/// The first two variants describe a request that couldn't be sent at all, while the last
/// one wraps a failure of a request that has been attempted.
///
/// See also [`ChildStorageProofRequestError::is_network_problem`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum ChildStorageProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Child storage proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// Displaying this variant prints the inner error. Note that the inner error type is the
    /// same as the one of [`StorageProofRequestError::Request`].
    #[display("{_0}")]
    Request(service::StorageProofRequestError),
}
894
895impl ChildStorageProofRequestError {
896    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
897    /// issue.
898    pub fn is_network_problem(&self) -> bool {
899        match self {
900            ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
901            ChildStorageProofRequestError::RequestTooLarge => false,
902            ChildStorageProofRequestError::NoConnection => true,
903        }
904    }
905}
906
/// Owned version of [`codec::ChildStorageProofRequestConfig`] for sending across channel.
///
/// Carried by [`ToBackgroundChain::StartChildStorageProofRequest`].
struct ChildStorageProofRequestConfigOwned {
    /// Presumably the hash of the block whose storage is queried (to be confirmed).
    block_hash: [u8; 32],
    /// Presumably the identifier of the child trie to query (to be confirmed).
    child_trie: Vec<u8>,
    /// Presumably the list of storage keys to include in the proof (to be confirmed).
    keys: Vec<Vec<u8>>,
}
913
/// Message destined to the background task that isn't tied to an already-added chain.
///
/// The background task stops once the channel carrying these messages is closed.
enum ToBackground<TPlat: PlatformRef> {
    /// Adds a chain to the network state of the background task.
    ///
    /// If an identical chain already exists, its reference count is incremented instead and
    /// the messages are associated with the existing chain.
    AddChain {
        /// Receiving side of the channel carrying the messages specific to this chain. Once
        /// this channel is closed, a [`ToBackgroundChain::RemoveChain`] is automatically
        /// generated for the chain.
        messages_rx: async_channel::Receiver<ToBackgroundChain>,
        /// Configuration of the chain to add.
        config: service::ChainConfig<Chain<TPlat>>,
    },
}
920
/// Message destined to the background task and that concerns one specific chain.
///
/// The background task receives these messages paired with the `ChainId` of the chain they
/// have been sent for. Variants that expect an answer carry a `result` field, which is the
/// sending half of a one-shot channel.
enum ToBackgroundChain {
    /// Not sent explicitly: automatically appended by the background task at the end of the
    /// stream of messages of a chain, in other words once the sending side has been closed.
    RemoveChain,
    /// Registers a new channel on which to send the [`Event`]s of the chain.
    Subscribe {
        /// Sending side of the channel of events.
        sender: async_channel::Sender<Event>,
    },
    /// Registers a new channel on which to send [`BitswapEvent`]s.
    SubscribeBitswap {
        /// Sending side of the channel of events.
        sender: async_channel::Sender<BitswapEvent>,
    },
    /// Presumably disconnects the given peer and bans it according to `severity` (to be
    /// confirmed against the handler of this message).
    DisconnectAndBan {
        /// Peer to disconnect and ban.
        peer_id: PeerId,
        /// How severe the ban must be.
        severity: BanSeverity,
        /// Human-readable reason; presumably only used for logging purposes.
        reason: &'static str,
    },
    // TODO: serialize the request before sending over channel
    /// Starts a blocks request towards `target`.
    StartBlocksRequest {
        target: PeerId, // TODO: takes by value because of future longevity issue
        /// Description of the blocks to request.
        config: codec::BlocksRequestConfig,
        /// Presumably the maximum duration of the request (to be confirmed).
        timeout: Duration,
        /// Channel on which the outcome of the request is reported.
        result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
    },
    // TODO: serialize the request before sending over channel
    /// Starts a GrandPa warp sync request towards `target`.
    StartWarpSyncRequest {
        target: PeerId,
        /// Presumably the hash of the block to start the warp sync from (to be confirmed).
        begin_hash: [u8; 32],
        /// Presumably the maximum duration of the request (to be confirmed).
        timeout: Duration,
        /// Channel on which the outcome of the request is reported.
        result:
            oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
    },
    // TODO: serialize the request before sending over channel
    /// Starts a storage proof request towards `target`.
    StartStorageProofRequest {
        target: PeerId,
        /// Description of the storage proof to request.
        config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
        /// Presumably the maximum duration of the request (to be confirmed).
        timeout: Duration,
        /// Channel on which the outcome of the request is reported.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
    },
    // TODO: serialize the request before sending over channel
    /// Starts a call proof request towards `target`.
    StartCallProofRequest {
        target: PeerId, // TODO: takes by value because of futures longevity issue
        /// Description of the call proof to request.
        config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
        /// Presumably the maximum duration of the request (to be confirmed).
        timeout: Duration,
        /// Channel on which the outcome of the request is reported.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
    },
    // TODO: serialize the request before sending over channel
    /// Starts a child storage proof request towards `target`.
    StartChildStorageProofRequest {
        target: PeerId,
        /// Description of the child storage proof to request, in an owned form.
        config: ChildStorageProofRequestConfigOwned,
        /// Presumably the maximum duration of the request (to be confirmed).
        timeout: Duration,
        /// Channel on which the outcome of the request is reported.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
    },
    /// Presumably updates the best block that the local node reports for this chain (to be
    /// confirmed against the handler of this message).
    SetLocalBestBlock {
        best_hash: [u8; 32],
        best_number: u64,
    },
    /// Presumably updates the GrandPa state that the local node reports for this chain (to be
    /// confirmed against the handler of this message).
    SetLocalGrandpaState {
        grandpa_state: service::GrandpaState,
    },
    /// Announces a transaction on the network.
    AnnounceTransaction {
        /// The transaction, as bytes.
        transaction: Vec<u8>,
        /// Channel on which a list of peers is reported; presumably the peers the
        /// transaction has been announced to (to be confirmed).
        result: oneshot::Sender<Vec<PeerId>>,
    },
    /// Sends a block announce to `target`.
    SendBlockAnnounce {
        target: PeerId,
        /// SCALE-encoded header of the block to announce.
        scale_encoded_header: Vec<u8>,
        /// Presumably whether the announced block is the local best block (to be confirmed).
        is_best: bool,
        /// Channel on which the outcome of queuing the notification is reported.
        result: oneshot::Sender<Result<(), QueueNotificationError>>,
    },
    /// Sends a Bitswap message to `target`.
    SendBitswapMessage {
        target: PeerId,
        /// The message, as bytes.
        message: Vec<u8>,
        /// Channel on which the outcome of the sending is reported.
        result: oneshot::Sender<Result<(), SendBitswapMessageError>>,
    },
    /// Sends a Bitswap message without designating a specific target.
    BroadcastBitswapMessage {
        /// The message, as bytes.
        message: Vec<u8>,
        /// Channel on which the outcome is reported; on success, presumably the list of
        /// peers the message has been sent to (to be confirmed).
        result: oneshot::Sender<Result<Vec<PeerId>, SendBitswapMessageError>>,
    },
    /// Broadcasts a statement.
    BroadcastStatement {
        /// The statement, as bytes.
        statement: Vec<u8>,
        /// Channel on which the [`BroadcastStatementResult`] is reported.
        result: oneshot::Sender<BroadcastStatementResult>,
    },
    /// Updates the topic affinity filter of the chain.
    UpdateTopicAffinity {
        /// The new filter.
        filter: AffinityFilter,
    },
    /// Provides a list of nodes and their addresses. See [`NetworkServiceChain::discover`].
    Discover {
        /// Nodes, each with its list of addresses.
        list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
        /// Whether the nodes must be treated as important nodes.
        important_nodes: bool,
    },
    /// Requests the list of known nodes. See [`NetworkServiceChain::discovered_nodes`].
    DiscoveredNodes {
        /// Channel on which the nodes and their addresses are reported.
        result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Requests the list of peers. See [`NetworkServiceChain::peers_list`].
    PeersList {
        /// Channel on which the list of peers is reported.
        result: oneshot::Sender<Vec<PeerId>>,
    },
}
1014
/// State of the background task. Passed by value to [`background_task`], which then owns and
/// updates it for as long as it runs.
struct BackgroundTask<TPlat: PlatformRef> {
    /// See [`Config::platform`].
    platform: TPlat,

    /// Random number generator.
    randomness: rand_chacha::ChaCha20Rng,

    /// Value provided through [`Config::identify_agent_version`].
    identify_agent_version: String,

    /// Channel to send messages to the background task.
    tasks_messages_tx:
        async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,

    /// Channel to receive messages destined to the background task.
    tasks_messages_rx: Pin<
        Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
    >,

    /// Data structure holding the entire state of the networking.
    network: service::ChainNetwork<
        Chain<TPlat>,
        async_channel::Sender<service::CoordinatorToConnection>,
        TPlat::Instant,
    >,

    /// All known peers and their addresses.
    peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,

    /// Bitswap slot assignment strategy.
    bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy<TPlat::Instant>,

    /// See [`Config::connections_open_pool_size`].
    connections_open_pool_size: u32,

    /// See [`Config::connections_open_pool_restore_delay`].
    connections_open_pool_restore_delay: Duration,

    /// Every time a connection is opened, the value in this field is increased by one. After
    /// [`BackgroundTask::next_recent_connection_restore`] has yielded, the value is reduced by
    /// one.
    ///
    /// No new connection is started while this value is not strictly inferior to
    /// [`BackgroundTask::connections_open_pool_size`].
    num_recent_connection_opening: u32,

    /// Delay after which [`BackgroundTask::num_recent_connection_opening`] is decreased by one.
    ///
    /// `None` if no delay is in progress. A new delay of
    /// [`BackgroundTask::connections_open_pool_restore_delay`] is started whenever this field
    /// is `None` and [`BackgroundTask::num_recent_connection_opening`] is non-zero.
    next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,

    /// List of all open gossip links.
    // TODO: using this data structure unfortunately means that PeerIds are cloned a lot, maybe some user data in ChainNetwork is better? not sure
    open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,

    /// Chains for which a gossip link has been opened at least once. Used to prefer bootnodes for
    /// out slots only until the chain first connects.
    chains_ever_gossip_connected: HashSet<ChainId, fnv::FnvBuildHasher>,

    /// Connected peers using statement protocol V2, per chain.
    v2_statement_peers: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,

    /// Current topic affinity filter per chain, sent to V2 peers on connect.
    current_affinity_filter: HashMap<ChainId, AffinityFilter, fnv::FnvBuildHasher>,

    /// Important nodes per chain (in practice the bootnodes; see [`NetworkServiceChain::discover`]).
    /// They get extra logging, and slot preference until the chain first connects.
    // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
    important_nodes: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,

    /// Event about to be sent on the senders of [`BackgroundTask::event_senders`].
    ///
    /// The event is only dispatched to the senders that are associated with the same chain.
    /// As long as this field is `Some`, no new event is pulled from
    /// [`BackgroundTask::network`].
    event_pending_send: Option<(ChainId, Event)>,

    /// Bitswap event about to be sent on the senders of [`BackgroundTask::bitswap_event_senders`].
    ///
    /// As long as this field is `Some`, no new event is pulled from
    /// [`BackgroundTask::network`].
    bitswap_event_pending_send: Option<BitswapEvent>,

    /// Running count of peers with an open Bitswap substream. Maintained from
    /// `service::Event::BitswapConnected` / `BitswapDisconnected`. Used for diagnostic logging
    /// only; the authoritative per-peer state lives in
    /// [`BackgroundTask::bitswap_peering_strategy`].
    bitswap_connected_peers: usize,

    /// Sending events through the public API.
    ///
    /// Contains either senders, or a `Future` that is currently sending an event and will yield
    /// the senders back once it is finished.
    // TODO: sort by ChainId instead of using a Vec?
    event_senders: either::Either<
        Vec<(ChainId, async_channel::Sender<Event>)>,
        Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
    >,

    /// Whenever [`NetworkServiceChain::subscribe`] is called, the new sender is added to this list.
    /// Once [`BackgroundTask::event_senders`] is ready, we properly initialize these senders.
    pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,

    /// Sending Bitswap events through the public API. We use separate channels for Bitswap events,
    /// because Bitswap messages are big and only few of event subscribers are interested in them.
    ///
    /// Contains either senders, or a `Future` that is currently sending an event and will yield
    /// the senders back once it is finished.
    ///
    /// Note that compared to `event_senders`, `bitswap_event_senders` are not associated with
    /// chains, because Bitswap messages coming from the network do not have the information about
    /// what chain they are coming from.
    bitswap_event_senders: either::Either<
        Vec<async_channel::Sender<BitswapEvent>>,
        Pin<Box<dyn Future<Output = Vec<async_channel::Sender<BitswapEvent>>> + Send>>,
    >,

    /// Whenever [`NetworkServiceChain::subscribe_bitswap`] is called, the new sender is added to
    /// this list. Once [`BackgroundTask::bitswap_event_senders`] is ready, we properly initialize
    /// these senders.
    pending_new_bitswap_subscriptions: Vec<async_channel::Sender<BitswapEvent>>,

    /// Receiver of the [`ToBackground`] messages. The background task ends once this channel
    /// is closed.
    main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,

    /// Combination of the streams of [`ToBackgroundChain`] messages of all the chains. One
    /// stream is pushed for each [`ToBackground::AddChain`] received, and each message is
    /// paired with the identifier of the chain it concerns.
    messages_rx:
        stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,

    /// Channels on which to report the outcome of blocks requests, by substream.
    // NOTE(review): presumably only contains the requests that are still in progress; confirm.
    blocks_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Channels on which to report the outcome of GrandPa warp sync requests, by substream.
    // NOTE(review): presumably only contains the requests that are still in progress; confirm.
    grandpa_warp_sync_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Channels on which to report the outcome of storage proof requests, by substream.
    // NOTE(review): presumably only contains the requests that are still in progress; confirm.
    storage_proof_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Channels on which to report the outcome of call proof requests, by substream.
    // NOTE(review): presumably only contains the requests that are still in progress; confirm.
    call_proof_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Channels on which to report the outcome of child storage proof requests, by substream.
    // NOTE(review): presumably only contains the requests that are still in progress; confirm.
    child_storage_proof_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// All chains, indexed by the value of [`Chain::next_discovery_when`].
    ///
    /// The value of each entry is the delay that completes when that instant is reached. The
    /// background task waits on the first entry and removes it once its delay has completed.
    chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
}
1163
/// Per-chain data, stored within [`BackgroundTask::network`] and retrieved by indexing it
/// with the identifier of the chain.
struct Chain<TPlat: PlatformRef> {
    /// Name of the chain used when printing logs.
    log_name: String,

    /// Number of times the chain has been added. Incremented when an identical chain gets
    /// added again rather than creating a duplicate.
    // TODO: this field is a hack due to the fact that `add_chain` can't be `async`; should eventually be fixed after a lib.rs refactor
    num_references: NonZero<usize>,

    /// See [`ConfigChain::block_number_bytes`].
    // TODO: redundant with ChainNetwork? since we might not need to know this in the future i'm reluctant to add a getter to ChainNetwork
    block_number_bytes: usize,

    /// See [`ConfigChain::num_out_slots`].
    num_out_slots: usize,

    /// When the next discovery should be started for this chain.
    next_discovery_when: TPlat::Instant,

    /// After [`Chain::next_discovery_when`] is reached, the following discovery happens after
    /// the given duration.
    next_discovery_period: Duration,
}
1184
/// State associated with each entry of [`BackgroundTask::open_gossip_links`].
///
/// The role and best block are what gets reported in the [`Event::Connected`] sent to new
/// subscribers for the links that are already open.
#[derive(Clone)]
struct OpenGossipLinkState {
    /// Role of the remote.
    role: Role,
    /// Presumably the number of the best block of the remote (to be confirmed).
    best_block_number: u64,
    /// Presumably the hash of the best block of the remote (to be confirmed).
    best_block_hash: [u8; 32],
    /// `None` if unknown.
    finalized_block_height: Option<u64>,
}
1193
1194async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
1195    loop {
1196        // Yield at every loop in order to provide better tasks granularity.
1197        futures_lite::future::yield_now().await;
1198
1199        enum WakeUpReason<TPlat: PlatformRef> {
1200            ForegroundClosed,
1201            Message(ToBackground<TPlat>),
1202            MessageForChain(ChainId, ToBackgroundChain),
1203            NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
1204            CanAssignSlot(PeerId, ChainId),
1205            CanAssignBitswapSlot(PeerId),
1206            NextRecentConnectionRestore,
1207            CanStartConnect(PeerId),
1208            CanOpenGossip(PeerId, ChainId),
1209            CanOpenBitswap(PeerId),
1210            MessageFromConnection {
1211                connection_id: service::ConnectionId,
1212                message: service::ConnectionToCoordinator,
1213            },
1214            MessageToConnection {
1215                connection_id: service::ConnectionId,
1216                message: service::CoordinatorToConnection,
1217            },
1218            EventSendersReady,
1219            BitswapEventSendersReady,
1220            StartDiscovery(ChainId),
1221        }
1222
1223        let wake_up_reason = {
1224            let message_received = async {
1225                task.main_messages_rx
1226                    .next()
1227                    .await
1228                    .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
1229            };
1230            let message_for_chain_received = async {
1231                // Note that when the last entry of `messages_rx` yields `None`, `messages_rx`
1232                // itself will yield `None`. For this reason, we can't use
1233                // `task.messages_rx.is_empty()` to determine whether `messages_rx` will
1234                // yield `None`.
1235                let Some((chain_id, message)) = task.messages_rx.next().await else {
1236                    future::pending().await
1237                };
1238                WakeUpReason::MessageForChain(chain_id, message)
1239            };
1240            let message_from_task_received = async {
1241                let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
1242                WakeUpReason::MessageFromConnection {
1243                    connection_id,
1244                    message,
1245                }
1246            };
1247            let service_event = async {
1248                if let Some(event) = (task.event_pending_send.is_none()
1249                    && task.bitswap_event_pending_send.is_none()
1250                    && task.pending_new_subscriptions.is_empty()
1251                    && task.pending_new_bitswap_subscriptions.is_empty())
1252                .then(|| task.network.next_event())
1253                .flatten()
1254                {
1255                    WakeUpReason::NetworkEvent(event)
1256                } else if let Some(start_connect) = {
1257                    let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1258                        .then(|| {
1259                            task.network
1260                                .unconnected_desired()
1261                                .choose(&mut task.randomness)
1262                                .cloned()
1263                        })
1264                        .flatten();
1265                    x
1266                } {
1267                    WakeUpReason::CanStartConnect(start_connect)
1268                } else if let Some((peer_id, chain_id)) = {
1269                    let x = task
1270                        .network
1271                        .connected_unopened_gossip_desired()
1272                        .choose(&mut task.randomness)
1273                        .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1274                    x
1275                } {
1276                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
1277                } else if let Some(peer_id) = {
1278                    let x = task
1279                        .network
1280                        .connected_unopened_bitswap_desired()
1281                        .choose(&mut task.randomness)
1282                        .cloned();
1283                    x
1284                } {
1285                    WakeUpReason::CanOpenBitswap(peer_id)
1286                } else if let Some((connection_id, message)) =
1287                    task.network.pull_message_to_connection()
1288                {
1289                    WakeUpReason::MessageToConnection {
1290                        connection_id,
1291                        message,
1292                    }
1293                } else {
1294                    'search: loop {
1295                        let mut earlier_unban = None;
1296
1297                        for chain_id in task.network.chains().collect::<Vec<_>>() {
1298                            if task.network.gossip_desired_num(
1299                                chain_id,
1300                                service::GossipKind::ConsensusTransactions,
1301                            ) >= task.network[chain_id].num_out_slots
1302                            {
1303                                continue;
1304                            }
1305
1306                            let now = task.platform.now();
1307
1308                            // Until the chain first connects, prefer slots for important nodes
1309                            // (the bootnodes); otherwise use the general pool.
1310                            if !task.chains_ever_gossip_connected.contains(&chain_id) {
1311                                if let basic_peering_strategy::AssignablePeer::Assignable(peer_id) =
1312                                    task.peering_strategy.pick_assignable_peer_filtered(
1313                                        &chain_id,
1314                                        &now,
1315                                        |peer_id| {
1316                                            task.important_nodes
1317                                                .get(&chain_id)
1318                                                .map_or(false, |nodes| nodes.contains(peer_id))
1319                                        },
1320                                    )
1321                                {
1322                                    break 'search WakeUpReason::CanAssignSlot(
1323                                        peer_id.clone(),
1324                                        chain_id,
1325                                    );
1326                                }
1327                            }
1328
1329                            match task.peering_strategy.pick_assignable_peer(&chain_id, &now) {
1330                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1331                                    break 'search WakeUpReason::CanAssignSlot(
1332                                        peer_id.clone(),
1333                                        chain_id,
1334                                    );
1335                                }
1336                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
1337                                    next_unban,
1338                                } => {
1339                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1340                                        earlier_unban = Some(next_unban.clone());
1341                                    }
1342                                }
1343                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
1344                            }
1345                        }
1346
1347                        match task
1348                            .bitswap_peering_strategy
1349                            .pick_assignable_peer(&task.platform.now())
1350                        {
1351                            bitswap_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1352                                break 'search WakeUpReason::CanAssignBitswapSlot(peer_id.clone());
1353                            }
1354                            bitswap_peering_strategy::AssignablePeer::AllPeersBanned {
1355                                next_unban,
1356                            } => {
1357                                if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1358                                    earlier_unban = Some(next_unban.clone());
1359                                }
1360                            }
1361                            bitswap_peering_strategy::AssignablePeer::NoPeer => {}
1362                        }
1363
1364                        if let Some(earlier_unban) = earlier_unban {
1365                            task.platform.sleep_until(earlier_unban).await;
1366                        } else {
1367                            future::pending::<()>().await;
1368                        }
1369                    }
1370                }
1371            };
1372            let next_recent_connection_restore = async {
1373                if task.num_recent_connection_opening != 0
1374                    && task.next_recent_connection_restore.is_none()
1375                {
1376                    task.next_recent_connection_restore = Some(Box::pin(
1377                        task.platform
1378                            .sleep(task.connections_open_pool_restore_delay),
1379                    ));
1380                }
1381                if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1382                    delay.await;
1383                    task.next_recent_connection_restore = None;
1384                    WakeUpReason::NextRecentConnectionRestore
1385                } else {
1386                    future::pending().await
1387                }
1388            };
1389            let finished_sending_event = async {
1390                if let either::Right(event_sending_future) = &mut task.event_senders {
1391                    let event_senders = event_sending_future.await;
1392                    task.event_senders = either::Left(event_senders);
1393                    WakeUpReason::EventSendersReady
1394                } else if task.event_pending_send.is_some()
1395                    || !task.pending_new_subscriptions.is_empty()
1396                {
1397                    WakeUpReason::EventSendersReady
1398                } else {
1399                    future::pending().await
1400                }
1401            };
1402            let finished_sending_bitswap_event = async {
1403                if let either::Right(bitswap_event_sending_future) = &mut task.bitswap_event_senders
1404                {
1405                    let bitswap_event_senders = bitswap_event_sending_future.await;
1406                    task.bitswap_event_senders = either::Left(bitswap_event_senders);
1407                    WakeUpReason::BitswapEventSendersReady
1408                } else if task.bitswap_event_pending_send.is_some()
1409                    || !task.pending_new_bitswap_subscriptions.is_empty()
1410                {
1411                    WakeUpReason::BitswapEventSendersReady
1412                } else {
1413                    future::pending().await
1414                }
1415            };
1416            let start_discovery = async {
1417                let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1418                    future::pending().await
1419                };
1420                next_discovery.get_mut().await;
1421                let ((_, chain_id), _) = next_discovery.remove_entry();
1422                WakeUpReason::StartDiscovery(chain_id)
1423            };
1424
1425            message_for_chain_received
1426                .or(message_received)
1427                .or(message_from_task_received)
1428                .or(service_event)
1429                .or(next_recent_connection_restore)
1430                .or(finished_sending_event)
1431                .or(finished_sending_bitswap_event)
1432                .or(start_discovery)
1433                .await
1434        };
1435
1436        match wake_up_reason {
1437            WakeUpReason::ForegroundClosed => {
1438                // End the task.
1439                return;
1440            }
1441            WakeUpReason::Message(ToBackground::AddChain {
1442                messages_rx,
1443                config,
1444            }) => {
1445                // TODO: this is not a completely clean way of handling duplicate chains, because the existing chain might have a different best block and role and all ; also, multiple sync services will call set_best_block and set_finalized_block
1446                let chain_id = match task.network.add_chain(config) {
1447                    Ok(id) => id,
1448                    Err(service::AddChainError::Duplicate { existing_identical }) => {
1449                        task.network[existing_identical].num_references = task.network
1450                            [existing_identical]
1451                            .num_references
1452                            .checked_add(1)
1453                            .unwrap();
1454                        existing_identical
1455                    }
1456                };
1457
1458                task.chains_by_next_discovery.insert(
1459                    (task.network[chain_id].next_discovery_when.clone(), chain_id),
1460                    Box::pin(
1461                        task.platform
1462                            .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1463                    ),
1464                );
1465
1466                task.messages_rx
1467                    .push(Box::pin(
1468                        messages_rx
1469                            .map(move |msg| (chain_id, msg))
1470                            .chain(stream::once(future::ready((
1471                                chain_id,
1472                                ToBackgroundChain::RemoveChain,
1473                            )))),
1474                    ) as Pin<Box<_>>);
1475
1476                log!(
1477                    &task.platform,
1478                    Debug,
1479                    "network",
1480                    "chain-added",
1481                    id = task.network[chain_id].log_name
1482                );
1483            }
1484            WakeUpReason::EventSendersReady => {
1485                // Dispatch the pending event, if any, to the various senders.
                    // When no event is pending but subscriptions are waiting in
                    // `pending_new_subscriptions`, those are brought up to date and registered
                    // instead. In both cases the work is done by a future stored in
                    // `task.event_senders` that resolves to the updated list of senders.
1486
1487                // We made sure that the senders were ready before generating an event.
1488                let either::Left(event_senders) = &mut task.event_senders else {
1489                    unreachable!()
1490                };
1491
1492                if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1493                    task.event_pending_send.take()
1494                {
                        // Move the list out so that the `async move` block below can own it.
1495                    let mut event_senders = mem::take(event_senders);
1496                    task.event_senders = either::Right(Box::pin(async move {
1497                        // Elements in `event_senders` are removed one by one and inserted
1498                        // back if the channel is still open.
                            // Indices are walked downwards: `swap_remove` fills the freed slot
                            // with the last element, which at that point has already been
                            // visited, so every sender is handled exactly once.
1499                        for index in (0..event_senders.len()).rev() {
1500                            let (event_sender_chain_id, event_sender) =
1501                                event_senders.swap_remove(index);
                                // Only senders subscribed to the event's chain receive it;
                                // the others are re-inserted untouched.
1502                            if event_sender_chain_id == event_to_dispatch_chain_id {
1503                                if event_sender.send(event_to_dispatch.clone()).await.is_err() {
                                        // Sending failed: drop this sender instead of keeping it.
1504                                    continue;
1505                                }
1506                            }
1507                            event_senders.push((event_sender_chain_id, event_sender));
1508                        }
1509                        event_senders
1510                    }));
1511                } else if !task.pending_new_subscriptions.is_empty() {
1512                    let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1513                    let mut event_senders = mem::take(event_senders);
1514                    // TODO: cloning :-/
                        // The future below is `async move`, so it works on its own snapshot of
                        // the open gossip links.
1515                    let open_gossip_links = task.open_gossip_links.clone();
1516                    task.event_senders = either::Right(Box::pin(async move {
                            // Replay the current state to each new subscriber: one
                            // `Event::Connected` per open gossip link of its chain, followed by
                            // an `Event::GrandpaNeighborPacket` when a finalized block height
                            // is recorded for that link.
1517                        for (chain_id, new_subscription) in pending_new_subscriptions {
1518                            for ((link_chain_id, peer_id), state) in &open_gossip_links {
1519                                // TODO: optimize? this is O(n) by chain
1520                                if *link_chain_id != chain_id {
1521                                    continue;
1522                                }
1523
                                    // Send errors are deliberately ignored here.
1524                                let _ = new_subscription
1525                                    .send(Event::Connected {
1526                                        peer_id: peer_id.clone(),
1527                                        role: state.role,
1528                                        best_block_number: state.best_block_number,
1529                                        best_block_hash: state.best_block_hash,
1530                                    })
1531                                    .await;
1532
1533                                if let Some(finalized_block_height) = state.finalized_block_height {
1534                                    let _ = new_subscription
1535                                        .send(Event::GrandpaNeighborPacket {
1536                                            peer_id: peer_id.clone(),
1537                                            finalized_block_height,
1538                                        })
1539                                        .await;
1540                                }
1541                            }
1542
                                // The subscriber is registered regardless of whether the sends
                                // above succeeded.
1543                            event_senders.push((chain_id, new_subscription));
1544                        }
1545
1546                        event_senders
1547                    }));
1548                }
1549            }
1550            WakeUpReason::BitswapEventSendersReady => {
                    // Dispatch the pending bitswap event, if any, to every bitswap subscriber.
                    // Otherwise, register the subscriptions that are waiting in
                    // `pending_new_bitswap_subscriptions`.
1551                // We made sure that the senders were ready before generating an event.
1552                let either::Left(bitswap_event_senders) = &mut task.bitswap_event_senders else {
1553                    unreachable!()
1554                };
1555
1556                if let Some(event_to_dispatch) = task.bitswap_event_pending_send.take() {
1557                    let mut bitswap_event_senders = mem::take(bitswap_event_senders);
1558                    task.bitswap_event_senders = either::Right(Box::pin(async move {
1559                        // Elements in `bitswap_event_senders` are removed one by one and
1560                        // inserted back if the channel is still open.
                            // Unlike regular events there is no per-chain filtering: every
                            // sender gets the event.
1561                        for index in (0..bitswap_event_senders.len()).rev() {
1562                            let event_sender = bitswap_event_senders.swap_remove(index);
1563                            if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1564                                continue;
1565                            }
1566                            bitswap_event_senders.push(event_sender);
1567                        }
1568                        bitswap_event_senders
1569                    }));
1570                } else if !task.pending_new_bitswap_subscriptions.is_empty() {
                        // New subscribers are appended directly; nothing is replayed to them.
1571                    bitswap_event_senders.append(&mut task.pending_new_bitswap_subscriptions);
1572                }
1573            }
1574            WakeUpReason::MessageFromConnection {
1575                connection_id,
1576                message,
1577            } => {
                    // Hand the message coming from the given connection over to `task.network`.
1578                task.network
1579                    .inject_connection_message(connection_id, message);
1580            }
1581            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
                    // Release one reference to the chain. While other references remain, only
                    // the counter is decremented; the chain is torn down once it would reach
                    // zero (`NonZero::new` returns `None` in that case).
1582                if let Some(new_ref) =
1583                    NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1584                {
1585                    task.network[chain_id].num_references = new_ref;
1586                    continue;
1587                }
1588
                    // Close the gossip link with every peer currently gossip-connected on this
                    // chain. The peers are collected into a `Vec` first, presumably because
                    // `gossip_close` needs to mutate `task.network` while the iterator borrows
                    // it.
1589                for peer_id in task
1590                    .network
1591                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1592                    .cloned()
1593                    .collect::<Vec<_>>()
1594                {
                        // Expected to succeed, as the peer was just listed as gossip-connected.
1595                    task.network
1596                        .gossip_close(
1597                            chain_id,
1598                            &peer_id,
1599                            service::GossipKind::ConsensusTransactions,
1600                        )
1601                        .unwrap();
1602
1603                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1604                    debug_assert!(_was_in.is_some());
1605                }
1606
                    // Remove the chain's discovery timer, which is keyed by
                    // `(next_discovery_when, chain_id)`.
1607                let _was_in = task
1608                    .chains_by_next_discovery
1609                    .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1610                debug_assert!(_was_in.is_some());
1611
                    // Logged before `remove_chain` below, since `log_name` is read from
                    // `task.network[chain_id]`.
1612                log!(
1613                    &task.platform,
1614                    Debug,
1615                    "network",
1616                    "chain-removed",
1617                    id = task.network[chain_id].log_name
1618                );
                    // Drop the remaining per-chain bookkeeping, then the chain itself.
1619                task.v2_statement_peers.remove(&chain_id);
1620                task.current_affinity_filter.remove(&chain_id);
1621                task.important_nodes.remove(&chain_id);
1622                task.chains_ever_gossip_connected.remove(&chain_id);
1623                task.network.remove_chain(chain_id).unwrap();
1624                task.peering_strategy.remove_chain_peers(&chain_id);
1625            }
1626            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
                    // Only queue the subscription here; the `EventSendersReady` handler picks
                    // it up, replays the open gossip links to it and registers it.
1627                task.pending_new_subscriptions.push((chain_id, sender));
1628            }
1629            WakeUpReason::MessageForChain(
1630                _chain_id,
1631                ToBackgroundChain::SubscribeBitswap { sender },
1632            ) => {
                    // Queue the bitswap subscription; the chain id is not used because bitswap
                    // subscriptions are kept in a single list. The `BitswapEventSendersReady`
                    // handler moves it to the active senders.
1633                task.pending_new_bitswap_subscriptions.push(sender);
1634            }
1635            WakeUpReason::MessageForChain(
1636                chain_id,
1637                ToBackgroundChain::DisconnectAndBan {
1638                    peer_id,
1639                    severity,
1640                    reason,
1641                },
1642            ) => {
                    // Ban `peer_id` on this chain, take away its slot if it had one, and close
                    // the gossip link with it if one is open.
                    //
                    // The ban lasts 10 seconds for `BanSeverity::Low` and 40 seconds for
                    // `BanSeverity::High`.
1643                let ban_duration = Duration::from_secs(match severity {
1644                    BanSeverity::Low => 10,
1645                    BanSeverity::High => 40,
1646                });
1647
                    // `now + ban_duration` is presumably the instant at which the ban expires.
                    // `had_slot` is `true` only for `Banned { had_slot: true }`; every other
                    // outcome is treated as "no slot to unassign".
1648                let had_slot = matches!(
1649                    task.peering_strategy.unassign_slot_and_ban(
1650                        &chain_id,
1651                        &peer_id,
1652                        task.platform.now() + ban_duration,
1653                    ),
1654                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1655                );
1656
1657                if had_slot {
1658                    log!(
1659                        &task.platform,
1660                        Debug,
1661                        "network",
1662                        "slot-unassigned",
1663                        chain = &task.network[chain_id].log_name,
1664                        peer_id,
1665                        ?ban_duration,
1666                        reason = "user-ban",
1667                        user_reason = reason
1668                    );
                        // The peer no longer has a slot, so it is no longer marked as desired.
1669                    task.network.gossip_remove_desired(
1670                        chain_id,
1671                        &peer_id,
1672                        service::GossipKind::ConsensusTransactions,
1673                    );
1674                }
1675
                    // Independently of the slot, tear down the gossip link if one is open.
1676                if task.network.gossip_is_connected(
1677                    chain_id,
1678                    &peer_id,
1679                    service::GossipKind::ConsensusTransactions,
1680                ) {
1681                    let _closed_result = task.network.gossip_close(
1682                        chain_id,
1683                        &peer_id,
1684                        service::GossipKind::ConsensusTransactions,
1685                    );
1686                    debug_assert!(_closed_result.is_ok());
1687
1688                    log!(
1689                        &task.platform,
1690                        Debug,
1691                        "network",
1692                        "gossip-closed",
1693                        chain = &task.network[chain_id].log_name,
1694                        peer_id,
1695                    );
1696
                        // Keep the local bookkeeping in sync with the closed link.
1697                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1698                    debug_assert!(_was_in.is_some());
1699
1700                    if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
1701                        peers.remove(&peer_id);
1702                    }
1703
                        // `event_pending_send` holds at most one event; it is expected to be
                        // empty here. Queue the `Disconnected` event for the subscribers.
1704                    debug_assert!(task.event_pending_send.is_none());
1705                    task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1706                }
1707            }
1708            WakeUpReason::MessageForChain(
1709                chain_id,
1710                ToBackgroundChain::StartBlocksRequest {
1711                    target,
1712                    config,
1713                    timeout,
1714                    result,
1715                },
1716            ) => {
                    // Start a blocks request towards `target`. On success the `result` sender is
                    // stored in `task.blocks_requests` under the new substream id; on failure
                    // the error is reported through `result` immediately.
                    //
                    // The log line differs only in how the starting point is displayed (block
                    // hash or block number).
1717                match &config.start {
1718                    codec::BlocksRequestConfigStart::Hash(hash) => {
1719                        log!(
1720                            &task.platform,
1721                            Debug,
1722                            "network",
1723                            "blocks-request-started",
1724                            chain = task.network[chain_id].log_name, target,
1725                            start = HashDisplay(hash),
1726                            num = config.desired_count.get(),
1727                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1728                            header = ?config.fields.header, body = ?config.fields.body,
1729                            justifications = ?config.fields.justifications
1730                        );
1731                    }
1732                    codec::BlocksRequestConfigStart::Number(number) => {
1733                        log!(
1734                            &task.platform,
1735                            Debug,
1736                            "network",
1737                            "blocks-request-started",
1738                            chain = task.network[chain_id].log_name, target, start = number,
1739                            num = config.desired_count.get(),
1740                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1741                            header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1742                        );
1743                    }
1744                }
1745
                    // `config` is not needed after this call, so it is moved rather than cloned.
1746                match task
1747                    .network
1748                    .start_blocks_request(&target, chain_id, config, timeout)
1749                {
1750                    Ok(substream_id) => {
1751                        task.blocks_requests.insert(substream_id, result);
1752                    }
1753                    Err(service::StartRequestError::NoConnection) => {
1754                        log!(
1755                            &task.platform,
1756                            Debug,
1757                            "network",
1758                            "blocks-request-error",
1759                            chain = task.network[chain_id].log_name,
1760                            target,
1761                            error = "NoConnection"
1762                        );
                            // A failure to deliver the error to the requester is ignored.
1763                        let _ = result.send(Err(BlocksRequestError::NoConnection));
1764                    }
1765                }
1766            }
1767            WakeUpReason::MessageForChain(
1768                chain_id,
1769                ToBackgroundChain::StartWarpSyncRequest {
1770                    target,
1771                    begin_hash,
1772                    timeout,
1773                    result,
1774                },
1775            ) => {
                    // Start a GrandPa warp sync request towards `target`, beginning at
                    // `begin_hash`. On success the `result` sender is stored in
                    // `task.grandpa_warp_sync_requests` under the new substream id; on failure
                    // the error is reported through `result` immediately.
1776                log!(
1777                    &task.platform,
1778                    Debug,
1779                    "network",
1780                    "warp-sync-request-started",
1781                    chain = task.network[chain_id].log_name,
1782                    target,
1783                    start = HashDisplay(&begin_hash)
1784                );
1785
1786                match task
1787                    .network
1788                    .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1789                {
1790                    Ok(substream_id) => {
1791                        task.grandpa_warp_sync_requests.insert(substream_id, result);
1792                    }
1793                    Err(service::StartRequestError::NoConnection) => {
1794                        log!(
1795                            &task.platform,
1796                            Debug,
1797                            "network",
1798                            "warp-sync-request-error",
1799                            chain = task.network[chain_id].log_name,
1800                            target,
1801                            error = "NoConnection"
1802                        );
                            // A failure to deliver the error to the requester is ignored.
1803                        let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1804                    }
1805                }
1806            }
1807            WakeUpReason::MessageForChain(
1808                chain_id,
1809                ToBackgroundChain::StartStorageProofRequest {
1810                    target,
1811                    config,
1812                    timeout,
1813                    result,
1814                },
1815            ) => {
                    // Start a storage proof request towards `target`. On success the `result`
                    // sender is stored in `task.storage_proof_requests` under the new substream
                    // id; on failure (`NoConnection` or `RequestTooLarge`) the matching error is
                    // logged and reported through `result` immediately.
1816                log!(
1817                    &task.platform,
1818                    Debug,
1819                    "network",
1820                    "storage-proof-request-started",
1821                    chain = task.network[chain_id].log_name,
1822                    target,
1823                    block_hash = HashDisplay(&config.block_hash)
1824                );
1825
                    // `config` is not needed after this call, so it is moved rather than cloned.
1826                match task.network.start_storage_proof_request(
1827                    &target,
1828                    chain_id,
1829                    config,
1830                    timeout,
1831                ) {
1832                    Ok(substream_id) => {
1833                        task.storage_proof_requests.insert(substream_id, result);
1834                    }
1835                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1836                        log!(
1837                            &task.platform,
1838                            Debug,
1839                            "network",
1840                            "storage-proof-request-error",
1841                            chain = task.network[chain_id].log_name,
1842                            target,
1843                            error = "NoConnection"
1844                        );
                            // A failure to deliver the error to the requester is ignored.
1845                        let _ = result.send(Err(StorageProofRequestError::NoConnection));
1846                    }
1847                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1848                        log!(
1849                            &task.platform,
1850                            Debug,
1851                            "network",
1852                            "storage-proof-request-error",
1853                            chain = task.network[chain_id].log_name,
1854                            target,
1855                            error = "RequestTooLarge"
1856                        );
1857                        let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1858                    }
1859                };
1860            }
1861            WakeUpReason::MessageForChain(
1862                chain_id,
1863                ToBackgroundChain::StartCallProofRequest {
1864                    target,
1865                    config,
1866                    timeout,
1867                    result,
1868                },
1869            ) => {
                    // Start a call proof request towards `target`. On success the `result`
                    // sender is stored in `task.call_proof_requests` under the new substream id;
                    // on failure (`NoConnection` or `RequestTooLarge`) the matching error is
                    // logged and reported through `result` immediately.
1870                log!(
1871                    &task.platform,
1872                    Debug,
1873                    "network",
1874                    "call-proof-request-started",
1875                    chain = task.network[chain_id].log_name,
1876                    target,
1877                    block_hash = HashDisplay(&config.block_hash),
1878                    function = config.method
1879                );
1880                // TODO: log parameter
1881
                    // `config` is not needed after this call, so it is moved rather than cloned.
1882                match task.network.start_call_proof_request(
1883                    &target,
1884                    chain_id,
1885                    config,
1886                    timeout,
1887                ) {
1888                    Ok(substream_id) => {
1889                        task.call_proof_requests.insert(substream_id, result);
1890                    }
1891                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1892                        log!(
1893                            &task.platform,
1894                            Debug,
1895                            "network",
1896                            "call-proof-request-error",
1897                            chain = task.network[chain_id].log_name,
1898                            target,
1899                            error = "NoConnection"
1900                        );
                            // A failure to deliver the error to the requester is ignored.
1901                        let _ = result.send(Err(CallProofRequestError::NoConnection));
1902                    }
1903                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1904                        log!(
1905                            &task.platform,
1906                            Debug,
1907                            "network",
1908                            "call-proof-request-error",
1909                            chain = task.network[chain_id].log_name,
1910                            target,
1911                            error = "RequestTooLarge"
1912                        );
1913                        let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1914                    }
1915                };
1916            }
1917            WakeUpReason::MessageForChain(
1918                chain_id,
1919                ToBackgroundChain::StartChildStorageProofRequest {
1920                    target,
1921                    config,
1922                    timeout,
1923                    result,
1924                },
1925            ) => {
                    // Start a child storage proof request towards `target`. On success the
                    // `result` sender is stored in `task.child_storage_proof_requests` under the
                    // new substream id; on failure (`NoConnection` or `RequestTooLarge`) the
                    // matching error is logged and reported through `result` immediately.
1926                log!(
1927                    &task.platform,
1928                    Debug,
1929                    "network",
1930                    "child-storage-proof-request-started",
1931                    chain = task.network[chain_id].log_name,
1932                    target,
1933                    block_hash = HashDisplay(&config.block_hash)
1934                );
1935
                    // The request configuration is rebuilt from borrows of `config`: the child
                    // trie by reference and the keys as slices.
1936                match task.network.start_child_storage_proof_request(
1937                    &target,
1938                    chain_id,
1939                    codec::ChildStorageProofRequestConfig {
1940                        block_hash: config.block_hash,
1941                        child_trie: &config.child_trie,
1942                        keys: config.keys.iter().map(|k| k.as_slice()),
1943                    },
1944                    timeout,
1945                ) {
1946                    Ok(substream_id) => {
1947                        task.child_storage_proof_requests
1948                            .insert(substream_id, result);
1949                    }
1950                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1951                        log!(
1952                            &task.platform,
1953                            Debug,
1954                            "network",
1955                            "child-storage-proof-request-error",
1956                            chain = task.network[chain_id].log_name,
1957                            target,
1958                            error = "NoConnection"
1959                        );
                            // A failure to deliver the error to the requester is ignored.
1960                        let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
1961                    }
1962                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1963                        log!(
1964                            &task.platform,
1965                            Debug,
1966                            "network",
1967                            "child-storage-proof-request-error",
1968                            chain = task.network[chain_id].log_name,
1969                            target,
1970                            error = "RequestTooLarge"
1971                        );
1972                        let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
1973                    }
1974                };
1975            }
1976            WakeUpReason::MessageForChain(
1977                chain_id,
1978                ToBackgroundChain::SetLocalBestBlock {
1979                    best_hash,
1980                    best_number,
1981                },
1982            ) => {
                    // Pass the new local best block (hash and number) of this chain on to
                    // `task.network`.
1983                task.network
1984                    .set_chain_local_best_block(chain_id, best_hash, best_number);
1985            }
1986            WakeUpReason::MessageForChain(
1987                chain_id,
1988                ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1989            ) => {
                    // Log the new local GrandPa state (set id and finalized height of the
                    // commit), then hand it to `task.network`, which both records it and
                    // broadcasts it (per the name of the method called below).
1990                log!(
1991                    &task.platform,
1992                    Debug,
1993                    "network",
1994                    "local-grandpa-state-announced",
1995                    chain = task.network[chain_id].log_name,
1996                    set_id = grandpa_state.set_id,
1997                    commit_finalized_height = grandpa_state.commit_finalized_height,
1998                );
1999
2000                // TODO: log the list of peers we sent the packet to
2001
2002                task.network
2003                    .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
2004            }
2005            WakeUpReason::MessageForChain(
2006                chain_id,
2007                ToBackgroundChain::AnnounceTransaction {
2008                    transaction,
2009                    result,
2010                },
2011            ) => {
                    // Gossip `transaction` to every peer that is gossip-connected on this chain,
                    // log the outcome per peer, and reply through `result`.
2012                // TODO: keep track of which peer knows about which transaction, and don't send it again
2013
2014                let peers_to_send = task
2015                    .network
2016                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2017                    .cloned()
2018                    .collect::<Vec<_>>();
2019
                    // Base58 peer ids, split by outcome; only used for the log line below.
2020                let mut peers_sent = Vec::with_capacity(peers_to_send.len());
2021                let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
2022                for peer in &peers_to_send {
2023                    match task
2024                        .network
2025                        .gossip_send_transaction(peer, chain_id, &transaction)
2026                    {
2027                        Ok(()) => peers_sent.push(peer.to_base58()),
2028                        Err(QueueNotificationError::QueueFull) => {
2029                            peers_queue_full.push(peer.to_base58())
2030                        }
                            // Treated as impossible: the peer was just listed as
                            // gossip-connected.
2031                        Err(QueueNotificationError::NoConnection) => unreachable!(),
2032                    }
2033                }
2034
                    // The transaction is identified in the log by the hex encoding of its
                    // 32-byte BLAKE2b hash.
2035                log!(
2036                    &task.platform,
2037                    Debug,
2038                    "network",
2039                    "transaction-announced",
2040                    chain = task.network[chain_id].log_name,
2041                    transaction =
2042                        hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
2043                    size = transaction.len(),
2044                    peers_sent = peers_sent.join(", "),
2045                    peers_queue_full = peers_queue_full.join(", "),
2046                );
2047
                    // NOTE(review): the reply contains every peer in `peers_to_send`, including
                    // those whose queue was full — confirm that this is what callers expect.
2048                let _ = result.send(peers_to_send);
2049            }
2050            WakeUpReason::MessageForChain(
2051                chain_id,
2052                ToBackgroundChain::SendBlockAnnounce {
2053                    target,
2054                    scale_encoded_header,
2055                    is_best,
2056                    result,
2057                },
2058            ) => {
                    // Send a block announce to `target` and forward the outcome of
                    // `gossip_send_block_announce` to the requester as-is. A failure to deliver
                    // that outcome is ignored.
2059                // TODO: log who the announce was sent to
2060                let _ = result.send(task.network.gossip_send_block_announce(
2061                    &target,
2062                    chain_id,
2063                    &scale_encoded_header,
2064                    is_best,
2065                ));
2066            }
2067            WakeUpReason::MessageForChain(
2068                _chain_id,
2069                ToBackgroundChain::SendBitswapMessage {
2070                    target,
2071                    message,
2072                    result,
2073                },
2074            ) => {
                    // Send a bitswap message to `target` and forward the outcome to the
                    // requester as-is. The chain id is not used by this handler.
2075                let _ = result.send(task.network.bitswap_send_message(&target, message));
2076            }
2077            WakeUpReason::MessageForChain(
2078                _chain_id,
2079                ToBackgroundChain::BroadcastBitswapMessage { message, result },
2080            ) => {
                    // Send `message` to every peer returned by `established_bitswap_desired()`
                    // and reply with a single aggregated result:
                    // - `Ok(peers)` listing the peers for which the send succeeded, if any;
                    // - otherwise `Err(QueueFull)` if at least one send failed with `QueueFull`;
                    // - otherwise `Err(NoConnection)`.
                    // The chain id is not used by this handler.
2081                let peers = task
2082                    .network
2083                    .established_bitswap_desired()
2084                    .cloned()
2085                    .collect::<Vec<_>>();
2086                let results = peers
2087                    .iter()
2088                    .map(|peer| {
2089                        (
2090                            peer,
2091                            task.network.bitswap_send_message(peer, message.clone()),
2092                        )
2093                    })
2094                    .collect::<Vec<_>>(); // we must collect first to send all messages
2095
2096                let succeeded_peers = results
2097                    .iter()
2098                    .filter_map(|(peer, r)| r.is_ok().then(|| (*peer).clone()))
2099                    .collect::<Vec<_>>();
2100
2101                // TODO: introspecting a third-party error type below doesn't seem good.
2102                let r = if !succeeded_peers.is_empty() {
2103                    Ok(succeeded_peers)
2104                } else if results
2105                    .iter()
2106                    .any(|(_peer, r)| matches!(r, Err(SendBitswapMessageError::QueueFull)))
2107                {
2108                    // `QueueFull` has higher priority than `NoConnection` for possible
2109                    // back-pressure in higher level code.
2110                    Err(SendBitswapMessageError::QueueFull)
2111                } else {
2112                    // This is only emitted if all peers fail with `NoConnection` or there are
2113                    // no peers at all.
2114                    Err(SendBitswapMessageError::NoConnection)
2115                };
2116
                    // A failure to deliver the reply to the requester is ignored.
2117                let _ = result.send(r);
2118            }
2119            WakeUpReason::MessageForChain(
2120                chain_id,
2121                ToBackgroundChain::BroadcastStatement { statement, result },
2122            ) => {
                    // Send `statement` to every peer that is gossip-connected on this chain and
                    // reply with how many sends succeeded (`sent`) out of how many peers were
                    // tried (`total`).
2123                let peers_to_send = task
2124                    .network
2125                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2126                    .cloned()
2127                    .collect::<Vec<_>>();
2128
2129                let total = peers_to_send.len();
2130                let mut sent = 0;
2131                for peer in &peers_to_send {
                        // Any error simply counts as "not sent"; the error kind is not inspected.
2132                    if task
2133                        .network
2134                        .gossip_send_statement(peer, chain_id, statement.clone())
2135                        .is_ok()
2136                    {
2137                        sent += 1;
2138                    }
2139                }
2140
2141                log!(
2142                    &task.platform,
2143                    Debug,
2144                    "network",
2145                    "statement-broadcast",
2146                    chain = task.network[chain_id].log_name,
2147                    sent,
2148                    total,
2149                );
2150
                    // A failure to deliver the reply to the requester is ignored.
2151                let _ = result.send(BroadcastStatementResult { sent, total });
2152            }
2153            WakeUpReason::MessageForChain(
2154                chain_id,
2155                ToBackgroundChain::UpdateTopicAffinity { filter },
2156            ) => {
                    // Record `filter` as the current affinity filter of this chain, then send it
                    // to every peer listed in `v2_statement_peers` for the chain.
                    //
                    // NOTE(review): `filter` is cloned for the map and then only borrowed; the
                    // clone looks avoidable by inserting after the loop — confirm before changing.
2157                task.current_affinity_filter
2158                    .insert(chain_id, filter.clone());
2159                if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
                        // Peers for which the send fails with `NoConnection` or `ProtocolV1` are
                        // dropped from the set. They are gathered first and removed afterwards,
                        // since the set cannot be modified while it is being iterated. Any other
                        // outcome leaves the peer in place.
2160                    let mut to_remove = Vec::new();
2161                    for peer_id in peers.iter() {
2162                        if let Err(
2163                            SendTopicAffinityError::NoConnection
2164                            | SendTopicAffinityError::ProtocolV1,
2165                        ) = task.network.send_topic_affinity(peer_id, chain_id, &filter)
2166                        {
2167                            to_remove.push(peer_id.clone());
2168                        }
2169                    }
2170                    for peer_id in &to_remove {
2171                        peers.remove(peer_id);
2172                    }
2173                }
2174            }
2175            WakeUpReason::MessageForChain(
2176                chain_id,
2177                ToBackgroundChain::Discover {
2178                    list,
2179                    important_nodes,
2180                },
2181            ) => {
                    // Register the given peers and their addresses with the peering strategy
                    // for this chain. When `important_nodes` is set, each peer is additionally
                    // recorded in `task.important_nodes` for the chain.
2182                for (peer_id, addrs) in list {
2183                    if important_nodes {
2184                        task.important_nodes
2185                            .entry(chain_id)
2186                            .or_default()
2187                            .insert(peer_id.clone());
2188                    }
2189
2190                    // Note that we must call this function before `insert_address`, as documented
2191                    // in `basic_peering_strategy`.
                        // The literals `30` here and `10` below are hard-coded values whose
                        // meaning is defined by `basic_peering_strategy` (see the TODOs).
2192                    task.peering_strategy
2193                        .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant
2194
2195                    for addr in addrs {
                            // The outcome of the insertion is deliberately ignored.
2196                        let _ =
2197                            task.peering_strategy
2198                                .insert_address(&peer_id, addr.into_bytes(), 10);
2199                        // TODO: constant
2200                    }
2201                }
2202            }
2203            WakeUpReason::MessageForChain(
2204                chain_id,
2205                ToBackgroundChain::DiscoveredNodes { result },
2206            ) => {
2207                // TODO: consider returning Vec<u8>s for the addresses?
2208                let _ = result.send(
2209                    task.peering_strategy
2210                        .chain_peers_unordered(&chain_id)
2211                        .map(|peer_id| {
2212                            let addrs = task
2213                                .peering_strategy
2214                                .peer_addresses(peer_id)
2215                                .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
2216                                .collect::<Vec<_>>();
2217                            (peer_id.clone(), addrs)
2218                        })
2219                        .collect::<Vec<_>>(),
2220                );
2221            }
2222            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
2223                let _ = result.send(
2224                    task.network
2225                        .gossip_connected_peers(
2226                            chain_id,
2227                            service::GossipKind::ConsensusTransactions,
2228                        )
2229                        .cloned()
2230                        .collect(),
2231                );
2232            }
2233            WakeUpReason::StartDiscovery(chain_id) => {
2234                // Re-insert the chain in `chains_by_next_discovery`.
2235                let chain = &mut task.network[chain_id];
2236                chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
2237                chain.next_discovery_period =
2238                    cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
2239                task.chains_by_next_discovery.insert(
2240                    (chain.next_discovery_when.clone(), chain_id),
2241                    Box::pin(
2242                        task.platform
2243                            .sleep(task.network[chain_id].next_discovery_period),
2244                    ),
2245                );
2246
2247                // Iterative-style discovery: instead of a single FindNode against one peer,
2248                // dispatch up to ALPHA=3 FindNode requests in parallel to distinct peers,
2249                // each with a distinct random target. This gives substantially better DHT
2250                // coverage per discovery round (more peers asked, more diverse keyspace
2251                // walked) without requiring a per-query state machine.
2252                //
2253                // Order of preference for the target peer pool:
2254                //  1. Peers we know speak this chain's Kad protocol (from Identify).
2255                //  2. Peers with an open block-announces gossip substream (best-effort:
2256                //     they're connected and likely speak Kad even if we haven't gotten
2257                //     Identify yet).
2258                const PARALLEL_FIND_NODE_PER_ROUND: usize = 3;
2259
2260                let mut candidates: Vec<PeerId> = task
2261                    .network
2262                    .kademlia_capable_peers(chain_id)
2263                    .cloned()
2264                    .collect();
2265                for p in task
2266                    .network
2267                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2268                    .cloned()
2269                {
2270                    if !candidates.contains(&p) {
2271                        candidates.push(p);
2272                    }
2273                }
2274
2275                let started = dispatch_find_node_requests(
2276                    &mut task.network,
2277                    &mut task.randomness,
2278                    chain_id,
2279                    &candidates,
2280                    PARALLEL_FIND_NODE_PER_ROUND,
2281                );
2282
2283                let chain_log_name = &task.network[chain_id].log_name;
2284                for (request_target, requested_peer_id) in &started {
2285                    log!(
2286                        &task.platform,
2287                        Debug,
2288                        "network",
2289                        "discovery-find-node-started",
2290                        chain = chain_log_name,
2291                        request_target,
2292                        requested_peer_id
2293                    );
2294                }
2295                if started.is_empty() {
2296                    log!(
2297                        &task.platform,
2298                        Debug,
2299                        "network",
2300                        "discovery-skipped-no-peer",
2301                        chain = chain_log_name
2302                    );
2303                }
2304            }
2305            WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
2306                peer_id,
2307                expected_peer_id,
2308                id,
2309            }) => {
2310                let remote_addr =
2311                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
2312                if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
2313                {
2314                    log!(
2315                        &task.platform,
2316                        Debug,
2317                        "network",
2318                        "handshake-finished-peer-id-mismatch",
2319                        remote_addr,
2320                        expected_peer_id,
2321                        actual_peer_id = peer_id
2322                    );
2323
2324                    let _was_in = task
2325                        .peering_strategy
2326                        .decrease_address_connections_and_remove_if_zero(
2327                            expected_peer_id,
2328                            remote_addr.as_ref(),
2329                        );
2330                    debug_assert!(_was_in.is_ok());
2331                    let _ = task.peering_strategy.increase_address_connections(
2332                        &peer_id,
2333                        remote_addr.into_bytes().to_vec(),
2334                        10,
2335                    );
2336                } else {
2337                    log!(
2338                        &task.platform,
2339                        Debug,
2340                        "network",
2341                        "handshake-finished",
2342                        remote_addr,
2343                        peer_id
2344                    );
2345                }
2346
2347                task.bitswap_peering_strategy
2348                    .increase_peer_connections(&peer_id);
2349            }
2350            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2351                expected_peer_id: Some(_),
2352                ..
2353            })
2354            | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
2355                let (address, peer_id, handshake_finished) = match wake_up_reason {
2356                    WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2357                        address,
2358                        expected_peer_id: Some(peer_id),
2359                        ..
2360                    }) => (address, peer_id, false),
2361                    WakeUpReason::NetworkEvent(service::Event::Disconnected {
2362                        address,
2363                        peer_id,
2364                        ..
2365                    }) => (address, peer_id, true),
2366                    _ => unreachable!(),
2367                };
2368
2369                task.peering_strategy
2370                    .decrease_address_connections(&peer_id, &address)
2371                    .unwrap();
2372                let address = Multiaddr::from_bytes(address).unwrap();
2373                log!(
2374                    &task.platform,
2375                    Debug,
2376                    "network",
2377                    "connection-shutdown",
2378                    peer_id,
2379                    address,
2380                    ?handshake_finished
2381                );
2382
2383                // Ban the peer in order to avoid trying over and over again the same address(es).
2384                // Even if the handshake was finished, it is possible that the peer simply shuts
2385                // down connections immediately after it has been opened, hence the ban.
2386                // Due to race conditions and peerid mismatches, it is possible that there is
2387                // another existing connection or connection attempt with that same peer. However,
2388                // it is not possible to be sure that we will reach 0 connections or connection
2389                // attempts, and thus we ban the peer every time.
2390                // Pre-handshake failures get a shorter ban: many parallel dials time out
2391                // before any handshake completes, and a long slot-hold there dominates
2392                // peer-discovery latency on restarts.
2393                let ban_duration = if handshake_finished {
2394                    Duration::from_secs(5)
2395                } else {
2396                    Duration::from_secs(2)
2397                };
2398                task.network.gossip_remove_desired_all(
2399                    &peer_id,
2400                    service::GossipKind::ConsensusTransactions,
2401                );
2402                for (&chain_id, what_happened) in task
2403                    .peering_strategy
2404                    .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
2405                {
2406                    if matches!(
2407                        what_happened,
2408                        basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2409                    ) {
2410                        log!(
2411                            &task.platform,
2412                            Debug,
2413                            "network",
2414                            "slot-unassigned",
2415                            chain = &task.network[chain_id].log_name,
2416                            peer_id,
2417                            ?ban_duration,
2418                            // TODO: `reason` might be wrong, `handshake_finished` is not checked.
2419                            reason = "pre-handshake-disconnect"
2420                        );
2421                    }
2422                }
2423
2424                if handshake_finished {
2425                    task.network.bitswap_remove_desired(&peer_id);
2426                    let what_happened = task
2427                        .bitswap_peering_strategy
2428                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration);
2429                    if matches!(
2430                        what_happened,
2431                        bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true },
2432                    ) {
2433                        log!(
2434                            &task.platform,
2435                            Debug,
2436                            "network",
2437                            "bitswap-slot-unassigned",
2438                            peer_id,
2439                            ?ban_duration,
2440                            reason = "disconnect",
2441                        );
2442                    }
2443                    let _ = task
2444                        .bitswap_peering_strategy
2445                        .decrease_peer_connections(&peer_id);
2446                }
2447            }
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
                expected_peer_id: None,
                ..
            }) => {
                // This path can't be reached as we always set an expected peer id when creating
                // a connection.
                // The assertion only fires in builds with debug assertions enabled; otherwise
                // the event is ignored without any further processing.
                debug_assert!(false);
            }
            // A ping was answered: look up the remote address of the connection and log the
            // measured ping time. Nothing else is done with the event in this arm.
            WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
                id,
                peer_id,
                ping_time,
            }) => {
                let remote_addr =
                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "pong",
                    peer_id,
                    remote_addr,
                    ?ping_time
                );
            }
2473            WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
2474                chain_id,
2475                peer_id,
2476                announce,
2477            }) => {
2478                log!(
2479                    &task.platform,
2480                    Debug,
2481                    "network",
2482                    "block-announce-received",
2483                    chain = &task.network[chain_id].log_name,
2484                    peer_id,
2485                    block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2486                        announce.decode().scale_encoded_header
2487                    )),
2488                    is_best = announce.decode().is_best
2489                );
2490
2491                let decoded_announce = announce.decode();
2492                if decoded_announce.is_best {
2493                    let link = task
2494                        .open_gossip_links
2495                        .get_mut(&(chain_id, peer_id.clone()))
2496                        .unwrap();
2497                    if let Ok(decoded) = header::decode(
2498                        decoded_announce.scale_encoded_header,
2499                        task.network[chain_id].block_number_bytes,
2500                    ) {
2501                        link.best_block_hash = header::hash_from_scale_encoded_header(
2502                            decoded_announce.scale_encoded_header,
2503                        );
2504                        link.best_block_number = decoded.number;
2505                    }
2506                }
2507
2508                debug_assert!(task.event_pending_send.is_none());
2509                task.event_pending_send =
2510                    Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
2511            }
2512            WakeUpReason::NetworkEvent(service::Event::GossipConnected {
2513                peer_id,
2514                chain_id,
2515                role,
2516                best_number,
2517                best_hash,
2518                kind: service::GossipKind::ConsensusTransactions,
2519            }) => {
2520                log!(
2521                    &task.platform,
2522                    Debug,
2523                    "network",
2524                    "gossip-open-success",
2525                    chain = &task.network[chain_id].log_name,
2526                    peer_id,
2527                    best_number,
2528                    best_hash = HashDisplay(&best_hash)
2529                );
2530
2531                let _prev_value = task.open_gossip_links.insert(
2532                    (chain_id, peer_id.clone()),
2533                    OpenGossipLinkState {
2534                        best_block_number: best_number,
2535                        best_block_hash: best_hash,
2536                        role,
2537                        finalized_block_height: None,
2538                    },
2539                );
2540                debug_assert!(_prev_value.is_none());
2541
2542                task.chains_ever_gossip_connected.insert(chain_id);
2543
2544                debug_assert!(task.event_pending_send.is_none());
2545                task.event_pending_send = Some((
2546                    chain_id,
2547                    Event::Connected {
2548                        peer_id,
2549                        role,
2550                        best_block_number: best_number,
2551                        best_block_hash: best_hash,
2552                    },
2553                ));
2554            }
2555            WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
2556                peer_id,
2557                chain_id,
2558                error,
2559                kind: service::GossipKind::ConsensusTransactions,
2560            }) => {
2561                log!(
2562                    &task.platform,
2563                    Debug,
2564                    "network",
2565                    "gossip-open-error",
2566                    chain = &task.network[chain_id].log_name,
2567                    peer_id,
2568                    ?error,
2569                );
2570                // Must exceed polkadot-sdk's 5s notification-reject ban; otherwise we retry
2571                // into a still-active remote ban. 0.5s margin covers network delay and
2572                // clock skew between the two sides' ban timers.
2573                let ban_duration = Duration::from_millis(5500);
2574
2575                // Note that peer doesn't necessarily have an out slot, as this event might happen
2576                // as a result of an inbound gossip connection.
2577                let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
2578                    matches!(
2579                        task.peering_strategy
2580                            .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
2581                        basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
2582                    )
2583                } else {
2584                    matches!(
2585                        task.peering_strategy.unassign_slot_and_ban(
2586                            &chain_id,
2587                            &peer_id,
2588                            task.platform.now() + ban_duration,
2589                        ),
2590                        basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2591                    )
2592                };
2593
2594                if had_slot {
2595                    log!(
2596                        &task.platform,
2597                        Debug,
2598                        "network",
2599                        "slot-unassigned",
2600                        chain = &task.network[chain_id].log_name,
2601                        peer_id,
2602                        ?ban_duration,
2603                        reason = "gossip-open-failed"
2604                    );
2605                    task.network.gossip_remove_desired(
2606                        chain_id,
2607                        &peer_id,
2608                        service::GossipKind::ConsensusTransactions,
2609                    );
2610                }
2611            }
2612            WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
2613                peer_id,
2614                chain_id,
2615                kind: service::GossipKind::ConsensusTransactions,
2616            }) => {
2617                log!(
2618                    &task.platform,
2619                    Debug,
2620                    "network",
2621                    "gossip-closed",
2622                    chain = &task.network[chain_id].log_name,
2623                    peer_id,
2624                );
2625                let ban_duration = Duration::from_secs(10);
2626
2627                let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
2628                debug_assert!(_was_in.is_some());
2629
2630                // Note that peer doesn't necessarily have an out slot, as this event might happen
2631                // as a result of an inbound gossip connection.
2632                if matches!(
2633                    task.peering_strategy.unassign_slot_and_ban(
2634                        &chain_id,
2635                        &peer_id,
2636                        task.platform.now() + ban_duration,
2637                    ),
2638                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2639                ) {
2640                    log!(
2641                        &task.platform,
2642                        Debug,
2643                        "network",
2644                        "slot-unassigned",
2645                        chain = &task.network[chain_id].log_name,
2646                        peer_id,
2647                        ?ban_duration,
2648                        reason = "gossip-closed"
2649                    );
2650                    task.network.gossip_remove_desired(
2651                        chain_id,
2652                        &peer_id,
2653                        service::GossipKind::ConsensusTransactions,
2654                    );
2655                }
2656
2657                if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2658                    peers.remove(&peer_id);
2659                }
2660
2661                debug_assert!(task.event_pending_send.is_none());
2662                task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2663            }
            // A bitswap link was opened: bump the counter of connected bitswap peers and log the
            // new total.
            WakeUpReason::NetworkEvent(service::Event::BitswapConnected { peer_id }) => {
                // Saturating arithmetic: the counter stays at its maximum instead of
                // overflowing.
                task.bitswap_connected_peers = task.bitswap_connected_peers.saturating_add(1);
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "bitswap-open-success",
                    peer_id,
                    total = task.bitswap_connected_peers
                );
            }
2675            WakeUpReason::NetworkEvent(service::Event::BitswapOpenFailed { peer_id, error }) => {
2676                log!(
2677                    &task.platform,
2678                    Debug,
2679                    "network",
2680                    "bitswap-open-error",
2681                    peer_id,
2682                    ?error
2683                );
2684                let ban_duration = if error.is_protocol_not_available() {
2685                    Duration::from_secs(600)
2686                } else {
2687                    Duration::from_secs(15)
2688                };
2689                if matches!(
2690                    task.bitswap_peering_strategy
2691                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2692                    bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2693                ) {
2694                    log!(
2695                        &task.platform,
2696                        Debug,
2697                        "network",
2698                        "bitswap-slot-unassigned",
2699                        peer_id,
2700                        ?ban_duration,
2701                        reason = "bitswap-open-failed"
2702                    );
2703                    task.network.bitswap_remove_desired(&peer_id);
2704                }
2705            }
            // A bitswap message was received: log it and queue it as the pending bitswap event.
            WakeUpReason::NetworkEvent(service::Event::BitswapMessage { peer_id, message }) => {
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "bitswap-message-received",
                    peer_id
                );
                // The pending slot is expected to be empty at this point; the message is stored
                // in it unconditionally.
                debug_assert!(task.bitswap_event_pending_send.is_none());
                task.bitswap_event_pending_send =
                    Some(BitswapEvent::BitswapMessage { peer_id, message });
            }
2718            WakeUpReason::NetworkEvent(service::Event::BitswapDisconnected { peer_id }) => {
2719                debug_assert!(task.bitswap_connected_peers > 0);
2720                task.bitswap_connected_peers = task.bitswap_connected_peers.saturating_sub(1);
2721                log!(
2722                    &task.platform,
2723                    Debug,
2724                    "network",
2725                    "bitswap-closed",
2726                    peer_id,
2727                    total = task.bitswap_connected_peers
2728                );
2729                let ban_duration = Duration::from_secs(10);
2730                if matches!(
2731                    task.bitswap_peering_strategy
2732                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2733                    bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2734                ) {
2735                    log!(
2736                        &task.platform,
2737                        Debug,
2738                        "network",
2739                        "bitswap-slot-unassigned",
2740                        peer_id,
2741                        ?ban_duration,
2742                        reason = "bitswap-closed"
2743                    );
2744                    task.network.bitswap_remove_desired(&peer_id);
2745                }
2746            }
2747            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2748                substream_id,
2749                peer_id,
2750                chain_id,
2751                response: service::RequestResult::Blocks(response),
2752            }) => {
2753                match &response {
2754                    Ok(blocks) => {
2755                        log!(
2756                            &task.platform,
2757                            Debug,
2758                            "network",
2759                            "blocks-request-success",
2760                            chain = task.network[chain_id].log_name,
2761                            target = peer_id,
2762                            num_blocks = blocks.len(),
2763                            block_data_total_size =
2764                                BytesDisplay(blocks.iter().fold(0, |sum, block| {
2765                                    let block_size = block.header.as_ref().map_or(0, |h| h.len())
2766                                        + block
2767                                            .body
2768                                            .as_ref()
2769                                            .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2770                                        + block
2771                                            .justifications
2772                                            .as_ref()
2773                                            .into_iter()
2774                                            .flat_map(|l| l.iter())
2775                                            .fold(0, |s, j| s + j.justification.len());
2776                                    sum + u64::try_from(block_size).unwrap()
2777                                }))
2778                        );
2779                    }
2780                    Err(error) => {
2781                        log!(
2782                            &task.platform,
2783                            Debug,
2784                            "network",
2785                            "blocks-request-error",
2786                            chain = task.network[chain_id].log_name,
2787                            target = peer_id,
2788                            ?error
2789                        );
2790                    }
2791                }
2792
2793                match &response {
2794                    Ok(_) => {}
2795                    Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2796                    Err(err) => {
2797                        log!(
2798                            &task.platform,
2799                            Debug,
2800                            "network",
2801                            format!(
2802                                "Error in block request with {}. This might indicate an \
2803                                incompatibility. Error: {}",
2804                                peer_id, err
2805                            )
2806                        );
2807                    }
2808                }
2809
2810                let _ = task
2811                    .blocks_requests
2812                    .remove(&substream_id)
2813                    .unwrap()
2814                    .send(response.map_err(BlocksRequestError::Request));
2815            }
            // Result of a GrandPa warp sync request: log the outcome, then forward it to the
            // sender registered for this substream.
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::GrandpaWarpSync(response),
            }) => {
                match &response {
                    Ok(response) => {
                        // TODO: print total bytes size
                        // The response is decoded only to report the number of fragments and
                        // the `is_finished` flag in the log line.
                        let decoded = response.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "warp-sync-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            num_fragments = decoded.fragments.len(),
                            is_finished = ?decoded.is_finished,
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "warp-sync-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error,
                        );
                    }
                }

                // Panics if no sender is registered under `substream_id`. A failed send is
                // ignored.
                let _ = task
                    .grandpa_warp_sync_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(WarpSyncRequestError::Request));
            }
            // Result of a storage proof request: log the outcome, then forward it to the sender
            // registered for this substream.
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::StorageProof(response),
            }) => {
                match &response {
                    Ok(items) => {
                        // The response is decoded only to report its length in the log line.
                        let decoded = items.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error
                        );
                    }
                }

                // Both regular storage proof and child storage proof use the same protocol,
                // so check both HashMaps for the request.
                // The error is wrapped in the error type matching the map the sender was found
                // in. A failed send is ignored.
                if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
                    let _ = sender.send(response.map_err(StorageProofRequestError::Request));
                } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
                {
                    let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
                } else {
                    // A result for a substream that is in neither map is treated as a bug.
                    unreachable!()
                }
            }
2899            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2900                substream_id,
2901                peer_id,
2902                chain_id,
2903                response: service::RequestResult::CallProof(response),
2904            }) => {
2905                match &response {
2906                    Ok(items) => {
2907                        let decoded = items.decode();
2908                        log!(
2909                            &task.platform,
2910                            Debug,
2911                            "network",
2912                            "call-proof-request-success",
2913                            chain = task.network[chain_id].log_name,
2914                            target = peer_id,
2915                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2916                        );
2917                    }
2918                    Err(error) => {
2919                        log!(
2920                            &task.platform,
2921                            Debug,
2922                            "network",
2923                            "call-proof-request-error",
2924                            chain = task.network[chain_id].log_name,
2925                            target = peer_id,
2926                            ?error
2927                        );
2928                    }
2929                }
2930
2931                let _ = task
2932                    .call_proof_requests
2933                    .remove(&substream_id)
2934                    .unwrap()
2935                    .send(response.map_err(CallProofRequestError::Request));
2936            }
2937            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2938                peer_id: requestee_peer_id,
2939                chain_id,
2940                response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2941                ..
2942            }) => {
2943                // Track whether this response taught us anything new. If so, we reset the
2944                // chain's discovery backoff so that the next FindNode round runs at the
2945                // initial 2s interval rather than continuing to back off — Kademlia is
2946                // making progress, walk the DHT eagerly.
2947                let mut any_new_peer = false;
2948                for (peer_id, mut addrs) in nodes {
2949                    // Make sure to not insert too many addresses for a single peer: only
2950                    // the first 10 addresses announced for each peer are considered.
2951                    if addrs.len() >= 10 {
2952                        addrs.truncate(10);
2953                    }
2954
2955                    let mut valid_addrs = Vec::with_capacity(addrs.len());
2956                    for addr in addrs {
2957                        match Multiaddr::from_bytes(addr) {
2958                            Ok(mut a) => {
2959                                if !pop_p2p_if_matches(&mut a, &peer_id) {
2960                                    log!(
2961                                        &task.platform,
2962                                        Debug,
2963                                        "network",
2964                                        "discovered-address-peer-id-mismatch",
2965                                        chain = &task.network[chain_id].log_name,
2966                                        announced_peer_id = peer_id,
2967                                        addr = &a,
2968                                        obtained_from = requestee_peer_id
2969                                    );
2970                                    continue;
2971                                }
2972                                if platform::address_parse::multiaddr_to_address(&a)
2973                                    .ok()
2974                                    .map_or(false, |addr| {
2975                                        task.platform.supports_connection_type((&addr).into())
2976                                    })
2977                                {
2978                                    valid_addrs.push(a)
2979                                } else {
2980                                    log!(
2981                                        &task.platform,
2982                                        Debug,
2983                                        "network",
2984                                        "discovered-address-not-supported",
2985                                        chain = &task.network[chain_id].log_name,
2986                                        peer_id,
2987                                        addr = &a,
2988                                        obtained_from = requestee_peer_id
2989                                    );
2990                                }
2991                            }
2992                            Err((error, addr)) => {
2993                                log!(
2994                                    &task.platform,
2995                                    Debug,
2996                                    "network",
2997                                    "discovered-address-invalid",
2998                                    chain = &task.network[chain_id].log_name,
2999                                    peer_id,
3000                                    error,
3001                                    addr = hex::encode(&addr),
3002                                    obtained_from = requestee_peer_id
3003                                );
3004                            }
3005                        }
3006                    }
3007
3008                    if !valid_addrs.is_empty() {
3009                        // Note that we must call this function before `insert_address`,
3010                        // as documented in `basic_peering_strategy`.
3011                        let insert_outcome =
3012                            task.peering_strategy
3013                                .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant
3014
3015                        if let basic_peering_strategy::InsertChainPeerResult::Inserted {
3016                            peer_removed,
3017                        } = insert_outcome
3018                        {
3019                            any_new_peer = true;
3020                            if let Some(peer_removed) = peer_removed {
3021                                log!(
3022                                    &task.platform,
3023                                    Debug,
3024                                    "network",
3025                                    "peer-purged-from-address-book",
3026                                    chain = &task.network[chain_id].log_name,
3027                                    peer_id = peer_removed,
3028                                );
3029                            }
3030
3031                            log!(
3032                                &task.platform,
3033                                Debug,
3034                                "network",
3035                                "peer-discovered",
3036                                chain = &task.network[chain_id].log_name,
3037                                peer_id,
3038                                addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), // TODO: better formatting?
3039                                obtained_from = requestee_peer_id
3040                            );
3041                        }
3042                    }
3043
3044                    for addr in valid_addrs {
3045                        let _insert_result =
3046                            task.peering_strategy
3047                                .insert_address(&peer_id, addr.into_bytes(), 10); // TODO: constant
3048                        debug_assert!(!matches!(
3049                            _insert_result,
3050                            basic_peering_strategy::InsertAddressResult::UnknownPeer
3051                        ));
3052                    }
3053                }
3054
3055                if any_new_peer {
3056                    task.network[chain_id].next_discovery_period = Duration::from_secs(2);
3057                }
3058            }
3059            WakeUpReason::NetworkEvent(service::Event::RequestResult {
3060                peer_id,
3061                chain_id,
3062                response: service::RequestResult::KademliaFindNode(Err(error)),
3063                ..
3064            }) => {
3065                log!(
3066                    &task.platform,
3067                    Debug,
3068                    "network",
3069                    "discovery-find-node-error",
3070                    chain = &task.network[chain_id].log_name,
3071                    ?error,
3072                    find_node_target = peer_id,
3073                );
3074
3075                // No error is printed if the request fails due to a benign networking error such
3076                // as an unresponsive peer.
3077                match error {
3078                    service::KademliaFindNodeError::RequestFailed(err)
3079                        if !err.is_protocol_error() => {}
3080
3081                    service::KademliaFindNodeError::RequestFailed(
3082                        service::RequestError::Substream(
3083                            connection::established::RequestError::ProtocolNotAvailable,
3084                        ),
3085                    ) => {
3086                        // TODO: remove this warning in a long time
3087                        log!(
3088                            &task.platform,
3089                            Warn,
3090                            "network",
3091                            format!(
3092                                "Problem during discovery on {}: protocol not available. \
3093                                This might indicate that the version of Substrate used by \
3094                                the chain doesn't include \
3095                                <https://github.com/paritytech/substrate/pull/12545>.",
3096                                &task.network[chain_id].log_name
3097                            )
3098                        );
3099                    }
3100                    _ => {
3101                        log!(
3102                            &task.platform,
3103                            Debug,
3104                            "network",
3105                            format!(
3106                                "Problem during discovery on {}: {}",
3107                                &task.network[chain_id].log_name, error
3108                            )
3109                        );
3110                    }
3111                }
3112            }
3113            WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
3114                // We never start any other kind of requests.
3115                unreachable!()
3116            }
3117            WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
3118                peer_id,
3119                chain_id,
3120                kind: service::GossipKind::ConsensusTransactions,
3121            }) => {
3122                // The networking state machine guarantees that `GossipInDesired`
3123                // can't happen if we are already opening an out slot, which we do
3124                // immediately.
3125                // TODO: add debug_assert! ^
3126                if task
3127                    .network
3128                    .opened_gossip_undesired_by_chain(chain_id)
3129                    .count()
3130                    < 4
3131                {
3132                    log!(
3133                        &task.platform,
3134                        Debug,
3135                        "network",
3136                        "gossip-in-request",
3137                        chain = &task.network[chain_id].log_name,
3138                        peer_id,
3139                        outcome = "accepted"
3140                    );
3141                    task.network
3142                        .gossip_open(
3143                            chain_id,
3144                            &peer_id,
3145                            service::GossipKind::ConsensusTransactions,
3146                        )
3147                        .unwrap();
3148                } else {
3149                    log!(
3150                        &task.platform,
3151                        Debug,
3152                        "network",
3153                        "gossip-in-request",
3154                        chain = &task.network[chain_id].log_name,
3155                        peer_id,
3156                        outcome = "rejected",
3157                    );
3158                    task.network
3159                        .gossip_close(
3160                            chain_id,
3161                            &peer_id,
3162                            service::GossipKind::ConsensusTransactions,
3163                        )
3164                        .unwrap();
3165                }
3166            }
3167            WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
3168                // Can't happen as we already instantaneously accept or reject gossip in requests.
3169                unreachable!()
3170            }
3171            WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
3172                peer_id,
3173                substream_id,
3174            }) => {
3175                log!(
3176                    &task.platform,
3177                    Debug,
3178                    "network",
3179                    "identify-request-received",
3180                    peer_id,
3181                );
3182                task.network
3183                    .respond_identify(substream_id, &task.identify_agent_version);
3184            }
3185            WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
3186            WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
3187                // All incoming requests are immediately answered.
3188                unreachable!()
3189            }
3190            WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
3191                chain_id,
3192                peer_id,
3193                state,
3194            }) => {
3195                log!(
3196                    &task.platform,
3197                    Debug,
3198                    "network",
3199                    "grandpa-neighbor-packet-received",
3200                    chain = &task.network[chain_id].log_name,
3201                    peer_id,
3202                    round_number = state.round_number,
3203                    set_id = state.set_id,
3204                    commit_finalized_height = state.commit_finalized_height,
3205                );
3206
3207                task.open_gossip_links
3208                    .get_mut(&(chain_id, peer_id.clone()))
3209                    .unwrap()
3210                    .finalized_block_height = Some(state.commit_finalized_height);
3211
3212                debug_assert!(task.event_pending_send.is_none());
3213                task.event_pending_send = Some((
3214                    chain_id,
3215                    Event::GrandpaNeighborPacket {
3216                        peer_id,
3217                        finalized_block_height: state.commit_finalized_height,
3218                    },
3219                ));
3220            }
3221            WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
3222                chain_id,
3223                peer_id,
3224                message,
3225            }) => {
3226                log!(
3227                    &task.platform,
3228                    Debug,
3229                    "network",
3230                    "grandpa-commit-message-received",
3231                    chain = &task.network[chain_id].log_name,
3232                    peer_id,
3233                    target_block_hash = HashDisplay(message.decode().target_hash),
3234                );
3235
3236                debug_assert!(task.event_pending_send.is_none());
3237                task.event_pending_send =
3238                    Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
3239            }
3240            WakeUpReason::NetworkEvent(service::Event::StatementsNotification {
3241                chain_id,
3242                peer_id,
3243                statements,
3244            }) => {
3245                debug_assert!(task.event_pending_send.is_none());
3246
3247                if statements.is_empty() {
3248                    continue;
3249                }
3250
3251                task.event_pending_send = Some((
3252                    chain_id,
3253                    Event::StatementsNotification {
3254                        peer_id,
3255                        statements,
3256                    },
3257                ));
3258            }
3259            WakeUpReason::NetworkEvent(service::Event::StatementProtocolConnected {
3260                peer_id,
3261                chain_id,
3262                version,
3263            }) => {
3264                log!(
3265                    &task.platform,
3266                    Trace,
3267                    "network",
3268                    "statement-protocol-open-success",
3269                    chain = &task.network[chain_id].log_name,
3270                    peer_id,
3271                    ?version,
3272                );
3273
3274                if matches!(version, codec::StatementProtocolVersion::V2) {
3275                    task.v2_statement_peers
3276                        .entry(chain_id)
3277                        .or_insert_with(|| {
3278                            HashSet::with_capacity_and_hasher(16, Default::default())
3279                        })
3280                        .insert(peer_id.clone());
3281                    if let Some(filter) = task.current_affinity_filter.get(&chain_id) {
3282                        if let Err(
3283                            SendTopicAffinityError::NoConnection
3284                            | SendTopicAffinityError::ProtocolV1,
3285                        ) = task.network.send_topic_affinity(&peer_id, chain_id, filter)
3286                        {
3287                            task.v2_statement_peers
3288                                .get_mut(&chain_id)
3289                                .unwrap()
3290                                .remove(&peer_id);
3291                        }
3292                    }
3293                }
3294            }
3295            // TODO: we don't filter outbound statements yet
3296            WakeUpReason::NetworkEvent(service::Event::StatementTopicAffinityReceived {
3297                ..
3298            }) => {}
3299            WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
3300                // TODO: handle properly?
3301                log!(
3302                    &task.platform,
3303                    Warn,
3304                    "network",
3305                    "protocol-error",
3306                    peer_id,
3307                    ?error
3308                );
3309
3310                // TODO: disconnect peer
3311            }
3312            WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
3313                task.peering_strategy.assign_slot(&chain_id, &peer_id);
3314
3315                log!(
3316                    &task.platform,
3317                    Debug,
3318                    "network",
3319                    "slot-assigned",
3320                    chain = &task.network[chain_id].log_name,
3321                    peer_id
3322                );
3323
3324                task.network.gossip_insert_desired(
3325                    chain_id,
3326                    peer_id,
3327                    service::GossipKind::ConsensusTransactions,
3328                );
3329            }
3330            WakeUpReason::CanAssignBitswapSlot(peer_id) => {
3331                task.bitswap_peering_strategy.assign_slot(&peer_id).unwrap();
3332
3333                log!(
3334                    &task.platform,
3335                    Debug,
3336                    "network",
3337                    "bitswap-slot-assigned",
3338                    peer_id
3339                );
3340
3341                task.network.bitswap_insert_desired(peer_id);
3342            }
3343            WakeUpReason::NextRecentConnectionRestore => {
3344                task.num_recent_connection_opening =
3345                    task.num_recent_connection_opening.saturating_sub(1);
3346            }
3347            WakeUpReason::CanStartConnect(expected_peer_id) => {
3348                let Some(multiaddr) = task
3349                    .peering_strategy
3350                    .pick_address_and_add_connection(&expected_peer_id)
3351                else {
3352                    // There is no address for that peer in the address book.
3353                    task.network.gossip_remove_desired_all(
3354                        &expected_peer_id,
3355                        service::GossipKind::ConsensusTransactions,
3356                    );
3357                    let ban_duration = Duration::from_secs(10);
3358                    for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
3359                        &expected_peer_id,
3360                        task.platform.now() + ban_duration,
3361                    ) {
3362                        if matches!(
3363                            what_happened,
3364                            basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
3365                        ) {
3366                            log!(
3367                                &task.platform,
3368                                Debug,
3369                                "network",
3370                                "slot-unassigned",
3371                                chain = &task.network[chain_id].log_name,
3372                                peer_id = expected_peer_id,
3373                                ?ban_duration,
3374                                reason = "no-address"
3375                            );
3376                        }
3377                    }
3378                    continue;
3379                };
3380
3381                let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
3382                    Ok(a) => a,
3383                    Err((multiaddr::FromBytesError, addr)) => {
3384                        // Address is in an invalid format.
3385                        let _was_in = task
3386                            .peering_strategy
3387                            .decrease_address_connections_and_remove_if_zero(
3388                                &expected_peer_id,
3389                                &addr,
3390                            );
3391                        debug_assert!(_was_in.is_ok());
3392                        continue;
3393                    }
3394                };
3395
3396                let address = address_parse::multiaddr_to_address(&multiaddr)
3397                    .ok()
3398                    .filter(|addr| {
3399                        task.platform.supports_connection_type(match &addr {
3400                            address_parse::AddressOrMultiStreamAddress::Address(addr) => {
3401                                From::from(addr)
3402                            }
3403                            address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3404                                addr,
3405                            ) => From::from(addr),
3406                        })
3407                    });
3408
3409                let Some(address) = address else {
3410                    // Address is in an invalid format or isn't supported by the platform.
3411                    let _was_in = task
3412                        .peering_strategy
3413                        .decrease_address_connections_and_remove_if_zero(
3414                            &expected_peer_id,
3415                            multiaddr.as_ref(),
3416                        );
3417                    debug_assert!(_was_in.is_ok());
3418                    continue;
3419                };
3420
3421                // Each connection has its own individual Noise key.
3422                let noise_key = {
3423                    let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
3424                    task.platform.fill_random_bytes(&mut *noise_static_key);
3425                    let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
3426                    task.platform.fill_random_bytes(&mut *libp2p_key);
3427                    connection::NoiseKey::new(&libp2p_key, &noise_static_key)
3428                };
3429
3430                log!(
3431                    &task.platform,
3432                    Debug,
3433                    "network",
3434                    "connection-started",
3435                    expected_peer_id,
3436                    remote_addr = multiaddr,
3437                    local_peer_id =
3438                        peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
3439                            .into_peer_id(),
3440                );
3441
3442                task.num_recent_connection_opening += 1;
3443
3444                let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
3445                    async_channel::bounded(8);
3446                let task_name = format!("connection-{}", multiaddr);
3447
3448                match address {
3449                    address_parse::AddressOrMultiStreamAddress::Address(address) => {
3450                        // As documented in the `PlatformRef` trait, `connect_stream` must
3451                        // return as soon as possible.
3452                        let connection = task.platform.connect_stream(address).await;
3453
3454                        let (connection_id, connection_task) =
3455                            task.network.add_single_stream_connection(
3456                                task.platform.now(),
3457                                service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
3458                                    is_initiator: true,
3459                                    noise_key: &noise_key,
3460                                },
3461                                multiaddr.clone().into_bytes(),
3462                                Some(expected_peer_id.clone()),
3463                                coordinator_to_connection_tx,
3464                            );
3465
3466                        task.platform.spawn_task(
3467                            task_name.into(),
3468                            tasks::single_stream_connection_task::<TPlat>(
3469                                connection,
3470                                multiaddr.to_string(),
3471                                task.platform.clone(),
3472                                connection_id,
3473                                connection_task,
3474                                coordinator_to_connection_rx,
3475                                task.tasks_messages_tx.clone(),
3476                            ),
3477                        );
3478                    }
3479                    address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3480                        platform::MultiStreamAddress::WebRtc {
3481                            ip,
3482                            port,
3483                            remote_certificate_sha256,
3484                        },
3485                    ) => {
3486                        // We need to know the local TLS certificate in order to insert the
3487                        // connection, and as such we need to call `connect_multistream` here.
3488                        // As documented in the `PlatformRef` trait, `connect_multistream` must
3489                        // return as soon as possible.
3490                        let connection = task
3491                            .platform
3492                            .connect_multistream(platform::MultiStreamAddress::WebRtc {
3493                                ip,
3494                                port,
3495                                remote_certificate_sha256,
3496                            })
3497                            .await;
3498
3499                        // Convert the SHA256 hashes into multihashes.
3500                        let local_tls_certificate_multihash = [18u8, 32]
3501                            .into_iter()
3502                            .chain(connection.local_tls_certificate_sha256.into_iter())
3503                            .collect();
3504                        let remote_tls_certificate_multihash = [18u8, 32]
3505                            .into_iter()
3506                            .chain(remote_certificate_sha256.iter().copied())
3507                            .collect();
3508
3509                        let (connection_id, connection_task) =
3510                            task.network.add_multi_stream_connection(
3511                                task.platform.now(),
3512                                service::MultiStreamHandshakeKind::WebRtc {
3513                                    is_initiator: true,
3514                                    local_tls_certificate_multihash,
3515                                    remote_tls_certificate_multihash,
3516                                    noise_key: &noise_key,
3517                                },
3518                                multiaddr.clone().into_bytes(),
3519                                Some(expected_peer_id.clone()),
3520                                coordinator_to_connection_tx,
3521                            );
3522
3523                        task.platform.spawn_task(
3524                            task_name.into(),
3525                            tasks::webrtc_multi_stream_connection_task::<TPlat>(
3526                                connection.connection,
3527                                multiaddr.to_string(),
3528                                task.platform.clone(),
3529                                connection_id,
3530                                connection_task,
3531                                coordinator_to_connection_rx,
3532                                task.tasks_messages_tx.clone(),
3533                            ),
3534                        );
3535                    }
3536                }
3537            }
3538            WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
3539                task.network
3540                    .gossip_open(
3541                        chain_id,
3542                        &peer_id,
3543                        service::GossipKind::ConsensusTransactions,
3544                    )
3545                    .unwrap();
3546
3547                log!(
3548                    &task.platform,
3549                    Debug,
3550                    "network",
3551                    "gossip-open-start",
3552                    chain = &task.network[chain_id].log_name,
3553                    peer_id,
3554                );
3555            }
3556            WakeUpReason::CanOpenBitswap(peer_id) => {
3557                task.network.bitswap_open(&peer_id).unwrap();
3558
3559                log!(
3560                    &task.platform,
3561                    Debug,
3562                    "network",
3563                    "bitswap-open-start",
3564                    peer_id
3565                );
3566            }
3567            WakeUpReason::MessageToConnection {
3568                connection_id,
3569                message,
3570            } => {
3571                // Note that it is critical for the sending to not take too long here, in order to
3572                // not block the process of the network service.
3573                // In particular, if sending the message to the connection is blocked due to
3574                // sending a message on the connection-to-coordinator channel, this will result
3575                // in a deadlock.
3576                // For this reason, the connection task is always ready to immediately accept a
3577                // message on the coordinator-to-connection channel.
3578                let _send_result = task.network[connection_id].send(message).await;
3579                debug_assert!(_send_result.is_ok());
3580            }
3581        }
3582    }
3583}
3584
3585/// Starts find-node requests against `candidates` until `max` have started, each with a fresh
3586/// random target key, and returns the `(request_target, requested_peer_id)` of each.
3587///
3588/// A `kademlia_capable_peers` candidate may have no usable connection (the flag outlives the
3589/// connection it was learned on); such a peer fails with `NoConnection` and is skipped without
3590/// counting towards `max`.
3591fn dispatch_find_node_requests<TChain, TConn, TNow>(
3592    network: &mut service::ChainNetwork<TChain, TConn, TNow>,
3593    randomness: &mut impl rand_chacha::rand_core::RngCore,
3594    chain_id: service::ChainId,
3595    candidates: &[PeerId],
3596    max: usize,
3597) -> Vec<(PeerId, PeerId)>
3598where
3599    TNow: Clone
3600        + core::ops::Add<Duration, Output = TNow>
3601        + core::ops::Sub<TNow, Output = Duration>
3602        + Ord,
3603{
3604    let mut started = Vec::with_capacity(max);
3605
3606    for target in candidates {
3607        if started.len() >= max {
3608            break;
3609        }
3610
3611        let random_peer_id = {
3612            let mut pub_key = [0; 32];
3613            randomness.fill_bytes(&mut pub_key);
3614            PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
3615        };
3616
3617        match network.start_kademlia_find_node_request(
3618            target,
3619            chain_id,
3620            &random_peer_id,
3621            Duration::from_secs(20),
3622        ) {
3623            Ok(_) => started.push((target.clone(), random_peer_id)),
3624            Err(service::StartRequestError::NoConnection) => {}
3625        }
3626    }
3627
3628    started
3629}
3630
3631/// Pops a trailing `/p2p/<peer_id>` from `addr` if it matches `expected_peer`. Returns `false`
3632/// (caller must discard the address) on mismatch.
3633fn pop_p2p_if_matches(
3634    addr: &mut smoldot::libp2p::multiaddr::Multiaddr,
3635    expected_peer: &smoldot::libp2p::peer_id::PeerId,
3636) -> bool {
3637    use smoldot::libp2p::multiaddr::Protocol;
3638    match addr.iter().last() {
3639        Some(Protocol::P2p(mh)) => {
3640            if mh.into_bytes() == expected_peer.as_bytes() {
3641                addr.pop();
3642                true
3643            } else {
3644                false
3645            }
3646        }
3647        _ => true,
3648    }
3649}
3650
#[cfg(test)]
mod tests {
    use super::{Role, dispatch_find_node_requests, pop_p2p_if_matches, service};
    use core::time::Duration;
    use rand_chacha::rand_core::SeedableRng as _;
    use smoldot::libp2p::{multiaddr::Multiaddr, peer_id::PeerId};

    // Two distinct, valid PeerIds. The first is reused from existing smoldot tests in
    // `lib/src/libp2p/multiaddr.rs:629`; the second is the bootnode peer-id observed in the
    // test environment that motivated this change.
    const PEER_A: &str = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN";
    const PEER_B: &str = "12D3KooWQk1yQtG1YugyKjiQf6KNk8VjGGAT5xy1FWcnRKN4yXYJ";

    // Address without any `/p2p/...` component, shared by the suffix tests below.
    const BARE_ADDR: &str = "/ip4/127.0.0.1/tcp/30333/ws";

    /// Decodes a base58-encoded peer id, panicking if it is invalid.
    fn peer(s: &str) -> PeerId {
        let bytes = bs58::decode(s).into_vec().unwrap();
        PeerId::from_bytes(bytes).unwrap()
    }

    /// Parses a textual multiaddress, panicking if it is invalid.
    fn multiaddr(text: &str) -> Multiaddr {
        text.parse().unwrap()
    }

    #[test]
    fn no_suffix_passes_through_unchanged() {
        let mut addr = multiaddr(BARE_ADDR);

        let accepted = pop_p2p_if_matches(&mut addr, &peer(PEER_A));

        assert!(accepted);
        assert_eq!(addr, multiaddr(BARE_ADDR));
    }

    #[test]
    fn matching_suffix_is_stripped() {
        let mut addr = multiaddr(&format!("{BARE_ADDR}/p2p/{PEER_A}"));

        let accepted = pop_p2p_if_matches(&mut addr, &peer(PEER_A));

        assert!(accepted);
        assert_eq!(addr, multiaddr(BARE_ADDR));
    }

    #[test]
    fn mismatched_suffix_rejects_and_keeps_addr() {
        let with_suffix = multiaddr(&format!("{BARE_ADDR}/p2p/{PEER_A}"));
        let mut addr = with_suffix.clone();

        let accepted = pop_p2p_if_matches(&mut addr, &peer(PEER_B));

        assert!(!accepted);
        assert_eq!(addr, with_suffix);
    }

    /// Builds a network with a single light-client chain and no connection at all.
    fn empty_network() -> (service::ChainNetwork<(), (), Duration>, service::ChainId) {
        let config = service::Config {
            connections_capacity: 8,
            chains_capacity: 1,
            randomness_seed: [0; 32],
            handshake_timeout: Duration::from_secs(10),
        };
        let chain_config = service::ChainConfig {
            user_data: (),
            genesis_hash: [0; 32],
            fork_id: None,
            block_number_bytes: 4,
            grandpa_protocol_config: None,
            allow_inbound_block_requests: false,
            best_hash: [0; 32],
            best_number: 0,
            role: Role::Light,
            enable_statement_protocol: false,
        };

        let mut network = service::ChainNetwork::new(config);
        let chain_id = network.add_chain(chain_config).unwrap();
        (network, chain_id)
    }

    // With no connections every candidate returns `NoConnection`, so all are skipped and no
    // request is started. The dispatch loop must not treat that as unreachable.
    #[test]
    fn dispatch_skips_unreachable_candidates() {
        let (mut network, chain_id) = empty_network();
        let mut rng = rand_chacha::ChaCha20Rng::from_seed([7; 32]);
        let candidates = [peer(PEER_A), peer(PEER_B)];

        let started =
            dispatch_find_node_requests(&mut network, &mut rng, chain_id, &candidates, 3);

        assert!(started.is_empty());
    }
}