sc_network/litep2p/
service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkService` implementation for `litep2p`.
20
21use crate::{
22	config::MultiaddrWithPeerId,
23	litep2p::shim::{
24		notification::{config::ProtocolControlHandle, peerset::PeersetCommand},
25		request_response::OutboundRequest,
26	},
27	network_state::NetworkState,
28	peer_store::PeerStoreProvider,
29	service::out_events,
30	Event, IfDisconnected, NetworkDHTProvider, NetworkEventStream, NetworkPeers, NetworkRequest,
31	NetworkSigner, NetworkStateInfo, NetworkStatus, NetworkStatusProvider, ProtocolName,
32	RequestFailure, Signature,
33};
34
35use crate::litep2p::Record;
36use codec::DecodeAll;
37use futures::{channel::oneshot, stream::BoxStream};
38use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
39use litep2p::{
40	addresses::PublicAddresses, crypto::ed25519::Keypair,
41	types::multiaddr::Multiaddr as LiteP2pMultiaddr,
42};
43use parking_lot::RwLock;
44
45use sc_network_common::{
46	role::{ObservedRole, Roles},
47	types::ReputationChange,
48};
49use sc_network_types::{
50	multiaddr::{Multiaddr, Protocol},
51	PeerId,
52};
53use sc_utils::mpsc::TracingUnboundedSender;
54
55use std::{
56	collections::{HashMap, HashSet},
57	sync::{atomic::Ordering, Arc},
58	time::Instant,
59};
60
/// Logging target passed to the `log` macros used in this file.
const LOG_TARGET: &str = "sub-libp2p";
63
64/// Commands sent by [`Litep2pNetworkService`] to
65/// [`Litep2pNetworkBackend`](super::Litep2pNetworkBackend).
66#[derive(Debug)]
67pub enum NetworkServiceCommand {
68	/// Get value from DHT.
69	GetValue {
70		/// Record key.
71		key: KademliaKey,
72	},
73
74	/// Put value to DHT.
75	PutValue {
76		/// Record key.
77		key: KademliaKey,
78
79		/// Record value.
80		value: Vec<u8>,
81	},
82
83	/// Put value to DHT.
84	PutValueTo {
85		/// Record.
86		record: Record,
87		/// Peers we want to put the record.
88		peers: Vec<sc_network_types::PeerId>,
89		/// If we should update the local storage or not.
90		update_local_storage: bool,
91	},
92	/// Store record in the local DHT store.
93	StoreRecord {
94		/// Record key.
95		key: KademliaKey,
96
97		/// Record value.
98		value: Vec<u8>,
99
100		/// Original publisher of the record.
101		publisher: Option<PeerId>,
102
103		/// Record expiration time as measured by a local, monothonic clock.
104		expires: Option<Instant>,
105	},
106
107	/// Query network status.
108	Status {
109		/// `oneshot::Sender` for sending the status.
110		tx: oneshot::Sender<NetworkStatus>,
111	},
112
113	/// Add `peers` to `protocol`'s reserved set.
114	AddPeersToReservedSet {
115		/// Protocol.
116		protocol: ProtocolName,
117
118		/// Reserved peers.
119		peers: HashSet<Multiaddr>,
120	},
121
122	/// Add known address for peer.
123	AddKnownAddress {
124		/// Peer ID.
125		peer: PeerId,
126
127		/// Address.
128		address: Multiaddr,
129	},
130
131	/// Set reserved peers for `protocol`.
132	SetReservedPeers {
133		/// Protocol.
134		protocol: ProtocolName,
135
136		/// Reserved peers.
137		peers: HashSet<Multiaddr>,
138	},
139
140	/// Disconnect peer from protocol.
141	DisconnectPeer {
142		/// Protocol.
143		protocol: ProtocolName,
144
145		/// Peer ID.
146		peer: PeerId,
147	},
148
149	/// Set protocol to reserved only (true/false) mode.
150	SetReservedOnly {
151		/// Protocol.
152		protocol: ProtocolName,
153
154		/// Reserved only?
155		reserved_only: bool,
156	},
157
158	/// Remove reserved peers from protocol.
159	RemoveReservedPeers {
160		/// Protocol.
161		protocol: ProtocolName,
162
163		/// Peers to remove from the reserved set.
164		peers: HashSet<PeerId>,
165	},
166
167	/// Create event stream for DHT events.
168	EventStream {
169		/// Sender for the events.
170		tx: out_events::Sender,
171	},
172}
173
/// `NetworkService` implementation for `litep2p`.
///
/// The service is a cloneable handle. Operations that need the backend are forwarded as
/// [`NetworkServiceCommand`]s over `cmd_tx`; the remaining ones are answered from the handles
/// and shared state stored in this struct.
#[derive(Debug, Clone)]
pub struct Litep2pNetworkService {
	/// Local peer ID.
	local_peer_id: litep2p::PeerId,

	/// The `KeyPair` that defines the `PeerId` of the local node. Used to sign messages with the
	/// local identity.
	keypair: Keypair,

	/// TX channel for sending commands to [`Litep2pNetworkBackend`](super::Litep2pNetworkBackend).
	cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,

	/// Handle to `PeerStore`, used for peer reputation, peer reports and peer roles.
	peer_store_handle: Arc<dyn PeerStoreProvider>,

	/// Peerset handles, keyed by protocol name.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Name for the block announce protocol. Peer-management calls that take no explicit
	/// protocol operate on this one.
	block_announce_protocol: ProtocolName,

	/// Installed request-response protocols, each mapped to the channel over which its outbound
	/// requests are submitted.
	request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,

	/// Listen addresses, behind a shared lock.
	listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

	/// External addresses.
	external_addresses: PublicAddresses,
}
204
205impl Litep2pNetworkService {
206	/// Create new [`Litep2pNetworkService`].
207	pub fn new(
208		local_peer_id: litep2p::PeerId,
209		keypair: Keypair,
210		cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,
211		peer_store_handle: Arc<dyn PeerStoreProvider>,
212		peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
213		block_announce_protocol: ProtocolName,
214		request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
215		listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
216		external_addresses: PublicAddresses,
217	) -> Self {
218		Self {
219			local_peer_id,
220			keypair,
221			cmd_tx,
222			peer_store_handle,
223			peerset_handles,
224			block_announce_protocol,
225			request_response_protocols,
226			listen_addresses,
227			external_addresses,
228		}
229	}
230}
231
232impl NetworkSigner for Litep2pNetworkService {
233	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
234		let public_key = self.keypair.public();
235		let bytes = self.keypair.sign(msg.as_ref());
236
237		Ok(Signature {
238			public_key: crate::service::signature::PublicKey::Litep2p(
239				litep2p::crypto::PublicKey::Ed25519(public_key),
240			),
241			bytes,
242		})
243	}
244
245	fn verify(
246		&self,
247		peer: PeerId,
248		public_key: &Vec<u8>,
249		signature: &Vec<u8>,
250		message: &Vec<u8>,
251	) -> Result<bool, String> {
252		let public_key = litep2p::crypto::PublicKey::from_protobuf_encoding(&public_key)
253			.map_err(|error| error.to_string())?;
254		let peer: litep2p::PeerId = peer.into();
255
256		Ok(peer == public_key.to_peer_id() && public_key.verify(message, signature))
257	}
258}
259
260impl NetworkDHTProvider for Litep2pNetworkService {
261	fn get_value(&self, key: &KademliaKey) {
262		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetValue { key: key.clone() });
263	}
264
265	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
266		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value });
267	}
268
269	fn put_record_to(
270		&self,
271		record: libp2p::kad::Record,
272		peers: HashSet<PeerId>,
273		update_local_storage: bool,
274	) {
275		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo {
276			record: Record {
277				key: record.key.to_vec().into(),
278				value: record.value,
279				publisher: record.publisher.map(|peer_id| {
280					let peer_id: sc_network_types::PeerId = peer_id.into();
281					peer_id.into()
282				}),
283				expires: record.expires,
284			},
285			peers: peers.into_iter().collect(),
286			update_local_storage,
287		});
288	}
289
290	fn store_record(
291		&self,
292		key: KademliaKey,
293		value: Vec<u8>,
294		publisher: Option<PeerId>,
295		expires: Option<Instant>,
296	) {
297		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StoreRecord {
298			key,
299			value,
300			publisher,
301			expires,
302		});
303	}
304}
305
306#[async_trait::async_trait]
307impl NetworkStatusProvider for Litep2pNetworkService {
308	async fn status(&self) -> Result<NetworkStatus, ()> {
309		let (tx, rx) = oneshot::channel();
310		self.cmd_tx
311			.unbounded_send(NetworkServiceCommand::Status { tx })
312			.map_err(|_| ())?;
313
314		rx.await.map_err(|_| ())
315	}
316
317	async fn network_state(&self) -> Result<NetworkState, ()> {
318		Ok(NetworkState {
319			peer_id: self.local_peer_id.to_base58(),
320			listened_addresses: self
321				.listen_addresses
322				.read()
323				.iter()
324				.cloned()
325				.map(|a| Multiaddr::from(a).into())
326				.collect(),
327			external_addresses: self
328				.external_addresses
329				.get_addresses()
330				.into_iter()
331				.map(|a| Multiaddr::from(a).into())
332				.collect(),
333			connected_peers: HashMap::new(),
334			not_connected_peers: HashMap::new(),
335			// TODO: Check what info we can include here.
336			//       Issue reference: https://github.com/paritytech/substrate/issues/14160.
337			peerset: serde_json::json!(
338				"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
339			),
340		})
341	}
342}
343
344// Manual implementation to avoid extra boxing here
345// TODO: functions modifying peerset state could be modified to call peerset directly if the
346// `Multiaddr` only contains a `PeerId`
347#[async_trait::async_trait]
348impl NetworkPeers for Litep2pNetworkService {
349	fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
350		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedPeers {
351			protocol: self.block_announce_protocol.clone(),
352			peers: peers
353				.into_iter()
354				.map(|peer| Multiaddr::empty().with(Protocol::P2p(peer.into())))
355				.collect(),
356		});
357	}
358
359	fn set_authorized_only(&self, reserved_only: bool) {
360		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
361			protocol: self.block_announce_protocol.clone(),
362			reserved_only,
363		});
364	}
365
366	fn add_known_address(&self, peer: PeerId, address: Multiaddr) {
367		let _ = self
368			.cmd_tx
369			.unbounded_send(NetworkServiceCommand::AddKnownAddress { peer, address });
370	}
371
372	fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
373		self.peer_store_handle.peer_reputation(peer_id)
374	}
375
376	fn report_peer(&self, peer: PeerId, cost_benefit: ReputationChange) {
377		self.peer_store_handle.report_peer(peer, cost_benefit);
378	}
379
380	fn disconnect_peer(&self, peer: PeerId, protocol: ProtocolName) {
381		let _ = self
382			.cmd_tx
383			.unbounded_send(NetworkServiceCommand::DisconnectPeer { protocol, peer });
384	}
385
386	fn accept_unreserved_peers(&self) {
387		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
388			protocol: self.block_announce_protocol.clone(),
389			reserved_only: false,
390		});
391	}
392
393	fn deny_unreserved_peers(&self) {
394		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
395			protocol: self.block_announce_protocol.clone(),
396			reserved_only: true,
397		});
398	}
399
400	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
401		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet {
402			protocol: self.block_announce_protocol.clone(),
403			peers: HashSet::from_iter([peer.concat().into()]),
404		});
405
406		Ok(())
407	}
408
409	fn remove_reserved_peer(&self, peer: PeerId) {
410		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
411			protocol: self.block_announce_protocol.clone(),
412			peers: HashSet::from_iter([peer]),
413		});
414	}
415
416	fn set_reserved_peers(
417		&self,
418		protocol: ProtocolName,
419		peers: HashSet<Multiaddr>,
420	) -> Result<(), String> {
421		let _ = self
422			.cmd_tx
423			.unbounded_send(NetworkServiceCommand::SetReservedPeers { protocol, peers });
424		Ok(())
425	}
426
427	fn add_peers_to_reserved_set(
428		&self,
429		protocol: ProtocolName,
430		peers: HashSet<Multiaddr>,
431	) -> Result<(), String> {
432		let _ = self
433			.cmd_tx
434			.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet { protocol, peers });
435		Ok(())
436	}
437
438	fn remove_peers_from_reserved_set(
439		&self,
440		protocol: ProtocolName,
441		peers: Vec<PeerId>,
442	) -> Result<(), String> {
443		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
444			protocol,
445			peers: peers.into_iter().map(From::from).collect(),
446		});
447
448		Ok(())
449	}
450
451	fn sync_num_connected(&self) -> usize {
452		self.peerset_handles
453			.get(&self.block_announce_protocol)
454			.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed))
455	}
456
457	fn peer_role(&self, peer: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
458		match Roles::decode_all(&mut &handshake[..]) {
459			Ok(role) => Some(role.into()),
460			Err(_) => {
461				log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
462				self.peer_store_handle.peer_role(&(peer.into()))
463			},
464		}
465	}
466
467	/// Get the list of reserved peers.
468	///
469	/// Returns an error if the `NetworkWorker` is no longer running.
470	async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
471		let Some(handle) = self.peerset_handles.get(&self.block_announce_protocol) else {
472			return Err(())
473		};
474		let (tx, rx) = oneshot::channel();
475
476		handle
477			.tx
478			.unbounded_send(PeersetCommand::GetReservedPeers { tx })
479			.map_err(|_| ())?;
480
481		// the channel can only be closed if `Peerset` no longer exists
482		rx.await.map_err(|_| ())
483	}
484}
485
486impl NetworkEventStream for Litep2pNetworkService {
487	fn event_stream(&self, stream_name: &'static str) -> BoxStream<'static, Event> {
488		let (tx, rx) = out_events::channel(stream_name, 100_000);
489		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::EventStream { tx });
490		Box::pin(rx)
491	}
492}
493
494impl NetworkStateInfo for Litep2pNetworkService {
495	fn external_addresses(&self) -> Vec<Multiaddr> {
496		self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
497	}
498
499	fn listen_addresses(&self) -> Vec<Multiaddr> {
500		self.listen_addresses.read().iter().cloned().map(Into::into).collect()
501	}
502
503	fn local_peer_id(&self) -> PeerId {
504		self.local_peer_id.into()
505	}
506}
507
508// Manual implementation to avoid extra boxing here
509#[async_trait::async_trait]
510impl NetworkRequest for Litep2pNetworkService {
511	async fn request(
512		&self,
513		_target: PeerId,
514		_protocol: ProtocolName,
515		_request: Vec<u8>,
516		_fallback_request: Option<(Vec<u8>, ProtocolName)>,
517		_connect: IfDisconnected,
518	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
519		unimplemented!();
520	}
521
522	fn start_request(
523		&self,
524		peer: PeerId,
525		protocol: ProtocolName,
526		request: Vec<u8>,
527		fallback_request: Option<(Vec<u8>, ProtocolName)>,
528		sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
529		connect: IfDisconnected,
530	) {
531		match self.request_response_protocols.get(&protocol) {
532			Some(tx) => {
533				let _ = tx.unbounded_send(OutboundRequest::new(
534					peer,
535					request,
536					sender,
537					fallback_request,
538					connect,
539				));
540			},
541			None => log::warn!(
542				target: LOG_TARGET,
543				"{protocol} doesn't exist, cannot send request to {peer:?}"
544			),
545		}
546	}
547}