1use crate::{
31 behaviour::{self, Behaviour, BehaviourOut},
32 bitswap::BitswapRequestHandler,
33 config::{
34 parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
35 NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
36 },
37 discovery::DiscoveryConfig,
38 error::Error,
39 event::{DhtEvent, Event},
40 network_state::{
41 NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
42 },
43 peer_store::{PeerStore, PeerStoreProvider},
44 protocol::{self, Protocol, Ready},
45 protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
46 request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
47 service::{
48 signature::{Signature, SigningError},
49 traits::{
50 BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
51 NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
52 NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
53 NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
54 },
55 },
56 transport,
57 types::ProtocolName,
58 NotificationService, ReputationChange,
59};
60
61use codec::DecodeAll;
62use futures::{channel::oneshot, prelude::*};
63use libp2p::{
64 connection_limits::{ConnectionLimits, Exceeded},
65 core::{upgrade, ConnectedPoint, Endpoint},
66 identify::Info as IdentifyInfo,
67 identity::ed25519,
68 multiaddr::{self, Multiaddr},
69 swarm::{
70 Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
71 NetworkBehaviour, Swarm, SwarmEvent,
72 },
73 PeerId,
74};
75use log::{debug, error, info, trace, warn};
76use metrics::{Histogram, MetricSources, Metrics};
77use parking_lot::Mutex;
78use prometheus_endpoint::Registry;
79use sc_network_types::kad::{Key as KademliaKey, Record};
80
81use sc_client_api::BlockBackend;
82use sc_network_common::{
83 role::{ObservedRole, Roles},
84 ExHashT,
85};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
87use sp_runtime::traits::Block as BlockT;
88
89pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
90pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
91pub use metrics::NotificationMetrics;
92pub use protocol::NotificationsSink;
93use std::{
94 collections::{HashMap, HashSet},
95 fs, iter,
96 marker::PhantomData,
97 num::NonZeroUsize,
98 pin::Pin,
99 str,
100 sync::{
101 atomic::{AtomicUsize, Ordering},
102 Arc,
103 },
104 time::{Duration, Instant},
105};
106
107pub(crate) mod metrics;
108pub(crate) mod out_events;
109
110pub mod signature;
111pub mod traits;
112
/// Log target passed as `target:` to the logging macros in this file.
const LOG_TARGET: &str = "sub-libp2p";
115
/// Wrapper around the transport's bandwidth sinks so that they can be exposed through the
/// [`BandwidthSink`] trait.
struct Libp2pBandwidthSink {
    /// Shared handle to the sinks returned by `transport::build_transport`.
    // NOTE(review): the `allow(deprecated)` suggests `transport::BandwidthSinks` is (or wraps)
    // a deprecated type — confirm before removing the attribute.
    #[allow(deprecated)]
    sink: Arc<transport::BandwidthSinks>,
}
120
121impl BandwidthSink for Libp2pBandwidthSink {
122 fn total_inbound(&self) -> u64 {
123 self.sink.total_inbound()
124 }
125
126 fn total_outbound(&self) -> u64 {
127 self.sink.total_outbound()
128 }
129}
130
/// Handle to the network worker.
///
/// It is created by `NetworkWorker::new` and talks to the worker mostly by sending
/// `ServiceToWorkerMsg` messages over `to_worker`.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
    /// Counter read by `sync_num_connected`; the same `Arc` is also handed to the worker and to
    /// the metrics as `connected_peers`.
    num_connected: Arc<AtomicUsize>,
    /// External addresses of the local node; the set is shared with the behaviour.
    external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
    /// Listen addresses of the local node; the set is shared with the worker.
    listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
    /// `PeerId` of the local node, derived from the node key.
    local_peer_id: PeerId,
    /// Keypair of the local node; used to sign messages in `sign_with_local_identity`.
    local_identity: Keypair,
    /// Source of the total inbound/outbound bandwidth figures.
    bandwidth: Arc<dyn BandwidthSink>,
    /// Channel used to send messages to the network worker.
    to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
    /// Protocol name -> `SetId` mapping for the block-announce protocol followed by the
    /// configured notification protocols.
    notification_protocol_ids: HashMap<ProtocolName, SetId>,
    /// One handle per peer set, indexed by `usize::from(SetId)`.
    protocol_handles: Vec<protocol_controller::ProtocolHandle>,
    /// Clone of `protocol_handles[0]`, i.e. the handle of the default peer set.
    sync_protocol_handle: protocol_controller::ProtocolHandle,
    /// Handle to the peer store, used for reputation and role queries.
    peer_store_handle: Arc<dyn PeerStoreProvider>,
    /// Marker tying the service to the hash type `H` without storing a value of it.
    _marker: PhantomData<H>,
    /// Marker tying the service to the block type `B` without storing a value of it.
    _block: PhantomData<B>,
}
163
164#[async_trait::async_trait]
165impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
166where
167 B: BlockT + 'static,
168 H: ExHashT,
169{
170 type NotificationProtocolConfig = NonDefaultSetConfig;
171 type RequestResponseProtocolConfig = RequestResponseConfig;
172 type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
173 type PeerStore = PeerStore;
174 type BitswapConfig = RequestResponseConfig;
175
176 fn new(params: Params<B, H, Self>) -> Result<Self, Error>
177 where
178 Self: Sized,
179 {
180 NetworkWorker::new(params)
181 }
182
183 fn network_service(&self) -> Arc<dyn NetworkServiceT> {
185 self.service.clone()
186 }
187
188 fn peer_store(
190 bootnodes: Vec<sc_network_types::PeerId>,
191 metrics_registry: Option<Registry>,
192 ) -> Self::PeerStore {
193 PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
194 }
195
196 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
197 NotificationMetrics::new(registry)
198 }
199
200 fn bitswap_server(
201 client: Arc<dyn BlockBackend<B> + Send + Sync>,
202 _metrics_registry: Option<Registry>,
203 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
204 let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
205
206 (Box::pin(async move { handler.run().await }), protocol_config)
207 }
208
209 fn notification_config(
211 protocol_name: ProtocolName,
212 fallback_names: Vec<ProtocolName>,
213 max_notification_size: u64,
214 handshake: Option<NotificationHandshake>,
215 set_config: SetConfig,
216 _metrics: NotificationMetrics,
217 _peerstore_handle: Arc<dyn PeerStoreProvider>,
218 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
219 NonDefaultSetConfig::new(
220 protocol_name,
221 fallback_names,
222 max_notification_size,
223 handshake,
224 set_config,
225 )
226 }
227
228 fn request_response_config(
230 protocol_name: ProtocolName,
231 fallback_names: Vec<ProtocolName>,
232 max_request_size: u64,
233 max_response_size: u64,
234 request_timeout: Duration,
235 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
236 ) -> Self::RequestResponseProtocolConfig {
237 Self::RequestResponseProtocolConfig {
238 name: protocol_name,
239 fallback_names,
240 max_request_size,
241 max_response_size,
242 request_timeout,
243 inbound_queue,
244 }
245 }
246
247 async fn run(mut self) {
249 self.run().await
250 }
251}
252
253impl<B, H> NetworkWorker<B, H>
254where
255 B: BlockT + 'static,
256 H: ExHashT,
257{
258 pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
264 let peer_store_handle = params.network_config.peer_store_handle();
265 let FullNetworkConfiguration {
266 notification_protocols,
267 request_response_protocols,
268 mut network_config,
269 ..
270 } = params.network_config;
271
272 let local_identity = network_config.node_key.clone().into_keypair()?;
274 let local_public = local_identity.public();
275 let local_peer_id = local_public.to_peer_id();
276
277 let local_identity: ed25519::Keypair = local_identity.into();
279 let local_public: ed25519::PublicKey = local_public.into();
280 let local_peer_id: PeerId = local_peer_id.into();
281
282 network_config.boot_nodes = network_config
283 .boot_nodes
284 .into_iter()
285 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
286 .collect();
287 network_config.default_peers_set.reserved_nodes = network_config
288 .default_peers_set
289 .reserved_nodes
290 .into_iter()
291 .filter(|reserved_node| {
292 if reserved_node.peer_id == local_peer_id.into() {
293 warn!(
294 target: LOG_TARGET,
295 "Local peer ID used in reserved node, ignoring: {}",
296 reserved_node,
297 );
298 false
299 } else {
300 true
301 }
302 })
303 .collect();
304
305 ensure_addresses_consistent_with_transport(
307 network_config.listen_addresses.iter(),
308 &network_config.transport,
309 )?;
310 ensure_addresses_consistent_with_transport(
311 network_config.boot_nodes.iter().map(|x| &x.multiaddr),
312 &network_config.transport,
313 )?;
314 ensure_addresses_consistent_with_transport(
315 network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
316 &network_config.transport,
317 )?;
318 for notification_protocol in ¬ification_protocols {
319 ensure_addresses_consistent_with_transport(
320 notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
321 &network_config.transport,
322 )?;
323 }
324 ensure_addresses_consistent_with_transport(
325 network_config.public_addresses.iter(),
326 &network_config.transport,
327 )?;
328
329 let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
330
331 if let Some(path) = &network_config.net_config_path {
332 fs::create_dir_all(path)?;
333 }
334
335 info!(
336 target: LOG_TARGET,
337 "🏷 Local node identity is: {}",
338 local_peer_id.to_base58(),
339 );
340 info!(target: LOG_TARGET, "Running libp2p network backend");
341
342 let (transport, bandwidth) = {
343 let config_mem = match network_config.transport {
344 TransportConfig::MemoryOnly => true,
345 TransportConfig::Normal { .. } => false,
346 };
347
348 transport::build_transport(local_identity.clone().into(), config_mem)
349 };
350
351 let (to_notifications, from_protocol_controllers) =
352 tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
353
354 let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
356 .chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
357
358 let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
359 .enumerate()
360 .map(|(set_id, set_config)| {
361 let proto_set_config = ProtoSetConfig {
362 in_peers: set_config.in_peers,
363 out_peers: set_config.out_peers,
364 reserved_nodes: set_config
365 .reserved_nodes
366 .iter()
367 .map(|node| node.peer_id.into())
368 .collect(),
369 reserved_only: set_config.non_reserved_mode.is_reserved_only(),
370 };
371
372 ProtocolController::new(
373 SetId::from(set_id),
374 proto_set_config,
375 to_notifications.clone(),
376 Arc::clone(&peer_store_handle),
377 )
378 })
379 .unzip();
380
381 let sync_protocol_handle = protocol_handles[0].clone();
383
384 protocol_controllers
386 .into_iter()
387 .for_each(|controller| (params.executor)(controller.run().boxed()));
388
389 let notification_protocol_ids: HashMap<ProtocolName, SetId> =
392 iter::once(¶ms.block_announce_config)
393 .chain(notification_protocols.iter())
394 .enumerate()
395 .map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
396 .collect();
397
398 let known_addresses = {
399 let mut addresses: Vec<_> = network_config
401 .default_peers_set
402 .reserved_nodes
403 .iter()
404 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
405 .chain(notification_protocols.iter().flat_map(|protocol| {
406 protocol
407 .set_config()
408 .reserved_nodes
409 .iter()
410 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
411 }))
412 .chain(
413 network_config
414 .boot_nodes
415 .iter()
416 .map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
417 )
418 .collect();
419
420 addresses.sort();
422 addresses.dedup();
423
424 addresses
425 };
426
427 network_config.boot_nodes.iter().try_for_each(|bootnode| {
429 if let Some(other) = network_config
430 .boot_nodes
431 .iter()
432 .filter(|o| o.multiaddr == bootnode.multiaddr)
433 .find(|o| o.peer_id != bootnode.peer_id)
434 {
435 Err(Error::DuplicateBootnode {
436 address: bootnode.multiaddr.clone().into(),
437 first_id: bootnode.peer_id.into(),
438 second_id: other.peer_id.into(),
439 })
440 } else {
441 Ok(())
442 }
443 })?;
444
445 let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
447
448 for bootnode in network_config.boot_nodes.iter() {
449 boot_node_ids
450 .entry(bootnode.peer_id.into())
451 .or_default()
452 .push(bootnode.multiaddr.clone().into());
453 }
454
455 let boot_node_ids = Arc::new(boot_node_ids);
456
457 let num_connected = Arc::new(AtomicUsize::new(0));
458 let external_addresses = Arc::new(Mutex::new(HashSet::new()));
459
460 let (protocol, notif_protocol_handles) = Protocol::new(
461 From::from(¶ms.role),
462 params.notification_metrics,
463 notification_protocols,
464 params.block_announce_config,
465 Arc::clone(&peer_store_handle),
466 protocol_handles.clone(),
467 from_protocol_controllers,
468 )?;
469
470 let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
472 let user_agent =
473 format!("{} ({})", network_config.client_version, network_config.node_name);
474
475 let discovery_config = {
476 let mut config = DiscoveryConfig::new(local_peer_id);
477 config.with_permanent_addresses(
478 known_addresses
479 .iter()
480 .map(|(peer, address)| (peer.into(), address.clone().into()))
481 .collect::<Vec<_>>(),
482 );
483 config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
484 config.with_kademlia(
485 params.genesis_hash,
486 params.fork_id.as_deref(),
487 ¶ms.protocol_id,
488 );
489 config.with_dht_random_walk(network_config.enable_dht_random_walk);
490 config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
491 config.use_kademlia_disjoint_query_paths(
492 network_config.kademlia_disjoint_query_paths,
493 );
494 config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
495
496 match network_config.transport {
497 TransportConfig::MemoryOnly => {
498 config.with_mdns(false);
499 config.allow_private_ip(false);
500 },
501 TransportConfig::Normal {
502 enable_mdns,
503 allow_private_ip: allow_private_ipv4,
504 ..
505 } => {
506 config.with_mdns(enable_mdns);
507 config.allow_private_ip(allow_private_ipv4);
508 },
509 }
510
511 config
512 };
513
514 let behaviour = {
515 let result = Behaviour::new(
516 protocol,
517 user_agent,
518 local_public.into(),
519 discovery_config,
520 request_response_protocols,
521 Arc::clone(&peer_store_handle),
522 external_addresses.clone(),
523 network_config.public_addresses.iter().cloned().map(Into::into).collect(),
524 ConnectionLimits::default()
525 .with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
526 .with_max_established_incoming(Some(
527 crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
528 )),
529 );
530
531 match result {
532 Ok(b) => b,
533 Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
534 return Err(Error::DuplicateRequestResponseProtocol { protocol: proto })
535 },
536 }
537 };
538
539 let swarm = {
540 struct SpawnImpl<F>(F);
541 impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
542 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
543 (self.0)(f)
544 }
545 }
546
547 let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
548 .with_substream_upgrade_protocol_override(upgrade::Version::V1)
549 .with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
550 .with_per_connection_event_buffer_size(24)
553 .with_max_negotiating_inbound_streams(2048)
554 .with_idle_connection_timeout(network_config.idle_connection_timeout);
555
556 Swarm::new(transport, behaviour, local_peer_id, config)
557 };
558
559 (swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
560 };
561
562 let metrics = match ¶ms.metrics_registry {
564 Some(registry) => Some(metrics::register(
565 registry,
566 MetricSources {
567 bandwidth: bandwidth.clone(),
568 connected_peers: num_connected.clone(),
569 },
570 )?),
571 None => None,
572 };
573
574 for addr in &network_config.listen_addresses {
576 if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
577 warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
578 }
579 }
580
581 for addr in &network_config.public_addresses {
583 Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
584 }
585
586 let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
587
588 let service = Arc::new(NetworkService {
589 bandwidth,
590 external_addresses,
591 listen_addresses: listen_addresses_set.clone(),
592 num_connected: num_connected.clone(),
593 local_peer_id,
594 local_identity: local_identity.into(),
595 to_worker,
596 notification_protocol_ids,
597 protocol_handles,
598 sync_protocol_handle,
599 peer_store_handle: Arc::clone(&peer_store_handle),
600 _marker: PhantomData,
601 _block: Default::default(),
602 });
603
604 Ok(NetworkWorker {
605 listen_addresses: listen_addresses_set,
606 num_connected,
607 network_service: swarm,
608 service,
609 from_service,
610 event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
611 metrics,
612 boot_node_ids,
613 reported_invalid_boot_nodes: Default::default(),
614 peer_store_handle: Arc::clone(&peer_store_handle),
615 notif_protocol_handles,
616 _marker: Default::default(),
617 _block: Default::default(),
618 })
619 }
620
621 pub fn status(&self) -> NetworkStatus {
623 NetworkStatus {
624 num_connected_peers: self.num_connected_peers(),
625 total_bytes_inbound: self.total_bytes_inbound(),
626 total_bytes_outbound: self.total_bytes_outbound(),
627 }
628 }
629
630 pub fn total_bytes_inbound(&self) -> u64 {
632 self.service.bandwidth.total_inbound()
633 }
634
635 pub fn total_bytes_outbound(&self) -> u64 {
637 self.service.bandwidth.total_outbound()
638 }
639
640 pub fn num_connected_peers(&self) -> usize {
642 self.network_service.behaviour().user_protocol().num_sync_peers()
643 }
644
645 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
647 self.network_service.behaviour_mut().add_known_address(peer_id, addr);
648 }
649
650 pub fn service(&self) -> &Arc<NetworkService<B, H>> {
653 &self.service
654 }
655
656 pub fn local_peer_id(&self) -> &PeerId {
658 Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
659 }
660
661 pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
665 Swarm::<Behaviour<B>>::listeners(&self.network_service)
666 }
667
668 pub fn network_state(&mut self) -> NetworkState {
673 let swarm = &mut self.network_service;
674 let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
675 let connected_peers = {
676 let swarm = &mut *swarm;
677 open.iter()
678 .filter_map(move |peer_id| {
679 let known_addresses = if let Ok(addrs) =
680 NetworkBehaviour::handle_pending_outbound_connection(
681 swarm.behaviour_mut(),
682 ConnectionId::new_unchecked(0), Some(*peer_id),
684 &vec![],
685 Endpoint::Listener,
686 ) {
687 addrs.into_iter().collect()
688 } else {
689 error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
690 return None;
691 };
692
693 let endpoint = if let Some(e) =
694 swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
695 {
696 e.clone().into()
697 } else {
698 error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
699 and debug information about {:?}", peer_id);
700 return None;
701 };
702
703 Some((
704 peer_id.to_base58(),
705 NetworkStatePeer {
706 endpoint,
707 version_string: swarm
708 .behaviour_mut()
709 .node(peer_id)
710 .and_then(|i| i.client_version().map(|s| s.to_owned())),
711 latest_ping_time: swarm
712 .behaviour_mut()
713 .node(peer_id)
714 .and_then(|i| i.latest_ping()),
715 known_addresses,
716 },
717 ))
718 })
719 .collect()
720 };
721
722 let not_connected_peers = {
723 let swarm = &mut *swarm;
724 swarm
725 .behaviour_mut()
726 .known_peers()
727 .into_iter()
728 .filter(|p| open.iter().all(|n| n != p))
729 .map(move |peer_id| {
730 let known_addresses = if let Ok(addrs) =
731 NetworkBehaviour::handle_pending_outbound_connection(
732 swarm.behaviour_mut(),
733 ConnectionId::new_unchecked(0), Some(peer_id),
735 &vec![],
736 Endpoint::Listener,
737 ) {
738 addrs.into_iter().collect()
739 } else {
740 error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
741 Default::default()
742 };
743
744 (
745 peer_id.to_base58(),
746 NetworkStateNotConnectedPeer {
747 version_string: swarm
748 .behaviour_mut()
749 .node(&peer_id)
750 .and_then(|i| i.client_version().map(|s| s.to_owned())),
751 latest_ping_time: swarm
752 .behaviour_mut()
753 .node(&peer_id)
754 .and_then(|i| i.latest_ping()),
755 known_addresses,
756 },
757 )
758 })
759 .collect()
760 };
761
762 let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
763 let listened_addresses = swarm.listeners().cloned().collect();
764 let external_addresses = swarm.external_addresses().cloned().collect();
765
766 NetworkState {
767 peer_id,
768 listened_addresses,
769 external_addresses,
770 connected_peers,
771 not_connected_peers,
772 peerset: serde_json::json!(
775 "Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
776 ),
777 }
778 }
779
780 pub fn remove_reserved_peer(&self, peer: PeerId) {
782 self.service.remove_reserved_peer(peer.into());
783 }
784
785 pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
787 self.service.add_reserved_peer(peer)
788 }
789}
790
791impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
792 pub async fn network_state(&self) -> Result<NetworkState, ()> {
799 let (tx, rx) = oneshot::channel();
800
801 let _ = self
802 .to_worker
803 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
804
805 match rx.await {
806 Ok(v) => v.map_err(|_| ()),
807 Err(_) => Err(()),
809 }
810 }
811
812 fn split_multiaddr_and_peer_id(
817 &self,
818 peers: HashSet<Multiaddr>,
819 ) -> Result<Vec<(PeerId, Multiaddr)>, String> {
820 peers
821 .into_iter()
822 .map(|mut addr| {
823 let peer = match addr.pop() {
824 Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
825 _ => return Err("Missing PeerId from address".to_string()),
826 };
827
828 if peer == self.local_peer_id {
831 Err("Local peer ID in peer set.".to_string())
832 } else {
833 Ok((peer, addr))
834 }
835 })
836 .collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
837 }
838}
839
840impl<B, H> NetworkStateInfo for NetworkService<B, H>
841where
842 B: sp_runtime::traits::Block,
843 H: ExHashT,
844{
845 fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
847 self.external_addresses.lock().iter().cloned().map(Into::into).collect()
848 }
849
850 fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
852 self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
853 }
854
855 fn local_peer_id(&self) -> sc_network_types::PeerId {
857 self.local_peer_id.into()
858 }
859}
860
861impl<B, H> NetworkSigner for NetworkService<B, H>
862where
863 B: sp_runtime::traits::Block,
864 H: ExHashT,
865{
866 fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
867 let public_key = self.local_identity.public();
868 let bytes = self.local_identity.sign(msg.as_ref())?;
869
870 Ok(Signature {
871 public_key: crate::service::signature::PublicKey::Libp2p(public_key),
872 bytes,
873 })
874 }
875
876 fn verify(
877 &self,
878 peer_id: sc_network_types::PeerId,
879 public_key: &Vec<u8>,
880 signature: &Vec<u8>,
881 message: &Vec<u8>,
882 ) -> Result<bool, String> {
883 let public_key =
884 PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
885 let peer_id: PeerId = peer_id.into();
886 let remote: libp2p::PeerId = public_key.to_peer_id();
887
888 Ok(peer_id == remote && public_key.verify(message, signature))
889 }
890}
891
892impl<B, H> NetworkDHTProvider for NetworkService<B, H>
893where
894 B: BlockT + 'static,
895 H: ExHashT,
896{
897 fn find_closest_peers(&self, target: sc_network_types::PeerId) {
902 let _ = self
903 .to_worker
904 .unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
905 }
906
907 fn get_value(&self, key: &KademliaKey) {
912 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
913 }
914
915 fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
920 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
921 }
922
923 fn put_record_to(
924 &self,
925 record: Record,
926 peers: HashSet<sc_network_types::PeerId>,
927 update_local_storage: bool,
928 ) {
929 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
930 record,
931 peers,
932 update_local_storage,
933 });
934 }
935
936 fn store_record(
937 &self,
938 key: KademliaKey,
939 value: Vec<u8>,
940 publisher: Option<sc_network_types::PeerId>,
941 expires: Option<Instant>,
942 ) {
943 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
944 key,
945 value,
946 publisher.map(Into::into),
947 expires,
948 ));
949 }
950
951 fn start_providing(&self, key: KademliaKey) {
952 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
953 }
954
955 fn stop_providing(&self, key: KademliaKey) {
956 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
957 }
958
959 fn get_providers(&self, key: KademliaKey) {
960 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
961 }
962}
963
964#[async_trait::async_trait]
965impl<B, H> NetworkStatusProvider for NetworkService<B, H>
966where
967 B: BlockT + 'static,
968 H: ExHashT,
969{
970 async fn status(&self) -> Result<NetworkStatus, ()> {
971 let (tx, rx) = oneshot::channel();
972
973 let _ = self
974 .to_worker
975 .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
976
977 match rx.await {
978 Ok(v) => v.map_err(|_| ()),
979 Err(_) => Err(()),
981 }
982 }
983
984 async fn network_state(&self) -> Result<NetworkState, ()> {
985 let (tx, rx) = oneshot::channel();
986
987 let _ = self
988 .to_worker
989 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
990
991 match rx.await {
992 Ok(v) => v.map_err(|_| ()),
993 Err(_) => Err(()),
995 }
996 }
997}
998
999#[async_trait::async_trait]
1000impl<B, H> NetworkPeers for NetworkService<B, H>
1001where
1002 B: BlockT + 'static,
1003 H: ExHashT,
1004{
1005 fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
1006 self.sync_protocol_handle
1007 .set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
1008 }
1009
1010 fn set_authorized_only(&self, reserved_only: bool) {
1011 self.sync_protocol_handle.set_reserved_only(reserved_only);
1012 }
1013
1014 fn add_known_address(
1015 &self,
1016 peer_id: sc_network_types::PeerId,
1017 addr: sc_network_types::multiaddr::Multiaddr,
1018 ) {
1019 let _ = self
1020 .to_worker
1021 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
1022 }
1023
1024 fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
1025 self.peer_store_handle.report_peer(peer_id, cost_benefit);
1026 }
1027
1028 fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
1029 self.peer_store_handle.peer_reputation(peer_id)
1030 }
1031
1032 fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
1033 let _ = self
1034 .to_worker
1035 .unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
1036 }
1037
1038 fn accept_unreserved_peers(&self) {
1039 self.sync_protocol_handle.set_reserved_only(false);
1040 }
1041
1042 fn deny_unreserved_peers(&self) {
1043 self.sync_protocol_handle.set_reserved_only(true);
1044 }
1045
1046 fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
1047 if peer.peer_id == self.local_peer_id.into() {
1049 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1050 }
1051
1052 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
1053 peer.peer_id.into(),
1054 peer.multiaddr.into(),
1055 ));
1056 self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
1057
1058 Ok(())
1059 }
1060
1061 fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
1062 self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
1063 }
1064
1065 fn set_reserved_peers(
1066 &self,
1067 protocol: ProtocolName,
1068 peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1069 ) -> Result<(), String> {
1070 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1071 return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol));
1072 };
1073
1074 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1075 let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
1076
1077 let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
1078
1079 for (peer_id, addr) in peers_addrs.into_iter() {
1080 if peer_id == self.local_peer_id {
1082 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1083 }
1084
1085 peers.insert(peer_id.into());
1086
1087 if !addr.is_empty() {
1088 let _ = self
1089 .to_worker
1090 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1091 }
1092 }
1093
1094 self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
1095
1096 Ok(())
1097 }
1098
1099 fn add_peers_to_reserved_set(
1100 &self,
1101 protocol: ProtocolName,
1102 peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1103 ) -> Result<(), String> {
1104 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1105 return Err(format!(
1106 "Cannot add peers to reserved set of unknown protocol: {}",
1107 protocol
1108 ));
1109 };
1110
1111 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1112 let peers = self.split_multiaddr_and_peer_id(peers)?;
1113
1114 for (peer_id, addr) in peers.into_iter() {
1115 if peer_id == self.local_peer_id {
1117 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1118 }
1119
1120 if !addr.is_empty() {
1121 let _ = self
1122 .to_worker
1123 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1124 }
1125
1126 self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
1127 }
1128
1129 Ok(())
1130 }
1131
1132 fn remove_peers_from_reserved_set(
1133 &self,
1134 protocol: ProtocolName,
1135 peers: Vec<sc_network_types::PeerId>,
1136 ) -> Result<(), String> {
1137 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1138 return Err(format!(
1139 "Cannot remove peers from reserved set of unknown protocol: {}",
1140 protocol
1141 ));
1142 };
1143
1144 for peer_id in peers.into_iter() {
1145 self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
1146 }
1147
1148 Ok(())
1149 }
1150
1151 fn sync_num_connected(&self) -> usize {
1152 self.num_connected.load(Ordering::Relaxed)
1153 }
1154
1155 fn peer_role(
1156 &self,
1157 peer_id: sc_network_types::PeerId,
1158 handshake: Vec<u8>,
1159 ) -> Option<ObservedRole> {
1160 match Roles::decode_all(&mut &handshake[..]) {
1161 Ok(role) => Some(role.into()),
1162 Err(_) => {
1163 log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
1164 self.peer_store_handle.peer_role(&(peer_id.into()))
1165 },
1166 }
1167 }
1168
1169 async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
1173 let (tx, rx) = oneshot::channel();
1174
1175 self.sync_protocol_handle.reserved_peers(tx);
1176
1177 rx.await
1179 .map(|peers| peers.into_iter().map(From::from).collect())
1180 .map_err(|_| ())
1181 }
1182}
1183
1184impl<B, H> NetworkEventStream for NetworkService<B, H>
1185where
1186 B: BlockT + 'static,
1187 H: ExHashT,
1188{
1189 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1190 let (tx, rx) = out_events::channel(name, 100_000);
1191 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1192 Box::pin(rx)
1193 }
1194}
1195
1196#[async_trait::async_trait]
1197impl<B, H> NetworkRequest for NetworkService<B, H>
1198where
1199 B: BlockT + 'static,
1200 H: ExHashT,
1201{
1202 async fn request(
1203 &self,
1204 target: sc_network_types::PeerId,
1205 protocol: ProtocolName,
1206 request: Vec<u8>,
1207 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1208 connect: IfDisconnected,
1209 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1210 let (tx, rx) = oneshot::channel();
1211
1212 self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1213
1214 match rx.await {
1215 Ok(v) => v,
1216 Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1220 }
1221 }
1222
1223 fn start_request(
1224 &self,
1225 target: sc_network_types::PeerId,
1226 protocol: ProtocolName,
1227 request: Vec<u8>,
1228 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1229 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1230 connect: IfDisconnected,
1231 ) {
1232 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1233 target: target.into(),
1234 protocol: protocol.into(),
1235 request,
1236 fallback_request,
1237 pending_response: tx,
1238 connect,
1239 });
1240 }
1241}
1242
1243#[must_use]
1245pub struct NotificationSender {
1246 sink: NotificationsSink,
1247
1248 protocol_name: ProtocolName,
1250
1251 notification_size_metric: Option<Histogram>,
1254}
1255
1256#[async_trait::async_trait]
1257impl NotificationSenderT for NotificationSender {
1258 async fn ready(
1259 &self,
1260 ) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1261 Ok(Box::new(NotificationSenderReady {
1262 ready: match self.sink.reserve_notification().await {
1263 Ok(r) => Some(r),
1264 Err(()) => return Err(NotificationSenderError::Closed),
1265 },
1266 peer_id: self.sink.peer_id(),
1267 protocol_name: &self.protocol_name,
1268 notification_size_metric: self.notification_size_metric.clone(),
1269 }))
1270 }
1271}
1272
1273#[must_use]
1275pub struct NotificationSenderReady<'a> {
1276 ready: Option<Ready<'a>>,
1277
1278 peer_id: &'a PeerId,
1280
1281 protocol_name: &'a ProtocolName,
1283
1284 notification_size_metric: Option<Histogram>,
1287}
1288
1289impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1290 fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1291 if let Some(notification_size_metric) = &self.notification_size_metric {
1292 notification_size_metric.observe(notification.len() as f64);
1293 }
1294
1295 trace!(
1296 target: LOG_TARGET,
1297 "External API => Notification({:?}, {}, {} bytes)",
1298 self.peer_id, self.protocol_name, notification.len(),
1299 );
1300 trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1301
1302 self.ready
1303 .take()
1304 .ok_or(NotificationSenderError::Closed)?
1305 .send(notification)
1306 .map_err(|()| NotificationSenderError::Closed)
1307 }
1308}
1309
1310enum ServiceToWorkerMsg {
1314 FindClosestPeers(PeerId),
1315 GetValue(KademliaKey),
1316 PutValue(KademliaKey, Vec<u8>),
1317 PutRecordTo {
1318 record: Record,
1319 peers: HashSet<sc_network_types::PeerId>,
1320 update_local_storage: bool,
1321 },
1322 StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
1323 StartProviding(KademliaKey),
1324 StopProviding(KademliaKey),
1325 GetProviders(KademliaKey),
1326 AddKnownAddress(PeerId, Multiaddr),
1327 EventStream(out_events::Sender),
1328 Request {
1329 target: PeerId,
1330 protocol: ProtocolName,
1331 request: Vec<u8>,
1332 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1333 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1334 connect: IfDisconnected,
1335 },
1336 NetworkStatus {
1337 pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
1338 },
1339 NetworkState {
1340 pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
1341 },
1342 DisconnectPeer(PeerId, ProtocolName),
1343}
1344
1345#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
1349pub struct NetworkWorker<B, H>
1350where
1351 B: BlockT + 'static,
1352 H: ExHashT,
1353{
1354 listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
1356 num_connected: Arc<AtomicUsize>,
1358 service: Arc<NetworkService<B, H>>,
1360 network_service: Swarm<Behaviour<B>>,
1362 from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
1364 event_streams: out_events::OutChannels,
1366 metrics: Option<Metrics>,
1368 boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
1370 reported_invalid_boot_nodes: HashSet<PeerId>,
1372 peer_store_handle: Arc<dyn PeerStoreProvider>,
1374 notif_protocol_handles: Vec<protocol::ProtocolHandle>,
1376 _marker: PhantomData<H>,
1379 _block: PhantomData<B>,
1381}
1382
1383impl<B, H> NetworkWorker<B, H>
1384where
1385 B: BlockT + 'static,
1386 H: ExHashT,
1387{
1388 pub async fn run(mut self) {
1390 while self.next_action().await {}
1391 }
1392
1393 pub async fn next_action(&mut self) -> bool {
1398 futures::select! {
1399 msg = self.from_service.next() => {
1401 if let Some(msg) = msg {
1402 self.handle_worker_message(msg);
1403 } else {
1404 return false
1405 }
1406 },
1407 event = self.network_service.select_next_some() => {
1409 self.handle_swarm_event(event);
1410 },
1411 };
1412
1413 let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
1415 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
1416
1417 if let Some(metrics) = self.metrics.as_ref() {
1418 if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
1419 for (lower_ilog2_bucket_bound, num_entries) in buckets {
1420 metrics
1421 .kbuckets_num_nodes
1422 .with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
1423 .set(num_entries as u64);
1424 }
1425 }
1426 if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
1427 metrics.kademlia_records_count.set(num_entries as u64);
1428 }
1429 if let Some(num_entries) =
1430 self.network_service.behaviour_mut().kademlia_records_total_size()
1431 {
1432 metrics.kademlia_records_sizes_total.set(num_entries as u64);
1433 }
1434
1435 metrics.pending_connections.set(
1436 Swarm::network_info(&self.network_service).connection_counters().num_pending()
1437 as u64,
1438 );
1439 }
1440
1441 true
1442 }
1443
1444 fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1446 match msg {
1447 ServiceToWorkerMsg::FindClosestPeers(target) => {
1448 self.network_service.behaviour_mut().find_closest_peers(target)
1449 },
1450 ServiceToWorkerMsg::GetValue(key) => {
1451 self.network_service.behaviour_mut().get_value(key.into())
1452 },
1453 ServiceToWorkerMsg::PutValue(key, value) => {
1454 self.network_service.behaviour_mut().put_value(key.into(), value)
1455 },
1456 ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1457 .network_service
1458 .behaviour_mut()
1459 .put_record_to(record.into(), peers, update_local_storage),
1460 ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1461 .network_service
1462 .behaviour_mut()
1463 .store_record(key.into(), value, publisher, expires),
1464 ServiceToWorkerMsg::StartProviding(key) => {
1465 self.network_service.behaviour_mut().start_providing(key.into())
1466 },
1467 ServiceToWorkerMsg::StopProviding(key) => {
1468 self.network_service.behaviour_mut().stop_providing(&key.into())
1469 },
1470 ServiceToWorkerMsg::GetProviders(key) => {
1471 self.network_service.behaviour_mut().get_providers(key.into())
1472 },
1473 ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => {
1474 self.network_service.behaviour_mut().add_known_address(peer_id, addr)
1475 },
1476 ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1477 ServiceToWorkerMsg::Request {
1478 target,
1479 protocol,
1480 request,
1481 fallback_request,
1482 pending_response,
1483 connect,
1484 } => {
1485 self.network_service.behaviour_mut().send_request(
1486 &target,
1487 protocol,
1488 request,
1489 fallback_request,
1490 pending_response,
1491 connect,
1492 );
1493 },
1494 ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1495 let _ = pending_response.send(Ok(self.status()));
1496 },
1497 ServiceToWorkerMsg::NetworkState { pending_response } => {
1498 let _ = pending_response.send(Ok(self.network_state()));
1499 },
1500 ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1501 .network_service
1502 .behaviour_mut()
1503 .user_protocol_mut()
1504 .disconnect_peer(&who, protocol_name),
1505 }
1506 }
1507
1508 fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1510 match event {
1511 SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1512 if let Some(metrics) = self.metrics.as_ref() {
1513 match result {
1514 Ok(serve_time) => {
1515 metrics
1516 .requests_in_success_total
1517 .with_label_values(&[&protocol])
1518 .observe(serve_time.as_secs_f64());
1519 },
1520 Err(err) => {
1521 let reason = match err {
1522 ResponseFailure::Network(InboundFailure::Timeout) => {
1523 Some("timeout")
1524 },
1525 ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1526 {
1531 None
1532 },
1533 ResponseFailure::Network(InboundFailure::ResponseOmission) => {
1534 Some("busy-omitted")
1535 },
1536 ResponseFailure::Network(InboundFailure::ConnectionClosed) => {
1537 Some("connection-closed")
1538 },
1539 ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1540 };
1541
1542 if let Some(reason) = reason {
1543 metrics
1544 .requests_in_failure_total
1545 .with_label_values(&[&protocol, reason])
1546 .inc();
1547 }
1548 },
1549 }
1550 }
1551 },
1552 SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1553 protocol,
1554 duration,
1555 result,
1556 ..
1557 }) => {
1558 if let Some(metrics) = self.metrics.as_ref() {
1559 match result {
1560 Ok(_) => {
1561 metrics
1562 .requests_out_success_total
1563 .with_label_values(&[&protocol])
1564 .observe(duration.as_secs_f64());
1565 },
1566 Err(err) => {
1567 let reason = match err {
1568 RequestFailure::NotConnected => "not-connected",
1569 RequestFailure::UnknownProtocol => "unknown-protocol",
1570 RequestFailure::InvalidRequest => "invalid-request",
1571 RequestFailure::Refused => "refused",
1572 RequestFailure::Obsolete => "obsolete",
1573 RequestFailure::Network(OutboundFailure::DialFailure) => {
1574 "dial-failure"
1575 },
1576 RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1577 RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
1578 "connection-closed"
1579 },
1580 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1581 "unsupported"
1582 },
1583 RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1584 };
1585
1586 metrics
1587 .requests_out_failure_total
1588 .with_label_values(&[&protocol, reason])
1589 .inc();
1590 },
1591 }
1592 }
1593 },
1594 SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1595 for change in changes {
1596 self.peer_store_handle.report_peer(peer.into(), change);
1597 }
1598 },
1599 SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1600 peer_id,
1601 info:
1602 IdentifyInfo {
1603 protocol_version, agent_version, mut listen_addrs, protocols, ..
1604 },
1605 }) => {
1606 if listen_addrs.len() > 30 {
1607 debug!(
1608 target: LOG_TARGET,
1609 "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1610 peer_id, protocol_version, agent_version
1611 );
1612 listen_addrs.truncate(30);
1613 }
1614 for addr in listen_addrs {
1615 self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1616 &peer_id,
1617 &protocols,
1618 addr.clone(),
1619 );
1620 }
1621 self.peer_store_handle.add_known_peer(peer_id.into());
1622 },
1623 SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1624 self.peer_store_handle.add_known_peer(peer_id.into());
1625 },
1626 SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1627 if let Some(metrics) = self.metrics.as_ref() {
1628 metrics.kademlia_random_queries_total.inc();
1629 }
1630 },
1631 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1632 remote,
1633 set_id,
1634 direction,
1635 negotiated_fallback,
1636 notifications_sink,
1637 received_handshake,
1638 }) => {
1639 let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1640 remote,
1641 direction,
1642 received_handshake,
1643 negotiated_fallback,
1644 notifications_sink,
1645 );
1646 },
1647 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1648 remote,
1649 set_id,
1650 notifications_sink,
1651 }) => {
1652 let _ = self.notif_protocol_handles[usize::from(set_id)]
1653 .report_notification_sink_replaced(remote, notifications_sink);
1654
1655 },
1676 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1677 let _ = self.notif_protocol_handles[usize::from(set_id)]
1678 .report_substream_closed(remote);
1679 },
1680 SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1681 remote,
1682 set_id,
1683 notification,
1684 }) => {
1685 let _ = self.notif_protocol_handles[usize::from(set_id)]
1686 .report_notification_received(remote, notification);
1687 },
1688 SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1689 match (self.metrics.as_ref(), duration) {
1690 (Some(metrics), Some(duration)) => {
1691 let query_type = match event {
1692 DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1693 DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1694 DhtEvent::ValueFound(_) => "value-found",
1695 DhtEvent::ValueNotFound(_) => "value-not-found",
1696 DhtEvent::ValuePut(_) => "value-put",
1697 DhtEvent::ValuePutFailed(_) => "value-put-failed",
1698 DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1699 DhtEvent::StartedProviding(_) => "started-providing",
1700 DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1701 DhtEvent::ProvidersFound(_, _) => "providers-found",
1702 DhtEvent::NoMoreProviders(_) => "no-more-providers",
1703 DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1704 };
1705 metrics
1706 .kademlia_query_duration
1707 .with_label_values(&[query_type])
1708 .observe(duration.as_secs_f64());
1709 },
1710 _ => {},
1711 }
1712
1713 self.event_streams.send(Event::Dht(event));
1714 },
1715 SwarmEvent::Behaviour(BehaviourOut::None) => {
1716 },
1718 SwarmEvent::ConnectionEstablished {
1719 peer_id,
1720 endpoint,
1721 num_established,
1722 concurrent_dial_errors,
1723 ..
1724 } => {
1725 if let Some(errors) = concurrent_dial_errors {
1726 debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1727 } else {
1728 debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1729 }
1730
1731 if let Some(metrics) = self.metrics.as_ref() {
1732 let direction = match endpoint {
1733 ConnectedPoint::Dialer { .. } => "out",
1734 ConnectedPoint::Listener { .. } => "in",
1735 };
1736 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1737
1738 if num_established.get() == 1 {
1739 metrics.distinct_peers_connections_opened_total.inc();
1740 }
1741 }
1742 },
1743 SwarmEvent::ConnectionClosed {
1744 connection_id,
1745 peer_id,
1746 cause,
1747 endpoint,
1748 num_established,
1749 } => {
1750 debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1751 if let Some(metrics) = self.metrics.as_ref() {
1752 let direction = match endpoint {
1753 ConnectedPoint::Dialer { .. } => "out",
1754 ConnectedPoint::Listener { .. } => "in",
1755 };
1756 let reason = match cause {
1757 Some(ConnectionError::IO(_)) => "transport-error",
1758 Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1759 None => "actively-closed",
1760 };
1761 metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1762
1763 if num_established == 0 {
1765 metrics.distinct_peers_connections_closed_total.inc();
1766 }
1767 }
1768 },
1769 SwarmEvent::NewListenAddr { address, .. } => {
1770 trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1771 if let Some(metrics) = self.metrics.as_ref() {
1772 metrics.listeners_local_addresses.inc();
1773 }
1774 self.listen_addresses.lock().insert(address.clone());
1775 },
1776 SwarmEvent::ExpiredListenAddr { address, .. } => {
1777 info!(target: LOG_TARGET, "📪 No longer listening on {}", address);
1778 if let Some(metrics) = self.metrics.as_ref() {
1779 metrics.listeners_local_addresses.dec();
1780 }
1781 self.listen_addresses.lock().remove(&address);
1782 },
1783 SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1784 if let Some(peer_id) = peer_id {
1785 trace!(
1786 target: LOG_TARGET,
1787 "Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1788 );
1789
1790 let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1791
1792 if let Some(addresses) =
1793 not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1794 {
1795 if let DialError::WrongPeerId { obtained, endpoint } = &error {
1796 if let ConnectedPoint::Dialer {
1797 address,
1798 role_override: _,
1799 port_use: _,
1800 } = endpoint
1801 {
1802 let address_without_peer_id = parse_addr(address.clone().into())
1803 .map_or_else(|_| address.clone(), |r| r.1.into());
1804
1805 if addresses.iter().any(|a| address_without_peer_id == *a) {
1809 warn!(
1810 "💔 The bootnode you want to connect to at `{address}` provided a \
1811 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1812 );
1813
1814 self.reported_invalid_boot_nodes.insert(peer_id);
1815 }
1816 }
1817 }
1818 }
1819 }
1820
1821 if let Some(metrics) = self.metrics.as_ref() {
1822 let reason = match error {
1823 DialError::Denied { cause } => {
1824 if cause.downcast::<Exceeded>().is_ok() {
1825 Some("limit-reached")
1826 } else {
1827 None
1828 }
1829 },
1830 DialError::LocalPeerId { .. } => Some("local-peer-id"),
1831 DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1832 DialError::Transport(_) => Some("transport-error"),
1833 DialError::NoAddresses |
1834 DialError::DialPeerConditionFalse(_) |
1835 DialError::Aborted => None, };
1837 if let Some(reason) = reason {
1838 metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1839 }
1840 }
1841 },
1842 SwarmEvent::Dialing { connection_id, peer_id } => {
1843 trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1844 },
1845 SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1846 trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1847 if let Some(metrics) = self.metrics.as_ref() {
1848 metrics.incoming_connections_total.inc();
1849 }
1850 },
1851 SwarmEvent::IncomingConnectionError {
1852 connection_id,
1853 local_addr,
1854 send_back_addr,
1855 error,
1856 } => {
1857 debug!(
1858 target: LOG_TARGET,
1859 "Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1860 );
1861 if let Some(metrics) = self.metrics.as_ref() {
1862 let reason = match error {
1863 ListenError::Denied { cause } => {
1864 if cause.downcast::<Exceeded>().is_ok() {
1865 Some("limit-reached")
1866 } else {
1867 None
1868 }
1869 },
1870 ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => {
1871 Some("invalid-peer-id")
1872 },
1873 ListenError::Transport(_) => Some("transport-error"),
1874 ListenError::Aborted => None, };
1876
1877 if let Some(reason) = reason {
1878 metrics
1879 .incoming_connections_errors_total
1880 .with_label_values(&[reason])
1881 .inc();
1882 }
1883 }
1884 },
1885 SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1886 if let Some(metrics) = self.metrics.as_ref() {
1887 metrics.listeners_local_addresses.sub(addresses.len() as u64);
1888 }
1889 let mut listen_addresses = self.listen_addresses.lock();
1890 for addr in &addresses {
1891 listen_addresses.remove(addr);
1892 }
1893 drop(listen_addresses);
1894
1895 let addrs =
1896 addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1897 match reason {
1898 Ok(()) => error!(
1899 target: LOG_TARGET,
1900 "📪 Libp2p listener ({}) closed gracefully",
1901 addrs
1902 ),
1903 Err(e) => error!(
1904 target: LOG_TARGET,
1905 "📪 Libp2p listener ({}) closed: {}",
1906 addrs, e
1907 ),
1908 }
1909 },
1910 SwarmEvent::ListenerError { error, .. } => {
1911 debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1912 if let Some(metrics) = self.metrics.as_ref() {
1913 metrics.listeners_errors_total.inc();
1914 }
1915 },
1916 SwarmEvent::NewExternalAddrCandidate { address } => {
1917 trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1918 },
1919 SwarmEvent::ExternalAddrConfirmed { address } => {
1920 trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1921 },
1922 SwarmEvent::ExternalAddrExpired { address } => {
1923 trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1924 },
1925 SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1926 trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1927 },
1928 event => {
1929 warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1930 },
1931 }
1932 }
1933}
1934
1935impl<B, H> Unpin for NetworkWorker<B, H>
1936where
1937 B: BlockT + 'static,
1938 H: ExHashT,
1939{
1940}
1941
1942pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1943 addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
1944 transport: &TransportConfig,
1945) -> Result<(), Error> {
1946 use sc_network_types::multiaddr::Protocol;
1947
1948 if matches!(transport, TransportConfig::MemoryOnly) {
1949 let addresses: Vec<_> = addresses
1950 .filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1951 .cloned()
1952 .collect();
1953
1954 if !addresses.is_empty() {
1955 return Err(Error::AddressesForAnotherTransport {
1956 transport: transport.clone(),
1957 addresses,
1958 });
1959 }
1960 } else {
1961 let addresses: Vec<_> = addresses
1962 .filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1963 .cloned()
1964 .collect();
1965
1966 if !addresses.is_empty() {
1967 return Err(Error::AddressesForAnotherTransport {
1968 transport: transport.clone(),
1969 addresses,
1970 });
1971 }
1972 }
1973
1974 Ok(())
1975}