use crate::{
config::MultiaddrWithPeerId,
litep2p::shim::{
notification::{config::ProtocolControlHandle, peerset::PeersetCommand},
request_response::OutboundRequest,
},
network_state::NetworkState,
peer_store::PeerStoreProvider,
service::out_events,
Event, IfDisconnected, NetworkDHTProvider, NetworkEventStream, NetworkPeers, NetworkRequest,
NetworkSigner, NetworkStateInfo, NetworkStatus, NetworkStatusProvider, OutboundFailure,
ProtocolName, RequestFailure, Signature,
};
use codec::DecodeAll;
use futures::{channel::oneshot, stream::BoxStream};
use libp2p::identity::SigningError;
use litep2p::{
addresses::PublicAddresses, crypto::ed25519::Keypair,
types::multiaddr::Multiaddr as LiteP2pMultiaddr,
};
use parking_lot::RwLock;
use sc_network_types::kad::{Key as KademliaKey, Record};
use sc_network_common::{
role::{ObservedRole, Roles},
types::ReputationChange,
};
use sc_network_types::{
multiaddr::{Multiaddr, Protocol},
PeerId,
};
use sc_utils::mpsc::TracingUnboundedSender;
use std::{
collections::{HashMap, HashSet},
sync::{atomic::Ordering, Arc},
time::Instant,
};
const LOG_TARGET: &str = "sub-libp2p";
#[derive(Debug)]
pub enum NetworkServiceCommand {
GetValue {
key: KademliaKey,
},
PutValue {
key: KademliaKey,
value: Vec<u8>,
},
PutValueTo {
record: Record,
peers: Vec<sc_network_types::PeerId>,
update_local_storage: bool,
},
StoreRecord {
key: KademliaKey,
value: Vec<u8>,
publisher: Option<PeerId>,
expires: Option<Instant>,
},
StartProviding { key: KademliaKey },
StopProviding { key: KademliaKey },
GetProviders { key: KademliaKey },
Status {
tx: oneshot::Sender<NetworkStatus>,
},
AddPeersToReservedSet {
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
},
AddKnownAddress {
peer: PeerId,
address: Multiaddr,
},
SetReservedPeers {
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
},
DisconnectPeer {
protocol: ProtocolName,
peer: PeerId,
},
SetReservedOnly {
protocol: ProtocolName,
reserved_only: bool,
},
RemoveReservedPeers {
protocol: ProtocolName,
peers: HashSet<PeerId>,
},
EventStream {
tx: out_events::Sender,
},
}
#[derive(Debug, Clone)]
pub struct Litep2pNetworkService {
local_peer_id: litep2p::PeerId,
keypair: Keypair,
cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
block_announce_protocol: ProtocolName,
request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
}
impl Litep2pNetworkService {
pub fn new(
local_peer_id: litep2p::PeerId,
keypair: Keypair,
cmd_tx: TracingUnboundedSender<NetworkServiceCommand>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,
block_announce_protocol: ProtocolName,
request_response_protocols: HashMap<ProtocolName, TracingUnboundedSender<OutboundRequest>>,
listen_addresses: Arc<RwLock<HashSet<LiteP2pMultiaddr>>>,
external_addresses: PublicAddresses,
) -> Self {
Self {
local_peer_id,
keypair,
cmd_tx,
peer_store_handle,
peerset_handles,
block_announce_protocol,
request_response_protocols,
listen_addresses,
external_addresses,
}
}
}
impl NetworkSigner for Litep2pNetworkService {
fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
let public_key = self.keypair.public();
let bytes = self.keypair.sign(msg.as_ref());
Ok(Signature {
public_key: crate::service::signature::PublicKey::Litep2p(
litep2p::crypto::PublicKey::Ed25519(public_key),
),
bytes,
})
}
fn verify(
&self,
peer: PeerId,
public_key: &Vec<u8>,
signature: &Vec<u8>,
message: &Vec<u8>,
) -> Result<bool, String> {
let public_key = litep2p::crypto::PublicKey::from_protobuf_encoding(&public_key)
.map_err(|error| error.to_string())?;
let peer: litep2p::PeerId = peer.into();
Ok(peer == public_key.to_peer_id() && public_key.verify(message, signature))
}
}
impl NetworkDHTProvider for Litep2pNetworkService {
fn get_value(&self, key: &KademliaKey) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetValue { key: key.clone() });
}
fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value });
}
fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo {
record: Record {
key: record.key.to_vec().into(),
value: record.value,
publisher: record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.expires,
},
peers: peers.into_iter().collect(),
update_local_storage,
});
}
fn store_record(
&self,
key: KademliaKey,
value: Vec<u8>,
publisher: Option<PeerId>,
expires: Option<Instant>,
) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StoreRecord {
key,
value,
publisher,
expires,
});
}
fn start_providing(&self, key: KademliaKey) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StartProviding { key });
}
fn stop_providing(&self, key: KademliaKey) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StopProviding { key });
}
fn get_providers(&self, key: KademliaKey) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetProviders { key });
}
}
#[async_trait::async_trait]
impl NetworkStatusProvider for Litep2pNetworkService {
async fn status(&self) -> Result<NetworkStatus, ()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.unbounded_send(NetworkServiceCommand::Status { tx })
.map_err(|_| ())?;
rx.await.map_err(|_| ())
}
async fn network_state(&self) -> Result<NetworkState, ()> {
Ok(NetworkState {
peer_id: self.local_peer_id.to_base58(),
listened_addresses: self
.listen_addresses
.read()
.iter()
.cloned()
.map(|a| Multiaddr::from(a).into())
.collect(),
external_addresses: self
.external_addresses
.get_addresses()
.into_iter()
.map(|a| Multiaddr::from(a).into())
.collect(),
connected_peers: HashMap::new(),
not_connected_peers: HashMap::new(),
peerset: serde_json::json!(
"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
),
})
}
}
#[async_trait::async_trait]
impl NetworkPeers for Litep2pNetworkService {
fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedPeers {
protocol: self.block_announce_protocol.clone(),
peers: peers
.into_iter()
.map(|peer| Multiaddr::empty().with(Protocol::P2p(peer.into())))
.collect(),
});
}
fn set_authorized_only(&self, reserved_only: bool) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
protocol: self.block_announce_protocol.clone(),
reserved_only,
});
}
fn add_known_address(&self, peer: PeerId, address: Multiaddr) {
let _ = self
.cmd_tx
.unbounded_send(NetworkServiceCommand::AddKnownAddress { peer, address });
}
fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
self.peer_store_handle.peer_reputation(peer_id)
}
fn report_peer(&self, peer: PeerId, cost_benefit: ReputationChange) {
self.peer_store_handle.report_peer(peer, cost_benefit);
}
fn disconnect_peer(&self, peer: PeerId, protocol: ProtocolName) {
let _ = self
.cmd_tx
.unbounded_send(NetworkServiceCommand::DisconnectPeer { protocol, peer });
}
fn accept_unreserved_peers(&self) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
protocol: self.block_announce_protocol.clone(),
reserved_only: false,
});
}
fn deny_unreserved_peers(&self) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::SetReservedOnly {
protocol: self.block_announce_protocol.clone(),
reserved_only: true,
});
}
fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet {
protocol: self.block_announce_protocol.clone(),
peers: HashSet::from_iter([peer.concat().into()]),
});
Ok(())
}
fn remove_reserved_peer(&self, peer: PeerId) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
protocol: self.block_announce_protocol.clone(),
peers: HashSet::from_iter([peer]),
});
}
fn set_reserved_peers(
&self,
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
) -> Result<(), String> {
let _ = self
.cmd_tx
.unbounded_send(NetworkServiceCommand::SetReservedPeers { protocol, peers });
Ok(())
}
fn add_peers_to_reserved_set(
&self,
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
) -> Result<(), String> {
let _ = self
.cmd_tx
.unbounded_send(NetworkServiceCommand::AddPeersToReservedSet { protocol, peers });
Ok(())
}
fn remove_peers_from_reserved_set(
&self,
protocol: ProtocolName,
peers: Vec<PeerId>,
) -> Result<(), String> {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::RemoveReservedPeers {
protocol,
peers: peers.into_iter().map(From::from).collect(),
});
Ok(())
}
fn sync_num_connected(&self) -> usize {
self.peerset_handles
.get(&self.block_announce_protocol)
.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed))
}
fn peer_role(&self, peer: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
match Roles::decode_all(&mut &handshake[..]) {
Ok(role) => Some(role.into()),
Err(_) => {
log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
self.peer_store_handle.peer_role(&(peer.into()))
},
}
}
async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
let Some(handle) = self.peerset_handles.get(&self.block_announce_protocol) else {
return Err(())
};
let (tx, rx) = oneshot::channel();
handle
.tx
.unbounded_send(PeersetCommand::GetReservedPeers { tx })
.map_err(|_| ())?;
rx.await.map_err(|_| ())
}
}
impl NetworkEventStream for Litep2pNetworkService {
fn event_stream(&self, stream_name: &'static str) -> BoxStream<'static, Event> {
let (tx, rx) = out_events::channel(stream_name, 100_000);
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::EventStream { tx });
Box::pin(rx)
}
}
impl NetworkStateInfo for Litep2pNetworkService {
fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.get_addresses().into_iter().map(Into::into).collect()
}
fn listen_addresses(&self) -> Vec<Multiaddr> {
self.listen_addresses.read().iter().cloned().map(Into::into).collect()
}
fn local_peer_id(&self) -> PeerId {
self.local_peer_id.into()
}
}
#[async_trait::async_trait]
impl NetworkRequest for Litep2pNetworkService {
async fn request(
&self,
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
connect: IfDisconnected,
) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
let (tx, rx) = oneshot::channel();
self.start_request(target, protocol, request, fallback_request, tx, connect);
match rx.await {
Ok(v) => v,
Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
}
}
fn start_request(
&self,
peer: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
sender: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
) {
match self.request_response_protocols.get(&protocol) {
Some(tx) => {
let _ = tx.unbounded_send(OutboundRequest::new(
peer,
request,
sender,
fallback_request,
connect,
));
},
None => log::warn!(
target: LOG_TARGET,
"{protocol} doesn't exist, cannot send request to {peer:?}"
),
}
}
}