1use crate::{
22 config::MultiaddrWithPeerId,
23 litep2p::shim::{
24 notification::{config::ProtocolControlHandle, peerset::PeersetCommand},
25 request_response::OutboundRequest,
26 },
27 network_state::NetworkState,
28 peer_store::PeerStoreProvider,
29 service::out_events,
30 Event, IfDisconnected, NetworkDHTProvider, NetworkEventStream, NetworkPeers, NetworkRequest,
31 NetworkSigner, NetworkStateInfo, NetworkStatus, NetworkStatusProvider, OutboundFailure,
32 ProtocolName, RequestFailure, Signature,
33};
34
35use codec::DecodeAll;
36use futures::{channel::oneshot, stream::BoxStream};
37use libp2p::identity::SigningError;
38use litep2p::{
39 addresses::PublicAddresses, crypto::ed25519::Keypair,
40 types::multiaddr::Multiaddr as LiteP2pMultiaddr,
41};
42use parking_lot::RwLock;
43use sc_network_types::kad::{Key as KademliaKey, Record};
44
45use sc_network_common::{
46 role::{ObservedRole, Roles},
47 types::ReputationChange,
48};
49use sc_network_types::{
50 multiaddr::{Multiaddr, Protocol},
51 PeerId,
52};
53use sc_utils::mpsc::TracingUnboundedSender;
54
55use std::{
56 collections::{HashMap, HashSet},
57 sync::{atomic::Ordering, Arc},
58 time::Instant,
59};
60
/// Logging target passed as `target:` to the `log` macros in this file.
const LOG_TARGET: &str = "sub-libp2p";
63
64#[derive(Debug)]
67pub enum NetworkServiceCommand {
68 FindClosestPeers {
70 target: PeerId,
72 },
73
74 GetValue {
76 key: KademliaKey,
78 },
79
80 PutValue {
82 key: KademliaKey,
84
85 value: Vec<u8>,
87 },
88
89 PutValueTo {
91 record: Record,
93 peers: Vec<sc_network_types::PeerId>,
95 update_local_storage: bool,
97 },
98 StoreRecord {
100 key: KademliaKey,
102
103 value: Vec<u8>,
105
106 publisher: Option<PeerId>,
108
109 expires: Option<Instant>,
111 },
112
113 StartProviding { key: KademliaKey },
115
116 StopProviding { key: KademliaKey },
118
119 GetProviders { key: KademliaKey },
121
122 Status {
124 tx: oneshot::Sender<NetworkStatus>,
126 },
127
128 AddPeersToReservedSet {
130 protocol: ProtocolName,
132
133 peers: HashSet<Multiaddr>,
135 },
136
137 AddKnownAddress {
139 peer: PeerId,
141
142 address: Multiaddr,
144 },
145
146 SetReservedPeers {
148 protocol: ProtocolName,
150
151 peers: HashSet<Multiaddr>,
153 },
154
155 DisconnectPeer {
157 protocol: ProtocolName,
159
160 peer: PeerId,
162 },
163
164 SetReservedOnly {
166 protocol: ProtocolName,
168
169 reserved_only: bool,
171 },
172
173 RemoveReservedPeers {
175 protocol: ProtocolName,
177
178 peers: HashSet<PeerId>,
180 },
181
182 EventStream {
184 tx: out_events::Sender,
186 },
187}
188
/// Cloneable handle implementing the `sc-network` service traits by forwarding work over
/// channels and shared handles.
#[derive(Debug, Clone)]
pub struct Litep2pNetworkService {
    /// Local peer ID, reported by `local_peer_id()` and `network_state()`.
    local_peer_id: litep2p::PeerId,

    /// Keypair used by `sign_with_local_identity()`.
    keypair: Keypair,

    /// Sending half of the channel carrying [`NetworkServiceCommand`]s.
    // NOTE(review): the receiving end is not visible in this file — presumably the litep2p
    // network backend; confirm.
    cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,

    /// Peer store handle, queried for reputation and role and used to report peers.
    peer_store_handle: Arc<dyn PeerStoreProvider>,

    /// Per-protocol control handles, keyed by protocol name.
    peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

    /// Name of the block announce protocol; the protocol used by the methods that take no
    /// explicit protocol argument.
    block_announce_protocol: ProtocolName,

    /// Per-protocol senders used to submit [`OutboundRequest`]s, keyed by protocol name.
    request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,

    /// Shared set of listen addresses.
    listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

    /// Handle from which the external addresses are read.
    external_addresses: PublicAddresses,
}
219
220impl Litep2pNetworkService {
221 pub fn new(
223 local_peer_id: litep2p::PeerId,
224 keypair: Keypair,
225 cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,
226 peer_store_handle: Arc<dyn PeerStoreProvider>,
227 peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
228 block_announce_protocol: ProtocolName,
229 request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
230 listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
231 external_addresses: PublicAddresses,
232 ) -> Self {
233 Self {
234 local_peer_id,
235 keypair,
236 cmd_tx,
237 peer_store_handle,
238 peerset_handles,
239 block_announce_protocol,
240 request_response_protocols,
241 listen_addresses,
242 external_addresses,
243 }
244 }
245}
246
247impl NetworkSigner for Litep2pNetworkService {
248 fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
249 let public_key = self.keypair.public();
250 let bytes = self.keypair.sign(msg.as_ref());
251
252 Ok(Signature {
253 public_key: crate::service::signature::PublicKey::Litep2p(
254 litep2p::crypto::PublicKey::Ed25519(public_key),
255 ),
256 bytes,
257 })
258 }
259
260 fn verify(
261 &self,
262 peer: PeerId,
263 public_key: &Vec<u8>,
264 signature: &Vec<u8>,
265 message: &Vec<u8>,
266 ) -> Result<bool, String> {
267 let public_key = litep2p::crypto::PublicKey::from_protobuf_encoding(&public_key)
268 .map_err(|error| error.to_string())?;
269 let peer: litep2p::PeerId = peer.into();
270
271 Ok(peer == public_key.to_peer_id() && public_key.verify(message, signature))
272 }
273}
274
275impl NetworkDHTProvider for Litep2pNetworkService {
276 fn find_closest_peers(&self, target: PeerId) {
277 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::FindClosestPeers { target });
278 }
279
280 fn get_value(&self, key: &KademliaKey) {
281 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetValue { key: key.clone() });
282 }
283
284 fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
285 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value });
286 }
287
288 fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool) {
289 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo {
290 record: Record {
291 key: record.key.to_vec().into(),
292 value: record.value,
293 publisher: record.publisher.map(|peer_id| {
294 let peer_id: sc_network_types::PeerId = peer_id.into();
295 peer_id.into()
296 }),
297 expires: record.expires,
298 },
299 peers: peers.into_iter().collect(),
300 update_local_storage,
301 });
302 }
303
304 fn store_record(
305 &self,
306 key: KademliaKey,
307 value: Vec<u8>,
308 publisher: Option<PeerId>,
309 expires: Option<Instant>,
310 ) {
311 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StoreRecord {
312 key,
313 value,
314 publisher,
315 expires,
316 });
317 }
318
319 fn start_providing(&self, key: KademliaKey) {
320 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StartProviding { key });
321 }
322
323 fn stop_providing(&self, key: KademliaKey) {
324 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StopProviding { key });
325 }
326
327 fn get_providers(&self, key: KademliaKey) {
328 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetProviders { key });
329 }
330}
331
332#[async_trait::async_trait]
333impl NetworkStatusProvider for Litep2pNetworkService {
334 async fn status(&self) -> Result<NetworkStatus, ()> {
335 let (tx, rx) = oneshot::channel();
336 self.cmd_tx
337 .unbounded_send(NetworkServiceCommand::Status { tx })
338 .map_err(|_| ())?;
339
340 rx.await.map_err(|_| ())
341 }
342
343 async fn network_state(&self) -> Result<NetworkState, ()> {
344 Ok(NetworkState {
345 peer_id: self.local_peer_id.to_base58(),
346 listened_addresses: self
347 .listen_addresses
348 .read()
349 .iter()
350 .cloned()
351 .map(|a| Multiaddr::from(a).into())
352 .collect(),
353 external_addresses: self
354 .external_addresses
355 .get_addresses()
356 .into_iter()
357 .map(|a| Multiaddr::from(a).into())
358 .collect(),
359 connected_peers: HashMap::new(),
360 not_connected_peers: HashMap::new(),
361 peerset: serde_json::json!(
364 "Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
365 ),
366 })
367 }
368}
369
370#[async_trait::async_trait]
374impl NetworkPeers for Litep2pNetworkService {
375 fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
376 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedPeers {
377 protocol: self.block_announce_protocol.clone(),
378 peers: peers
379 .into_iter()
380 .map(|peer| Multiaddr::empty().with(Protocol::P2p(peer.into())))
381 .collect(),
382 });
383 }
384
385 fn set_authorized_only(&self, reserved_only: bool) {
386 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
387 protocol: self.block_announce_protocol.clone(),
388 reserved_only,
389 });
390 }
391
392 fn add_known_address(&self, peer: PeerId, address: Multiaddr) {
393 let _ = self
394 .cmd_tx
395 .unbounded_send(NetworkServiceCommand::AddKnownAddress { peer, address });
396 }
397
398 fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
399 self.peer_store_handle.peer_reputation(peer_id)
400 }
401
402 fn report_peer(&self, peer: PeerId, cost_benefit: ReputationChange) {
403 self.peer_store_handle.report_peer(peer, cost_benefit);
404 }
405
406 fn disconnect_peer(&self, peer: PeerId, protocol: ProtocolName) {
407 let _ = self
408 .cmd_tx
409 .unbounded_send(NetworkServiceCommand::DisconnectPeer { protocol, peer });
410 }
411
412 fn accept_unreserved_peers(&self) {
413 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
414 protocol: self.block_announce_protocol.clone(),
415 reserved_only: false,
416 });
417 }
418
419 fn deny_unreserved_peers(&self) {
420 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
421 protocol: self.block_announce_protocol.clone(),
422 reserved_only: true,
423 });
424 }
425
426 fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
427 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet {
428 protocol: self.block_announce_protocol.clone(),
429 peers: HashSet::from_iter([peer.concat().into()]),
430 });
431
432 Ok(())
433 }
434
435 fn remove_reserved_peer(&self, peer: PeerId) {
436 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
437 protocol: self.block_announce_protocol.clone(),
438 peers: HashSet::from_iter([peer]),
439 });
440 }
441
442 fn set_reserved_peers(
443 &self,
444 protocol: ProtocolName,
445 peers: HashSet<Multiaddr>,
446 ) -> Result<(), String> {
447 let _ = self
448 .cmd_tx
449 .unbounded_send(NetworkServiceCommand::SetReservedPeers { protocol, peers });
450 Ok(())
451 }
452
453 fn add_peers_to_reserved_set(
454 &self,
455 protocol: ProtocolName,
456 peers: HashSet<Multiaddr>,
457 ) -> Result<(), String> {
458 let _ = self
459 .cmd_tx
460 .unbounded_send(NetworkServiceCommand::AddPeersToReservedSet { protocol, peers });
461 Ok(())
462 }
463
464 fn remove_peers_from_reserved_set(
465 &self,
466 protocol: ProtocolName,
467 peers: Vec<PeerId>,
468 ) -> Result<(), String> {
469 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
470 protocol,
471 peers: peers.into_iter().map(From::from).collect(),
472 });
473
474 Ok(())
475 }
476
477 fn sync_num_connected(&self) -> usize {
478 self.peerset_handles
479 .get(&self.block_announce_protocol)
480 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed))
481 }
482
483 fn peer_role(&self, peer: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
484 match Roles::decode_all(&mut &handshake[..]) {
485 Ok(role) => Some(role.into()),
486 Err(_) => {
487 log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
488 self.peer_store_handle.peer_role(&(peer.into()))
489 },
490 }
491 }
492
493 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
497 let Some(handle) = self.peerset_handles.get(&self.block_announce_protocol) else {
498 return Err(())
499 };
500 let (tx, rx) = oneshot::channel();
501
502 handle
503 .tx
504 .unbounded_send(PeersetCommand::GetReservedPeers { tx })
505 .map_err(|_| ())?;
506
507 rx.await.map_err(|_| ())
509 }
510}
511
512impl NetworkEventStream for Litep2pNetworkService {
513 fn event_stream(&self, stream_name: &'static str) -> BoxStream<'static, Event> {
514 let (tx, rx) = out_events::channel(stream_name, 100_000);
515 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::EventStream { tx });
516 Box::pin(rx)
517 }
518}
519
520impl NetworkStateInfo for Litep2pNetworkService {
521 fn external_addresses(&self) -> Vec<Multiaddr> {
522 self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
523 }
524
525 fn listen_addresses(&self) -> Vec<Multiaddr> {
526 self.listen_addresses.read().iter().cloned().map(Into::into).collect()
527 }
528
529 fn local_peer_id(&self) -> PeerId {
530 self.local_peer_id.into()
531 }
532}
533
534#[async_trait::async_trait]
536impl NetworkRequest for Litep2pNetworkService {
537 async fn request(
538 &self,
539 target: PeerId,
540 protocol: ProtocolName,
541 request: Vec<u8>,
542 fallback_request: Option<(Vec<u8>, ProtocolName)>,
543 connect: IfDisconnected,
544 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
545 let (tx, rx) = oneshot::channel();
546
547 self.start_request(target, protocol, request, fallback_request, tx, connect);
548
549 match rx.await {
550 Ok(v) => v,
551 Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
555 }
556 }
557
558 fn start_request(
559 &self,
560 peer: PeerId,
561 protocol: ProtocolName,
562 request: Vec<u8>,
563 fallback_request: Option<(Vec<u8>, ProtocolName)>,
564 sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
565 connect: IfDisconnected,
566 ) {
567 match self.request_response_protocols.get(&protocol) {
568 Some(tx) => {
569 let _ = tx.unbounded_send(OutboundRequest::new(
570 peer,
571 request,
572 sender,
573 fallback_request,
574 connect,
575 ));
576 },
577 None => log::warn!(
578 target: LOG_TARGET,
579 "{protocol} doesn't exist, cannot send request to {peer:?}"
580 ),
581 }
582 }
583}