pub use crate::{
discovery::DEFAULT_KADEMLIA_REPLICATION_FACTOR,
peer_store::PeerStoreProvider,
protocol::{notification_service, NotificationsSink, ProtocolHandlePair},
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
service::{
metrics::NotificationMetrics,
traits::{NotificationConfig, NotificationService, PeerStore},
},
types::ProtocolName,
};
pub use sc_network_types::{build_multiaddr, ed25519};
use sc_network_types::{
multiaddr::{self, Multiaddr},
PeerId,
};
use crate::service::{ensure_addresses_consistent_with_transport, traits::NetworkBackend};
use codec::Encode;
use prometheus_endpoint::Registry;
use zeroize::Zeroize;
pub use sc_network_common::{
role::{Role, Roles},
sync::SyncMode,
ExHashT,
};
use sp_runtime::traits::Block as BlockT;
use std::{
error::Error,
fmt, fs,
future::Future,
io::{self, Write},
iter,
net::Ipv4Addr,
num::NonZeroUsize,
path::{Path, PathBuf},
pin::Pin,
str::{self, FromStr},
sync::Arc,
};
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>);
impl<'a> From<&'a str> for ProtocolId {
fn from(bytes: &'a str) -> ProtocolId {
Self(bytes.as_bytes().into())
}
}
impl AsRef<str> for ProtocolId {
fn as_ref(&self) -> &str {
str::from_utf8(&self.0[..])
.expect("the only way to build a ProtocolId is through a UTF-8 String; qed")
}
}
impl fmt::Debug for ProtocolId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self.as_ref(), f)
}
}
pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> {
let addr: Multiaddr = addr_str.parse()?;
parse_addr(addr)
}
pub fn parse_addr(mut addr: Multiaddr) -> Result<(PeerId, Multiaddr), ParseErr> {
let multihash = match addr.pop() {
Some(multiaddr::Protocol::P2p(multihash)) => multihash,
_ => return Err(ParseErr::PeerIdMissing),
};
let peer_id = PeerId::from_multihash(multihash).map_err(|_| ParseErr::InvalidPeerId)?;
Ok((peer_id, addr))
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(try_from = "String", into = "String")]
pub struct MultiaddrWithPeerId {
pub multiaddr: Multiaddr,
pub peer_id: PeerId,
}
impl MultiaddrWithPeerId {
pub fn concat(&self) -> Multiaddr {
let proto = multiaddr::Protocol::P2p(From::from(self.peer_id));
self.multiaddr.clone().with(proto)
}
}
impl fmt::Display for MultiaddrWithPeerId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.concat(), f)
}
}
impl FromStr for MultiaddrWithPeerId {
type Err = ParseErr;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (peer_id, multiaddr) = parse_str_addr(s)?;
Ok(Self { peer_id, multiaddr })
}
}
impl From<MultiaddrWithPeerId> for String {
fn from(ma: MultiaddrWithPeerId) -> String {
format!("{}", ma)
}
}
impl TryFrom<String> for MultiaddrWithPeerId {
type Error = ParseErr;
fn try_from(string: String) -> Result<Self, Self::Error> {
string.parse()
}
}
#[derive(Debug)]
pub enum ParseErr {
MultiaddrParse(multiaddr::ParseError),
InvalidPeerId,
PeerIdMissing,
}
impl fmt::Display for ParseErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::MultiaddrParse(err) => write!(f, "{}", err),
Self::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"),
Self::PeerIdMissing => write!(f, "Peer id is missing from the address"),
}
}
}
impl std::error::Error for ParseErr {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::MultiaddrParse(err) => Some(err),
Self::InvalidPeerId => None,
Self::PeerIdMissing => None,
}
}
}
impl From<multiaddr::ParseError> for ParseErr {
fn from(err: multiaddr::ParseError) -> ParseErr {
Self::MultiaddrParse(err)
}
}
#[derive(Debug, Clone)]
pub struct NotificationHandshake(Vec<u8>);
impl NotificationHandshake {
pub fn new<H: Encode>(handshake: H) -> Self {
Self(handshake.encode())
}
pub fn from_bytes(bytes: Vec<u8>) -> Self {
Self(bytes)
}
}
impl std::ops::Deref for NotificationHandshake {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Clone, Debug)]
pub enum TransportConfig {
Normal {
enable_mdns: bool,
allow_private_ip: bool,
},
MemoryOnly,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
Accept,
Deny,
}
impl NonReservedPeerMode {
pub fn parse(s: &str) -> Option<Self> {
match s {
"accept" => Some(Self::Accept),
"deny" => Some(Self::Deny),
_ => None,
}
}
pub fn is_reserved_only(&self) -> bool {
matches!(self, NonReservedPeerMode::Deny)
}
}
#[derive(Clone, Debug)]
pub enum NodeKeyConfig {
Ed25519(Secret<ed25519::SecretKey>),
}
impl Default for NodeKeyConfig {
fn default() -> NodeKeyConfig {
Self::Ed25519(Secret::New)
}
}
pub type Ed25519Secret = Secret<ed25519::SecretKey>;
#[derive(Clone)]
pub enum Secret<K> {
Input(K),
File(PathBuf),
New,
}
impl<K> fmt::Debug for Secret<K> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Input(_) => f.debug_tuple("Secret::Input").finish(),
Self::File(path) => f.debug_tuple("Secret::File").field(path).finish(),
Self::New => f.debug_tuple("Secret::New").finish(),
}
}
}
impl NodeKeyConfig {
pub fn into_keypair(self) -> io::Result<ed25519::Keypair> {
use NodeKeyConfig::*;
match self {
Ed25519(Secret::New) => Ok(ed25519::Keypair::generate()),
Ed25519(Secret::Input(k)) => Ok(ed25519::Keypair::from(k).into()),
Ed25519(Secret::File(f)) => get_secret(
f,
|mut b| match String::from_utf8(b.to_vec()).ok().and_then(|s| {
if s.len() == 64 {
array_bytes::hex2bytes(&s).ok()
} else {
None
}
}) {
Some(s) => ed25519::SecretKey::try_from_bytes(s),
_ => ed25519::SecretKey::try_from_bytes(&mut b),
},
ed25519::SecretKey::generate,
|b| b.as_ref().to_vec(),
)
.map(ed25519::Keypair::from),
}
}
}
fn get_secret<P, F, G, E, W, K>(file: P, parse: F, generate: G, serialize: W) -> io::Result<K>
where
P: AsRef<Path>,
F: for<'r> FnOnce(&'r mut [u8]) -> Result<K, E>,
G: FnOnce() -> K,
E: Error + Send + Sync + 'static,
W: Fn(&K) -> Vec<u8>,
{
std::fs::read(&file)
.and_then(|mut sk_bytes| {
parse(&mut sk_bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})
.or_else(|e| {
if e.kind() == io::ErrorKind::NotFound {
file.as_ref().parent().map_or(Ok(()), fs::create_dir_all)?;
let sk = generate();
let mut sk_vec = serialize(&sk);
write_secret_file(file, &sk_vec)?;
sk_vec.zeroize();
Ok(sk)
} else {
Err(e)
}
})
}
fn write_secret_file<P>(path: P, sk_bytes: &[u8]) -> io::Result<()>
where
P: AsRef<Path>,
{
let mut file = open_secret_file(&path)?;
file.write_all(sk_bytes)
}
#[cfg(unix)]
fn open_secret_file<P>(path: P) -> io::Result<fs::File>
where
P: AsRef<Path>,
{
use std::os::unix::fs::OpenOptionsExt;
fs::OpenOptions::new().write(true).create_new(true).mode(0o600).open(path)
}
#[cfg(not(unix))]
fn open_secret_file<P>(path: P) -> Result<fs::File, io::Error>
where
P: AsRef<Path>,
{
fs::OpenOptions::new().write(true).create_new(true).open(path)
}
#[derive(Clone, Debug)]
pub struct SetConfig {
pub in_peers: u32,
pub out_peers: u32,
pub reserved_nodes: Vec<MultiaddrWithPeerId>,
pub non_reserved_mode: NonReservedPeerMode,
}
impl Default for SetConfig {
fn default() -> Self {
Self {
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
}
}
}
#[derive(Debug)]
pub struct NonDefaultSetConfig {
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
handshake: Option<NotificationHandshake>,
max_notification_size: u64,
set_config: SetConfig,
protocol_handle_pair: ProtocolHandlePair,
}
impl NonDefaultSetConfig {
pub fn new(
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
max_notification_size: u64,
handshake: Option<NotificationHandshake>,
set_config: SetConfig,
) -> (Self, Box<dyn NotificationService>) {
let (protocol_handle_pair, notification_service) =
notification_service(protocol_name.clone());
(
Self {
protocol_name,
max_notification_size,
fallback_names,
handshake,
set_config,
protocol_handle_pair,
},
notification_service,
)
}
pub fn protocol_name(&self) -> &ProtocolName {
&self.protocol_name
}
pub fn fallback_names(&self) -> impl Iterator<Item = &ProtocolName> {
self.fallback_names.iter()
}
pub fn handshake(&self) -> &Option<NotificationHandshake> {
&self.handshake
}
pub fn max_notification_size(&self) -> u64 {
self.max_notification_size
}
pub fn set_config(&self) -> &SetConfig {
&self.set_config
}
pub fn take_protocol_handle(self) -> ProtocolHandlePair {
self.protocol_handle_pair
}
pub fn allow_non_reserved(&mut self, in_peers: u32, out_peers: u32) {
self.set_config.in_peers = in_peers;
self.set_config.out_peers = out_peers;
self.set_config.non_reserved_mode = NonReservedPeerMode::Accept;
}
pub fn add_reserved(&mut self, peer: MultiaddrWithPeerId) {
self.set_config.reserved_nodes.push(peer);
}
pub fn add_fallback_names(&mut self, fallback_names: Vec<ProtocolName>) {
self.fallback_names.extend(fallback_names);
}
}
impl NotificationConfig for NonDefaultSetConfig {
fn set_config(&self) -> &SetConfig {
&self.set_config
}
fn protocol_name(&self) -> &ProtocolName {
&self.protocol_name
}
}
#[derive(Clone, Debug)]
pub struct NetworkConfiguration {
pub net_config_path: Option<PathBuf>,
pub listen_addresses: Vec<Multiaddr>,
pub public_addresses: Vec<Multiaddr>,
pub boot_nodes: Vec<MultiaddrWithPeerId>,
pub node_key: NodeKeyConfig,
pub default_peers_set: SetConfig,
pub default_peers_set_num_full: u32,
pub client_version: String,
pub node_name: String,
pub transport: TransportConfig,
pub max_parallel_downloads: u32,
pub max_blocks_per_request: u32,
pub sync_mode: SyncMode,
pub enable_dht_random_walk: bool,
pub allow_non_globals_in_dht: bool,
pub kademlia_disjoint_query_paths: bool,
pub kademlia_replication_factor: NonZeroUsize,
pub ipfs_server: bool,
pub network_backend: NetworkBackendType,
}
impl NetworkConfiguration {
pub fn new<SN: Into<String>, SV: Into<String>>(
node_name: SN,
client_version: SV,
node_key: NodeKeyConfig,
net_config_path: Option<PathBuf>,
) -> Self {
let default_peers_set = SetConfig::default();
Self {
net_config_path,
listen_addresses: Vec::new(),
public_addresses: Vec::new(),
boot_nodes: Vec::new(),
node_key,
default_peers_set_num_full: default_peers_set.in_peers + default_peers_set.out_peers,
default_peers_set,
client_version: client_version.into(),
node_name: node_name.into(),
transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true },
max_parallel_downloads: 5,
max_blocks_per_request: 64,
sync_mode: SyncMode::Full,
enable_dht_random_walk: true,
allow_non_globals_in_dht: false,
kademlia_disjoint_query_paths: false,
kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
.expect("value is a constant; constant is non-zero; qed."),
ipfs_server: false,
network_backend: NetworkBackendType::Libp2p,
}
}
pub fn new_local() -> NetworkConfiguration {
let mut config =
NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
config.listen_addresses =
vec![iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(multiaddr::Protocol::Tcp(0)))
.collect()];
config.allow_non_globals_in_dht = true;
config
}
pub fn new_memory() -> NetworkConfiguration {
let mut config =
NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
config.listen_addresses =
vec![iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(multiaddr::Protocol::Tcp(0)))
.collect()];
config.allow_non_globals_in_dht = true;
config
}
}
pub struct Params<Block: BlockT, H: ExHashT, N: NetworkBackend<Block, H>> {
pub role: Role,
pub executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
pub network_config: FullNetworkConfiguration<Block, H, N>,
pub protocol_id: ProtocolId,
pub genesis_hash: Block::Hash,
pub fork_id: Option<String>,
pub metrics_registry: Option<Registry>,
pub block_announce_config: N::NotificationProtocolConfig,
pub bitswap_config: Option<N::BitswapConfig>,
pub notification_metrics: NotificationMetrics,
}
pub struct FullNetworkConfiguration<B: BlockT + 'static, H: ExHashT, N: NetworkBackend<B, H>> {
pub(crate) notification_protocols: Vec<N::NotificationProtocolConfig>,
pub(crate) request_response_protocols: Vec<N::RequestResponseProtocolConfig>,
pub network_config: NetworkConfiguration,
peer_store: Option<N::PeerStore>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
pub metrics_registry: Option<Registry>,
}
impl<B: BlockT + 'static, H: ExHashT, N: NetworkBackend<B, H>> FullNetworkConfiguration<B, H, N> {
pub fn new(network_config: &NetworkConfiguration, metrics_registry: Option<Registry>) -> Self {
let bootnodes = network_config.boot_nodes.iter().map(|bootnode| bootnode.peer_id).collect();
let peer_store = N::peer_store(bootnodes, metrics_registry.clone());
let peer_store_handle = peer_store.handle();
Self {
peer_store: Some(peer_store),
peer_store_handle,
notification_protocols: Vec::new(),
request_response_protocols: Vec::new(),
network_config: network_config.clone(),
metrics_registry,
}
}
pub fn add_notification_protocol(&mut self, config: N::NotificationProtocolConfig) {
self.notification_protocols.push(config);
}
pub fn notification_protocols(&self) -> &Vec<N::NotificationProtocolConfig> {
&self.notification_protocols
}
pub fn add_request_response_protocol(&mut self, config: N::RequestResponseProtocolConfig) {
self.request_response_protocols.push(config);
}
pub fn peer_store_handle(&self) -> Arc<dyn PeerStoreProvider> {
Arc::clone(&self.peer_store_handle)
}
pub fn take_peer_store(&mut self) -> N::PeerStore {
self.peer_store
.take()
.expect("`PeerStore` can only be taken once when it's started; qed")
}
pub fn sanity_check_addresses(&self) -> Result<(), crate::error::Error> {
ensure_addresses_consistent_with_transport(
self.network_config.listen_addresses.iter(),
&self.network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
self.network_config.boot_nodes.iter().map(|x| &x.multiaddr),
&self.network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
self.network_config
.default_peers_set
.reserved_nodes
.iter()
.map(|x| &x.multiaddr),
&self.network_config.transport,
)?;
for notification_protocol in &self.notification_protocols {
ensure_addresses_consistent_with_transport(
notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
&self.network_config.transport,
)?;
}
ensure_addresses_consistent_with_transport(
self.network_config.public_addresses.iter(),
&self.network_config.transport,
)?;
Ok(())
}
pub fn sanity_check_bootnodes(&self) -> Result<(), crate::error::Error> {
self.network_config.boot_nodes.iter().try_for_each(|bootnode| {
if let Some(other) = self
.network_config
.boot_nodes
.iter()
.filter(|o| o.multiaddr == bootnode.multiaddr)
.find(|o| o.peer_id != bootnode.peer_id)
{
Err(crate::error::Error::DuplicateBootnode {
address: bootnode.multiaddr.clone().into(),
first_id: bootnode.peer_id.into(),
second_id: other.peer_id.into(),
})
} else {
Ok(())
}
})
}
pub fn known_addresses(&self) -> Vec<(PeerId, Multiaddr)> {
let mut addresses: Vec<_> = self
.network_config
.default_peers_set
.reserved_nodes
.iter()
.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
.chain(self.notification_protocols.iter().flat_map(|protocol| {
protocol
.set_config()
.reserved_nodes
.iter()
.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
}))
.chain(
self.network_config
.boot_nodes
.iter()
.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
)
.collect();
addresses.sort();
addresses.dedup();
addresses
}
}
#[derive(Debug, Clone)]
pub enum NetworkBackendType {
Libp2p,
Litep2p,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn tempdir_with_prefix(prefix: &str) -> TempDir {
tempfile::Builder::new().prefix(prefix).tempdir().unwrap()
}
fn secret_bytes(kp: ed25519::Keypair) -> Vec<u8> {
kp.secret().to_bytes().into()
}
#[test]
fn test_secret_file() {
let tmp = tempdir_with_prefix("x");
std::fs::remove_dir(tmp.path()).unwrap(); let file = tmp.path().join("x").to_path_buf();
let kp1 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
assert!(file.is_file() && secret_bytes(kp1) == secret_bytes(kp2))
}
#[test]
fn test_secret_input() {
let sk = ed25519::SecretKey::generate();
let kp1 = NodeKeyConfig::Ed25519(Secret::Input(sk.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::Input(sk)).into_keypair().unwrap();
assert!(secret_bytes(kp1) == secret_bytes(kp2));
}
#[test]
fn test_secret_new() {
let kp1 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
assert!(secret_bytes(kp1) != secret_bytes(kp2));
}
}