1use crate::{
22 config::MultiaddrWithPeerId,
23 litep2p::shim::{
24 notification::{config::ProtocolControlHandle, peerset::PeersetCommand},
25 request_response::OutboundRequest,
26 },
27 network_state::NetworkState,
28 peer_store::PeerStoreProvider,
29 service::out_events,
30 Event, IfDisconnected, NetworkDHTProvider, NetworkEventStream, NetworkPeers, NetworkRequest,
31 NetworkSigner, NetworkStateInfo, NetworkStatus, NetworkStatusProvider, ProtocolName,
32 RequestFailure, Signature,
33};
34
35use crate::litep2p::Record;
36use codec::DecodeAll;
37use futures::{channel::oneshot, stream::BoxStream};
38use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
39use litep2p::{
40 addresses::PublicAddresses, crypto::ed25519::Keypair,
41 types::multiaddr::Multiaddr as LiteP2pMultiaddr,
42};
43use parking_lot::RwLock;
44
45use sc_network_common::{
46 role::{ObservedRole, Roles},
47 types::ReputationChange,
48};
49use sc_network_types::{
50 multiaddr::{Multiaddr, Protocol},
51 PeerId,
52};
53use sc_utils::mpsc::TracingUnboundedSender;
54
55use std::{
56 collections::{HashMap, HashSet},
57 sync::{atomic::Ordering, Arc},
58 time::Instant,
59};
60
/// Log target passed as `target:` to the `log` macros used in this file.
const LOG_TARGET: &str = "sub-libp2p";
63
/// Commands that [`Litep2pNetworkService`] pushes onto its command channel (`cmd_tx`).
///
/// Every variant is constructed by one or more trait methods of the service. The receiving
/// end of the channel is not part of this file.
#[derive(Debug)]
pub enum NetworkServiceCommand {
    /// Sent by `NetworkDHTProvider::get_value`.
    GetValue {
        /// Record key, cloned from the caller's reference.
        key: KademliaKey,
    },

    /// Sent by `NetworkDHTProvider::put_value`.
    PutValue {
        /// Record key as supplied by the caller.
        key: KademliaKey,

        /// Record value as supplied by the caller.
        value: Vec<u8>,
    },

    /// Sent by `NetworkDHTProvider::put_record_to`.
    PutValueTo {
        /// Record converted field by field from the `libp2p::kad::Record` given by the caller.
        record: Record,
        /// Peers supplied by the caller, collected from a `HashSet` into a `Vec`.
        peers: Vec<sc_network_types::PeerId>,
        /// Flag forwarded unchanged from the caller.
        // NOTE(review): presumably tells the receiver whether to also update the local
        // record store; confirm against the command handler.
        update_local_storage: bool,
    },
    /// Sent by `NetworkDHTProvider::store_record`.
    StoreRecord {
        /// Record key as supplied by the caller.
        key: KademliaKey,

        /// Record value as supplied by the caller.
        value: Vec<u8>,

        /// Optional publisher of the record, forwarded unchanged.
        publisher: Option<PeerId>,

        /// Optional expiry instant of the record, forwarded unchanged.
        expires: Option<Instant>,
    },

    /// Sent by `NetworkStatusProvider::status`.
    Status {
        /// Sending half of the oneshot channel on which `status` awaits the reply.
        tx: oneshot::Sender<NetworkStatus>,
    },

    /// Sent by `NetworkPeers::add_reserved_peer` (for the block-announce protocol) and by
    /// `NetworkPeers::add_peers_to_reserved_set`.
    AddPeersToReservedSet {
        /// Protocol whose reserved set is addressed.
        protocol: ProtocolName,

        /// Addresses of the peers to add.
        peers: HashSet<Multiaddr>,
    },

    /// Sent by `NetworkPeers::add_known_address`.
    AddKnownAddress {
        /// Peer the address belongs to.
        peer: PeerId,

        /// Address supplied by the caller.
        address: Multiaddr,
    },

    /// Sent by `NetworkPeers::set_authorized_peers` (for the block-announce protocol) and by
    /// `NetworkPeers::set_reserved_peers`.
    SetReservedPeers {
        /// Protocol whose reserved set is addressed.
        protocol: ProtocolName,

        /// Addresses of the reserved peers.
        peers: HashSet<Multiaddr>,
    },

    /// Sent by `NetworkPeers::disconnect_peer`.
    DisconnectPeer {
        /// Protocol supplied by the caller.
        protocol: ProtocolName,

        /// Peer supplied by the caller.
        peer: PeerId,
    },

    /// Sent by `NetworkPeers::set_authorized_only`, `accept_unreserved_peers`
    /// (`reserved_only: false`) and `deny_unreserved_peers` (`reserved_only: true`), always
    /// for the block-announce protocol.
    SetReservedOnly {
        /// Protocol whose mode is addressed.
        protocol: ProtocolName,

        /// Requested reserved-only mode.
        reserved_only: bool,
    },

    /// Sent by `NetworkPeers::remove_reserved_peer` (for the block-announce protocol) and by
    /// `NetworkPeers::remove_peers_from_reserved_set`.
    RemoveReservedPeers {
        /// Protocol whose reserved set is addressed.
        protocol: ProtocolName,

        /// Peers to remove.
        peers: HashSet<PeerId>,
    },

    /// Sent by `NetworkEventStream::event_stream`.
    EventStream {
        /// Sending half of the `out_events` channel whose receiving half is returned to the
        /// caller of `event_stream`.
        tx: out_events::Sender,
    },
}
173
/// Cloneable handle implementing the `Network*` service traits for the litep2p backend.
///
/// Most trait methods translate the call into a [`NetworkServiceCommand`] and push it onto
/// `cmd_tx`; the remaining ones read the shared state stored in the fields below.
#[derive(Debug, Clone)]
pub struct Litep2pNetworkService {
    /// Local peer ID, returned by `local_peer_id()` and reported in `network_state()`.
    local_peer_id: litep2p::PeerId,

    /// Ed25519 keypair used by `sign_with_local_identity()`.
    keypair: Keypair,

    /// Channel on which [`NetworkServiceCommand`]s are sent.
    cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,

    /// Peer store handle used for reputation queries, peer reports and role lookups.
    peer_store_handle: Arc<dyn PeerStoreProvider>,

    /// Per-protocol control handles; the block-announce entry is used by
    /// `sync_num_connected()` and `reserved_peers()`.
    peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

    /// Name of the block-announce protocol, used as the protocol for all `NetworkPeers`
    /// methods that do not take an explicit protocol.
    block_announce_protocol: ProtocolName,

    /// Per-protocol senders to which `start_request()` forwards outbound requests.
    request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,

    /// Shared set of listen addresses, read by `listen_addresses()` and `network_state()`.
    listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,

    /// External addresses, read by `external_addresses()` and `network_state()`.
    external_addresses: PublicAddresses,
}
204
impl Litep2pNetworkService {
    /// Create a new [`Litep2pNetworkService`].
    ///
    /// Every argument is stored unchanged in the field of the same name; no validation or
    /// other work is performed.
    pub fn new(
        local_peer_id: litep2p::PeerId,
        keypair: Keypair,
        cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,
        peer_store_handle: Arc<dyn PeerStoreProvider>,
        peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
        block_announce_protocol: ProtocolName,
        request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
        listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
        external_addresses: PublicAddresses,
    ) -> Self {
        Self {
            local_peer_id,
            keypair,
            cmd_tx,
            peer_store_handle,
            peerset_handles,
            block_announce_protocol,
            request_response_protocols,
            listen_addresses,
            external_addresses,
        }
    }
}
231
232impl NetworkSigner for Litep2pNetworkService {
233 fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
234 let public_key = self.keypair.public();
235 let bytes = self.keypair.sign(msg.as_ref());
236
237 Ok(Signature {
238 public_key: crate::service::signature::PublicKey::Litep2p(
239 litep2p::crypto::PublicKey::Ed25519(public_key),
240 ),
241 bytes,
242 })
243 }
244
245 fn verify(
246 &self,
247 peer: PeerId,
248 public_key: &Vec<u8>,
249 signature: &Vec<u8>,
250 message: &Vec<u8>,
251 ) -> Result<bool, String> {
252 let public_key = litep2p::crypto::PublicKey::from_protobuf_encoding(&public_key)
253 .map_err(|error| error.to_string())?;
254 let peer: litep2p::PeerId = peer.into();
255
256 Ok(peer == public_key.to_peer_id() && public_key.verify(message, signature))
257 }
258}
259
260impl NetworkDHTProvider for Litep2pNetworkService {
261 fn get_value(&self, key: &KademliaKey) {
262 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetValue { key: key.clone() });
263 }
264
265 fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
266 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value });
267 }
268
269 fn put_record_to(
270 &self,
271 record: libp2p::kad::Record,
272 peers: HashSet<PeerId>,
273 update_local_storage: bool,
274 ) {
275 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo {
276 record: Record {
277 key: record.key.to_vec().into(),
278 value: record.value,
279 publisher: record.publisher.map(|peer_id| {
280 let peer_id: sc_network_types::PeerId = peer_id.into();
281 peer_id.into()
282 }),
283 expires: record.expires,
284 },
285 peers: peers.into_iter().collect(),
286 update_local_storage,
287 });
288 }
289
290 fn store_record(
291 &self,
292 key: KademliaKey,
293 value: Vec<u8>,
294 publisher: Option<PeerId>,
295 expires: Option<Instant>,
296 ) {
297 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StoreRecord {
298 key,
299 value,
300 publisher,
301 expires,
302 });
303 }
304}
305
306#[async_trait::async_trait]
307impl NetworkStatusProvider for Litep2pNetworkService {
308 async fn status(&self) -> Result<NetworkStatus, ()> {
309 let (tx, rx) = oneshot::channel();
310 self.cmd_tx
311 .unbounded_send(NetworkServiceCommand::Status { tx })
312 .map_err(|_| ())?;
313
314 rx.await.map_err(|_| ())
315 }
316
317 async fn network_state(&self) -> Result<NetworkState, ()> {
318 Ok(NetworkState {
319 peer_id: self.local_peer_id.to_base58(),
320 listened_addresses: self
321 .listen_addresses
322 .read()
323 .iter()
324 .cloned()
325 .map(|a| Multiaddr::from(a).into())
326 .collect(),
327 external_addresses: self
328 .external_addresses
329 .get_addresses()
330 .into_iter()
331 .map(|a| Multiaddr::from(a).into())
332 .collect(),
333 connected_peers: HashMap::new(),
334 not_connected_peers: HashMap::new(),
335 peerset: serde_json::json!(
338 "Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
339 ),
340 })
341 }
342}
343
344#[async_trait::async_trait]
348impl NetworkPeers for Litep2pNetworkService {
349 fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
350 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedPeers {
351 protocol: self.block_announce_protocol.clone(),
352 peers: peers
353 .into_iter()
354 .map(|peer| Multiaddr::empty().with(Protocol::P2p(peer.into())))
355 .collect(),
356 });
357 }
358
359 fn set_authorized_only(&self, reserved_only: bool) {
360 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
361 protocol: self.block_announce_protocol.clone(),
362 reserved_only,
363 });
364 }
365
366 fn add_known_address(&self, peer: PeerId, address: Multiaddr) {
367 let _ = self
368 .cmd_tx
369 .unbounded_send(NetworkServiceCommand::AddKnownAddress { peer, address });
370 }
371
372 fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
373 self.peer_store_handle.peer_reputation(peer_id)
374 }
375
376 fn report_peer(&self, peer: PeerId, cost_benefit: ReputationChange) {
377 self.peer_store_handle.report_peer(peer, cost_benefit);
378 }
379
380 fn disconnect_peer(&self, peer: PeerId, protocol: ProtocolName) {
381 let _ = self
382 .cmd_tx
383 .unbounded_send(NetworkServiceCommand::DisconnectPeer { protocol, peer });
384 }
385
386 fn accept_unreserved_peers(&self) {
387 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
388 protocol: self.block_announce_protocol.clone(),
389 reserved_only: false,
390 });
391 }
392
393 fn deny_unreserved_peers(&self) {
394 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
395 protocol: self.block_announce_protocol.clone(),
396 reserved_only: true,
397 });
398 }
399
400 fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
401 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet {
402 protocol: self.block_announce_protocol.clone(),
403 peers: HashSet::from_iter([peer.concat().into()]),
404 });
405
406 Ok(())
407 }
408
409 fn remove_reserved_peer(&self, peer: PeerId) {
410 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
411 protocol: self.block_announce_protocol.clone(),
412 peers: HashSet::from_iter([peer]),
413 });
414 }
415
416 fn set_reserved_peers(
417 &self,
418 protocol: ProtocolName,
419 peers: HashSet<Multiaddr>,
420 ) -> Result<(), String> {
421 let _ = self
422 .cmd_tx
423 .unbounded_send(NetworkServiceCommand::SetReservedPeers { protocol, peers });
424 Ok(())
425 }
426
427 fn add_peers_to_reserved_set(
428 &self,
429 protocol: ProtocolName,
430 peers: HashSet<Multiaddr>,
431 ) -> Result<(), String> {
432 let _ = self
433 .cmd_tx
434 .unbounded_send(NetworkServiceCommand::AddPeersToReservedSet { protocol, peers });
435 Ok(())
436 }
437
438 fn remove_peers_from_reserved_set(
439 &self,
440 protocol: ProtocolName,
441 peers: Vec<PeerId>,
442 ) -> Result<(), String> {
443 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
444 protocol,
445 peers: peers.into_iter().map(From::from).collect(),
446 });
447
448 Ok(())
449 }
450
451 fn sync_num_connected(&self) -> usize {
452 self.peerset_handles
453 .get(&self.block_announce_protocol)
454 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed))
455 }
456
457 fn peer_role(&self, peer: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
458 match Roles::decode_all(&mut &handshake[..]) {
459 Ok(role) => Some(role.into()),
460 Err(_) => {
461 log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
462 self.peer_store_handle.peer_role(&(peer.into()))
463 },
464 }
465 }
466
467 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
471 let Some(handle) = self.peerset_handles.get(&self.block_announce_protocol) else {
472 return Err(())
473 };
474 let (tx, rx) = oneshot::channel();
475
476 handle
477 .tx
478 .unbounded_send(PeersetCommand::GetReservedPeers { tx })
479 .map_err(|_| ())?;
480
481 rx.await.map_err(|_| ())
483 }
484}
485
486impl NetworkEventStream for Litep2pNetworkService {
487 fn event_stream(&self, stream_name: &'static str) -> BoxStream<'static, Event> {
488 let (tx, rx) = out_events::channel(stream_name, 100_000);
489 let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::EventStream { tx });
490 Box::pin(rx)
491 }
492}
493
494impl NetworkStateInfo for Litep2pNetworkService {
495 fn external_addresses(&self) -> Vec<Multiaddr> {
496 self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
497 }
498
499 fn listen_addresses(&self) -> Vec<Multiaddr> {
500 self.listen_addresses.read().iter().cloned().map(Into::into).collect()
501 }
502
503 fn local_peer_id(&self) -> PeerId {
504 self.local_peer_id.into()
505 }
506}
507
508#[async_trait::async_trait]
510impl NetworkRequest for Litep2pNetworkService {
511 async fn request(
512 &self,
513 _target: PeerId,
514 _protocol: ProtocolName,
515 _request: Vec<u8>,
516 _fallback_request: Option<(Vec<u8>, ProtocolName)>,
517 _connect: IfDisconnected,
518 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
519 unimplemented!();
520 }
521
522 fn start_request(
523 &self,
524 peer: PeerId,
525 protocol: ProtocolName,
526 request: Vec<u8>,
527 fallback_request: Option<(Vec<u8>, ProtocolName)>,
528 sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
529 connect: IfDisconnected,
530 ) {
531 match self.request_response_protocols.get(&protocol) {
532 Some(tx) => {
533 let _ = tx.unbounded_send(OutboundRequest::new(
534 peer,
535 request,
536 sender,
537 fallback_request,
538 connect,
539 ));
540 },
541 None => log::warn!(
542 target: LOG_TARGET,
543 "{protocol} doesn't exist, cannot send request to {peer:?}"
544 ),
545 }
546 }
547}