
sc_network/litep2p/service.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! `NetworkService` implementation for `litep2p`.

use crate::{
	config::MultiaddrWithPeerId,
	litep2p::shim::{
		notification::{config::ProtocolControlHandle, peerset::PeersetCommand},
		request_response::OutboundRequest,
	},
	network_state::NetworkState,
	peer_store::PeerStoreProvider,
	service::out_events,
	Event, IfDisconnected, NetworkDHTProvider, NetworkEventStream, NetworkPeers, NetworkRequest,
	NetworkSigner, NetworkStateInfo, NetworkStatus, NetworkStatusProvider, OutboundFailure,
	ProtocolName, RequestFailure, Signature,
};

use codec::DecodeAll;
use futures::{channel::oneshot, stream::BoxStream};
use libp2p::identity::SigningError;
use litep2p::{
	addresses::PublicAddresses, crypto::ed25519::Keypair,
	types::multiaddr::Multiaddr as LiteP2pMultiaddr,
};
use parking_lot::RwLock;
use sc_network_types::kad::{Key as KademliaKey, Record};

use sc_network_common::{
	role::{ObservedRole, Roles},
	types::ReputationChange,
};
use sc_network_types::{
	multiaddr::{Multiaddr, Protocol},
	PeerId,
};
use sc_utils::mpsc::TracingUnboundedSender;

use std::{
	collections::{HashMap, HashSet},
	sync::{atomic::Ordering, Arc},
	time::Instant,
};

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p";

/// Commands sent by [`Litep2pNetworkService`] to
/// [`Litep2pNetworkBackend`](super::Litep2pNetworkBackend).
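///
/// Commands are pushed over an unbounded channel and processed asynchronously by the backend;
/// where a reply is needed, it travels back over a `oneshot` channel embedded in the command
/// (e.g. [`NetworkServiceCommand::Status`]). A minimal sketch of the fire-and-forget pattern,
/// mirroring [`Litep2pNetworkService::get_value`] (`cmd_tx` stands for the service's command
/// sender and `key` for a Kademlia record key):
///
/// ```ignore
/// let _ = cmd_tx.unbounded_send(NetworkServiceCommand::GetValue { key: key.clone() });
/// ```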
#[derive(Debug)]
pub enum NetworkServiceCommand {
	/// Find peers closest to `target` in the DHT.
	FindClosestPeers {
		/// Target peer ID.
		target: PeerId,
	},

	/// Get value from DHT.
	GetValue {
		/// Record key.
		key: KademliaKey,
	},

	/// Put value to DHT.
	PutValue {
		/// Record key.
		key: KademliaKey,

		/// Record value.
		value: Vec<u8>,
	},

	/// Put a record to the DHT, addressed to specific peers.
	PutValueTo {
		/// Record.
		record: Record,
		/// Peers we want to put the record to.
		peers: Vec<sc_network_types::PeerId>,
		/// Whether the local storage should be updated as well.
		update_local_storage: bool,
	},
	/// Store record in the local DHT store.
	StoreRecord {
		/// Record key.
		key: KademliaKey,

		/// Record value.
		value: Vec<u8>,

		/// Original publisher of the record.
		publisher: Option<PeerId>,

		/// Record expiration time as measured by a local, monotonic clock.
		expires: Option<Instant>,
	},

	/// Start providing `key`.
	StartProviding { key: KademliaKey },

	/// Stop providing `key`.
	StopProviding { key: KademliaKey },

	/// Get providers for `key`.
	GetProviders { key: KademliaKey },

	/// Query network status.
	Status {
		/// `oneshot::Sender` for sending the status.
		tx: oneshot::Sender<NetworkStatus>,
	},

	/// Add `peers` to `protocol`'s reserved set.
	AddPeersToReservedSet {
		/// Protocol.
		protocol: ProtocolName,

		/// Reserved peers.
		peers: HashSet<Multiaddr>,
	},

	/// Add known address for peer.
	AddKnownAddress {
		/// Peer ID.
		peer: PeerId,

		/// Address.
		address: Multiaddr,
	},

	/// Set reserved peers for `protocol`.
	SetReservedPeers {
		/// Protocol.
		protocol: ProtocolName,

		/// Reserved peers.
		peers: HashSet<Multiaddr>,
	},

	/// Disconnect peer from protocol.
	DisconnectPeer {
		/// Protocol.
		protocol: ProtocolName,

		/// Peer ID.
		peer: PeerId,
	},

	/// Enable or disable reserved-only mode for `protocol`.
	SetReservedOnly {
		/// Protocol.
		protocol: ProtocolName,

		/// Reserved only?
		reserved_only: bool,
	},

	/// Remove reserved peers from protocol.
	RemoveReservedPeers {
		/// Protocol.
		protocol: ProtocolName,

		/// Peers to remove from the reserved set.
		peers: HashSet<PeerId>,
	},

	/// Create event stream for DHT events.
	EventStream {
		/// Sender for the events.
		tx: out_events::Sender,
	},
}

/// `NetworkService` implementation for `litep2p`.
#[derive(Debug, Clone)]
pub struct Litep2pNetworkService {
	/// Local peer ID.
	local_peer_id: litep2p::PeerId,

	/// The `KeyPair` that defines the `PeerId` of the local node.
	keypair: Keypair,

	/// TX channel for sending commands to [`Litep2pNetworkBackend`](super::Litep2pNetworkBackend).
	cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,

	/// Handle to `PeerStore`.
	peer_store_handle: Arc<dyn PeerStoreProvider>,

	/// Peerset handles.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Name for the block announce protocol.
	block_announce_protocol: ProtocolName,

	/// Installed request-response protocols.
	request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,

	/// Listen addresses.
	listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

	/// External addresses.
	external_addresses: PublicAddresses,
}

impl Litep2pNetworkService {
	/// Create new [`Litep2pNetworkService`].
	pub fn new(
		local_peer_id: litep2p::PeerId,
		keypair: Keypair,
		cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,
		peer_store_handle: Arc<dyn PeerStoreProvider>,
		peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
		block_announce_protocol: ProtocolName,
		request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
		listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
		external_addresses: PublicAddresses,
	) -> Self {
		Self {
			local_peer_id,
			keypair,
			cmd_tx,
			peer_store_handle,
			peerset_handles,
			block_announce_protocol,
			request_response_protocols,
			listen_addresses,
			external_addresses,
		}
	}
}

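// A sketch of how the signer below is meant to be used (illustrative only; `service` is assumed
// to be a `Litep2pNetworkService`, and `remote_peer`, `remote_public_key` (protobuf-encoded) and
// `remote_signature` are hypothetical values received from a remote node):
//
//   let signature = service.sign_with_local_identity(b"payload".to_vec()).expect("signing");
//   let valid = service
//       .verify(remote_peer, &remote_public_key, &remote_signature, &b"payload".to_vec())
//       .expect("well-formed public key");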
impl NetworkSigner for Litep2pNetworkService {
	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
		let public_key = self.keypair.public();
		let bytes = self.keypair.sign(msg.as_ref());

		Ok(Signature {
			public_key: crate::service::signature::PublicKey::Litep2p(
				litep2p::crypto::PublicKey::Ed25519(public_key),
			),
			bytes,
		})
	}

	fn verify(
		&self,
		peer: PeerId,
		public_key: &Vec<u8>,
		signature: &Vec<u8>,
		message: &Vec<u8>,
	) -> Result<bool, String> {
		let public_key = litep2p::crypto::PublicKey::from_protobuf_encoding(&public_key)
			.map_err(|error| error.to_string())?;
		let peer: litep2p::PeerId = peer.into();

		Ok(peer == public_key.to_peer_id() && public_key.verify(message, signature))
	}
}

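// Each DHT method below forwards a fire-and-forget command to the backend; results are reported
// asynchronously as DHT events on the stream returned by `NetworkEventStream::event_stream`.
// A minimal usage sketch (illustrative only; `service` and `key` are assumed to exist in the
// caller):
//
//   service.put_value(key.clone(), b"value".to_vec());
//   service.get_value(&key);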
impl NetworkDHTProvider for Litep2pNetworkService {
	fn find_closest_peers(&self, target: PeerId) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::FindClosestPeers { target });
	}

	fn get_value(&self, key: &KademliaKey) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetValue { key: key.clone() });
	}

	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value });
	}

	fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo {
			record: Record {
				key: record.key.to_vec().into(),
				value: record.value,
				publisher: record.publisher.map(|peer_id| {
					let peer_id: sc_network_types::PeerId = peer_id.into();
					peer_id.into()
				}),
				expires: record.expires,
			},
			peers: peers.into_iter().collect(),
			update_local_storage,
		});
	}

	fn store_record(
		&self,
		key: KademliaKey,
		value: Vec<u8>,
		publisher: Option<PeerId>,
		expires: Option<Instant>,
	) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StoreRecord {
			key,
			value,
			publisher,
			expires,
		});
	}

	fn start_providing(&self, key: KademliaKey) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StartProviding { key });
	}

	fn stop_providing(&self, key: KademliaKey) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StopProviding { key });
	}

	fn get_providers(&self, key: KademliaKey) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetProviders { key });
	}
}

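// Status queries, unlike the DHT commands above, expect a reply: a `oneshot::Sender` is embedded
// in the command and the corresponding receiver is awaited. A minimal sketch of the calling side
// (illustrative only; `service` is assumed to be a `Litep2pNetworkService`):
//
//   if let Ok(status) = service.status().await {
//       log::info!(target: LOG_TARGET, "connected peers: {}", status.num_connected_peers);
//   }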
#[async_trait::async_trait]
impl NetworkStatusProvider for Litep2pNetworkService {
	async fn status(&self) -> Result<NetworkStatus, ()> {
		let (tx, rx) = oneshot::channel();
		self.cmd_tx
			.unbounded_send(NetworkServiceCommand::Status { tx })
			.map_err(|_| ())?;

		rx.await.map_err(|_| ())
	}

	async fn network_state(&self) -> Result<NetworkState, ()> {
		Ok(NetworkState {
			peer_id: self.local_peer_id.to_base58(),
			listened_addresses: self
				.listen_addresses
				.read()
				.iter()
				.cloned()
				.map(|a| Multiaddr::from(a).into())
				.collect(),
			external_addresses: self
				.external_addresses
				.get_addresses()
				.into_iter()
				.map(|a| Multiaddr::from(a).into())
				.collect(),
			connected_peers: HashMap::new(),
			not_connected_peers: HashMap::new(),
			// TODO: Check what info we can include here.
			//       Issue reference: https://github.com/paritytech/substrate/issues/14160.
			peerset: serde_json::json!(
				"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
			),
		})
	}
}

// Manual implementation to avoid extra boxing here
// TODO: functions modifying peerset state could be modified to call peerset directly if the
// `Multiaddr` only contains a `PeerId`
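// A sketch of typical reserved-peer management through this interface (illustrative only;
// `service`, `protocol` and `peer_address` are assumed to exist in the caller, `peer_address`
// being a `Multiaddr` that ends in a `/p2p/<peer id>` component):
//
//   let _ = service.add_peers_to_reserved_set(protocol.clone(), HashSet::from_iter([peer_address]));
//   service.set_authorized_only(true); // accept only reserved peers on the block-announce protocol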
#[async_trait::async_trait]
impl NetworkPeers for Litep2pNetworkService {
	fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedPeers {
			protocol: self.block_announce_protocol.clone(),
			peers: peers
				.into_iter()
				.map(|peer| Multiaddr::empty().with(Protocol::P2p(peer.into())))
				.collect(),
		});
	}

	fn set_authorized_only(&self, reserved_only: bool) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
			protocol: self.block_announce_protocol.clone(),
			reserved_only,
		});
	}

	fn add_known_address(&self, peer: PeerId, address: Multiaddr) {
		let _ = self
			.cmd_tx
			.unbounded_send(NetworkServiceCommand::AddKnownAddress { peer, address });
	}

	fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
		self.peer_store_handle.peer_reputation(peer_id)
	}

	fn report_peer(&self, peer: PeerId, cost_benefit: ReputationChange) {
		self.peer_store_handle.report_peer(peer, cost_benefit);
	}

	fn disconnect_peer(&self, peer: PeerId, protocol: ProtocolName) {
		let _ = self
			.cmd_tx
			.unbounded_send(NetworkServiceCommand::DisconnectPeer { protocol, peer });
	}

	fn accept_unreserved_peers(&self) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
			protocol: self.block_announce_protocol.clone(),
			reserved_only: false,
		});
	}

	fn deny_unreserved_peers(&self) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
			protocol: self.block_announce_protocol.clone(),
			reserved_only: true,
		});
	}

	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet {
			protocol: self.block_announce_protocol.clone(),
			peers: HashSet::from_iter([peer.concat().into()]),
		});

		Ok(())
	}

	fn remove_reserved_peer(&self, peer: PeerId) {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
			protocol: self.block_announce_protocol.clone(),
			peers: HashSet::from_iter([peer]),
		});
	}

	fn set_reserved_peers(
		&self,
		protocol: ProtocolName,
		peers: HashSet<Multiaddr>,
	) -> Result<(), String> {
		let _ = self
			.cmd_tx
			.unbounded_send(NetworkServiceCommand::SetReservedPeers { protocol, peers });
		Ok(())
	}

	fn add_peers_to_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: HashSet<Multiaddr>,
	) -> Result<(), String> {
		let _ = self
			.cmd_tx
			.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet { protocol, peers });
		Ok(())
	}

	fn remove_peers_from_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: Vec<PeerId>,
	) -> Result<(), String> {
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
			protocol,
			peers: peers.into_iter().map(From::from).collect(),
		});

		Ok(())
	}

	fn sync_num_connected(&self) -> usize {
		self.peerset_handles
			.get(&self.block_announce_protocol)
			.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed))
	}

	fn peer_role(&self, peer: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
		match Roles::decode_all(&mut &handshake[..]) {
			Ok(role) => Some(role.into()),
			Err(_) => {
				log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
				self.peer_store_handle.peer_role(&(peer.into()))
			},
		}
	}

	/// Get the list of reserved peers.
	///
	/// Returns an error if the network backend is no longer running.
	async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
		let Some(handle) = self.peerset_handles.get(&self.block_announce_protocol) else {
			return Err(())
		};
		let (tx, rx) = oneshot::channel();

		handle
			.tx
			.unbounded_send(PeersetCommand::GetReservedPeers { tx })
			.map_err(|_| ())?;

		// the channel can only be closed if `Peerset` no longer exists
		rx.await.map_err(|_| ())
	}
}

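// The stream returned below yields `Event`s produced by the backend (currently DHT events).
// A minimal consumption sketch (illustrative only; the caller needs `futures::StreamExt` in
// scope, and `"my-subsystem"` is a hypothetical stream name used to label the channel):
//
//   let mut events = service.event_stream("my-subsystem");
//   while let Some(event) = events.next().await {
//       // react to `Event::Dht(..)` here
//   }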
impl NetworkEventStream for Litep2pNetworkService {
	fn event_stream(&self, stream_name: &'static str) -> BoxStream<'static, Event> {
		let (tx, rx) = out_events::channel(stream_name, 100_000);
		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::EventStream { tx });
		Box::pin(rx)
	}
}

impl NetworkStateInfo for Litep2pNetworkService {
	fn external_addresses(&self) -> Vec<Multiaddr> {
		self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
	}

	fn listen_addresses(&self) -> Vec<Multiaddr> {
		self.listen_addresses.read().iter().cloned().map(Into::into).collect()
	}

	fn local_peer_id(&self) -> PeerId {
		self.local_peer_id.into()
	}
}

// Manual implementation to avoid extra boxing here
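// A sketch of issuing an outbound request through this interface (illustrative only; `service`,
// `peer` and `protocol` are assumed to exist in the caller):
//
//   match service
//       .request(peer, protocol, b"request".to_vec(), None, IfDisconnected::TryConnect)
//       .await
//   {
//       Ok((response, protocol_used)) => { /* handle `response` received on `protocol_used` */ },
//       Err(error) => log::debug!(target: LOG_TARGET, "request failed: {error:?}"),
//   }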
#[async_trait::async_trait]
impl NetworkRequest for Litep2pNetworkService {
	async fn request(
		&self,
		target: PeerId,
		protocol: ProtocolName,
		request: Vec<u8>,
		fallback_request: Option<(Vec<u8>, ProtocolName)>,
		connect: IfDisconnected,
	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
		let (tx, rx) = oneshot::channel();

		self.start_request(target, protocol, request, fallback_request, tx, connect);

		match rx.await {
			Ok(v) => v,
			// The channel can only be closed if the network worker no longer exists. If the
			// network worker no longer exists, then all connections to `target` are necessarily
			// closed, and we legitimately report this situation as a "ConnectionClosed".
			Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
		}
	}

	fn start_request(
		&self,
		peer: PeerId,
		protocol: ProtocolName,
		request: Vec<u8>,
		fallback_request: Option<(Vec<u8>, ProtocolName)>,
		sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		connect: IfDisconnected,
	) {
		match self.request_response_protocols.get(&protocol) {
			Some(tx) => {
				let _ = tx.unbounded_send(OutboundRequest::new(
					peer,
					request,
					sender,
					fallback_request,
					connect,
				));
			},
			None => log::warn!(
				target: LOG_TARGET,
				"{protocol} doesn't exist, cannot send request to {peer:?}"
			),
		}
	}
}