#![allow(clippy::single_match)]
#![allow(clippy::result_large_err)]
#![allow(clippy::redundant_pattern_matching)]
#![allow(clippy::type_complexity)]
#![allow(clippy::result_unit_err)]
#![allow(clippy::should_implement_trait)]
#![allow(clippy::too_many_arguments)]
#![allow(clippy::assign_op_pattern)]
#![allow(clippy::match_like_matches_macro)]
use crate::{
addresses::PublicAddresses,
config::Litep2pConfig,
error::DialError,
protocol::{
libp2p::{bitswap::Bitswap, identify::Identify, kademlia::Kademlia, ping::Ping},
mdns::Mdns,
notification::NotificationProtocol,
request_response::RequestResponseProtocol,
},
transport::{
manager::{SupportedTransport, TransportManager},
tcp::TcpTransport,
TransportBuilder, TransportEvent,
},
};
#[cfg(feature = "quic")]
use crate::transport::quic::QuicTransport;
#[cfg(feature = "webrtc")]
use crate::transport::webrtc::WebRtcTransport;
#[cfg(feature = "websocket")]
use crate::transport::websocket::WebSocketTransport;
use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;
use transport::Endpoint;
use types::ConnectionId;
use std::{collections::HashSet, sync::Arc};
pub use bandwidth::BandwidthSink;
pub use error::Error;
pub use peer_id::PeerId;
pub use types::protocol::ProtocolName;
pub(crate) mod peer_id;
pub mod addresses;
pub mod codec;
pub mod config;
pub mod crypto;
pub mod error;
pub mod executor;
pub mod protocol;
pub mod substream;
pub mod transport;
pub mod types;
pub mod yamux;
mod bandwidth;
mod multistream_select;
#[cfg(test)]
mod mock;
/// Crate-wide result type whose error variant is [`error::Error`].
pub type Result<T> = std::result::Result<T, error::Error>;
/// Logging target used by the `tracing` calls in this file.
const LOG_TARGET: &str = "litep2p";
/// Default channel size.
// NOTE(review): not referenced in the visible part of this file; presumably consumed by child
// modules through `crate::DEFAULT_CHANNEL_SIZE` — confirm.
const DEFAULT_CHANNEL_SIZE: usize = 4096usize;
/// Events returned by [`Litep2p::next_event`].
///
/// Each variant is produced from the matching `TransportEvent` polled from the transport
/// manager; transport events without a counterpart here are not surfaced.
#[derive(Debug)]
pub enum Litep2pEvent {
/// A connection was established; forwarded from `TransportEvent::ConnectionEstablished`.
ConnectionEstablished {
/// Remote peer ID.
peer: PeerId,
/// Endpoint of the connection as reported by the transport layer.
endpoint: Endpoint,
},
/// A connection was closed; forwarded from `TransportEvent::ConnectionClosed`.
ConnectionClosed {
/// Remote peer ID.
peer: PeerId,
/// ID of the connection that was closed.
connection_id: ConnectionId,
},
/// Dialing a single address failed; forwarded from `TransportEvent::DialFailure`.
DialFailure {
/// Address that was dialed.
address: Multiaddr,
/// Reason the dial failed.
error: DialError,
},
/// A batch of dial failures; forwarded from `TransportEvent::OpenFailure`.
// NOTE(review): presumably emitted when every address tried for one connection attempt
// failed — confirm against the transport manager.
ListDialFailures {
/// Each failed address paired with its error.
errors: Vec<(Multiaddr, DialError)>,
},
}
/// Top-level handle of the networking stack, created with [`Litep2p::new`].
pub struct Litep2p {
/// Local peer ID, derived from the public key of the configured keypair.
local_peer_id: PeerId,
/// Listen addresses reported by the enabled transports, each with `/p2p/<local peer ID>`
/// appended.
listen_addresses: Vec<Multiaddr>,
/// Transport manager; performs dialing, stores known addresses and yields transport events.
transport_manager: TransportManager,
/// Bandwidth sink; a clone of it is handed to the transport manager on construction.
bandwidth_sink: BandwidthSink,
}
impl Litep2p {
/// Build a new [`Litep2p`] instance from `litep2p_config`.
///
/// The steps run in this order:
/// 1. derive the local peer ID from the keypair and create the [`TransportManager`];
/// 2. seed the manager with the configured known addresses;
/// 3. register every notification, request-response and user protocol, then the optional
///    ping, Kademlia, identify and bitswap protocols, handing each protocol's event loop to
///    the configured executor (identify is only registered here, it is started in step 6);
/// 4. build and register every configured transport (TCP always compiled in; QUIC, WebRTC and
///    WebSocket behind their cargo features) and record its listen addresses;
/// 5. start mDNS if configured, passing it the collected listen addresses;
/// 6. start identify with the list of protocols known to the transport manager at that point.
///
/// # Errors
///
/// * any error returned by a transport builder or by `Mdns::new` is propagated;
/// * `Error::Other("No transport specified")` if no transport ended up installed. Note that
///   this check runs after the protocol tasks have already been handed to the executor.
///
/// # Panics
///
/// * if Kademlia is enabled with an empty `protocol_names` list;
/// * if the local peer ID bytes cannot be converted into a `Multihash`.
pub fn new(mut litep2p_config: Litep2pConfig) -> crate::Result<Litep2p> {
let local_peer_id = PeerId::from_public_key(&litep2p_config.keypair.public().into());
let bandwidth_sink = BandwidthSink::new();
let mut listen_addresses = vec![];
// Transports for which a configuration is present; the manager is told about them up front.
let supported_transports = Self::supported_transports(&litep2p_config);
// `transport_handle` is only consumed by mDNS further below.
let (mut transport_manager, transport_handle) = TransportManager::new(
litep2p_config.keypair.clone(),
supported_transports,
bandwidth_sink.clone(),
litep2p_config.max_parallel_dials,
litep2p_config.connection_limits,
);
// Add the statically configured peer addresses, if any.
if !litep2p_config.known_addresses.is_empty() {
for (peer, addresses) in litep2p_config.known_addresses {
transport_manager.add_known_address(peer, addresses.iter().cloned());
}
}
// Notification protocols: register each one and hand its event loop to the executor.
for (protocol, config) in litep2p_config.notification_protocols.into_iter() {
tracing::debug!(
target: LOG_TARGET,
?protocol,
"enable notification protocol",
);
let service = transport_manager.register_protocol(
protocol,
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
);
// The notification protocol receives its own handle to the executor.
let executor = Arc::clone(&litep2p_config.executor);
litep2p_config.executor.run(Box::pin(async move {
NotificationProtocol::new(service, config, executor).run().await
}));
}
// Request-response protocols.
for (protocol, config) in litep2p_config.request_response_protocols.into_iter() {
tracing::debug!(
target: LOG_TARGET,
?protocol,
"enable request-response protocol",
);
let service = transport_manager.register_protocol(
protocol,
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
RequestResponseProtocol::new(service, config).run().await
}));
}
// User-defined protocols: registered without fallback names; the result of `run` is ignored.
for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");
let service = transport_manager.register_protocol(
protocol_name,
Vec::new(),
protocol.codec(),
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = protocol.run(service).await;
}));
}
// Optional IPFS ping.
if let Some(ping_config) = litep2p_config.ping.take() {
tracing::debug!(
target: LOG_TARGET,
protocol = ?ping_config.protocol,
"enable ipfs ping protocol",
);
let service = transport_manager.register_protocol(
ping_config.protocol.clone(),
Vec::new(),
ping_config.codec,
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
Ping::new(service, ping_config).run().await
}));
}
// Optional IPFS Kademlia: the first protocol name is the main one, the rest are fallbacks.
if let Some(kademlia_config) = litep2p_config.kademlia.take() {
tracing::debug!(
target: LOG_TARGET,
protocol_names = ?kademlia_config.protocol_names,
"enable ipfs kademlia protocol",
);
// Panics if the configuration carries no protocol name at all.
let main_protocol =
kademlia_config.protocol_names.first().expect("protocol name to exist");
let fallback_names = kademlia_config.protocol_names.iter().skip(1).cloned().collect();
let service = transport_manager.register_protocol(
main_protocol.clone(),
fallback_names,
kademlia_config.codec,
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = Kademlia::new(service, kademlia_config).run().await;
}));
}
// Optional IPFS identify: register it now but start it only at the end of this function,
// where its protocol list is filled in from the transport manager.
let mut identify_info = match litep2p_config.identify.take() {
None => None,
Some(mut identify_config) => {
tracing::debug!(
target: LOG_TARGET,
protocol = ?identify_config.protocol,
"enable ipfs identify protocol",
);
let service = transport_manager.register_protocol(
identify_config.protocol.clone(),
Vec::new(),
identify_config.codec,
litep2p_config.keep_alive_timeout,
);
identify_config.public = Some(litep2p_config.keypair.public().into());
Some((service, identify_config))
}
};
// Optional IPFS bitswap.
if let Some(bitswap_config) = litep2p_config.bitswap.take() {
tracing::debug!(
target: LOG_TARGET,
protocol = ?bitswap_config.protocol,
"enable ipfs bitswap protocol",
);
let service = transport_manager.register_protocol(
bitswap_config.protocol.clone(),
Vec::new(),
bitswap_config.codec,
litep2p_config.keep_alive_timeout,
);
litep2p_config.executor.run(Box::pin(async move {
Bitswap::new(service, bitswap_config).run().await
}));
}
// Transports. Every enabled transport follows the same pattern: build it, register the raw
// listen addresses with the manager, and store the same addresses with
// `/p2p/<local peer ID>` appended in `listen_addresses`.
if let Some(config) = litep2p_config.tcp.take() {
let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
let (transport, transport_listen_addresses) =
<TcpTransport as TransportBuilder>::new(handle, config)?;
for address in transport_listen_addresses {
transport_manager.register_listen_address(address.clone());
listen_addresses.push(address.with(Protocol::P2p(
Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
)));
}
transport_manager.register_transport(SupportedTransport::Tcp, Box::new(transport));
}
#[cfg(feature = "quic")]
if let Some(config) = litep2p_config.quic.take() {
let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
let (transport, transport_listen_addresses) =
<QuicTransport as TransportBuilder>::new(handle, config)?;
for address in transport_listen_addresses {
transport_manager.register_listen_address(address.clone());
listen_addresses.push(address.with(Protocol::P2p(
Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
)));
}
transport_manager.register_transport(SupportedTransport::Quic, Box::new(transport));
}
#[cfg(feature = "webrtc")]
if let Some(config) = litep2p_config.webrtc.take() {
let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
let (transport, transport_listen_addresses) =
<WebRtcTransport as TransportBuilder>::new(handle, config)?;
for address in transport_listen_addresses {
transport_manager.register_listen_address(address.clone());
listen_addresses.push(address.with(Protocol::P2p(
Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
)));
}
transport_manager.register_transport(SupportedTransport::WebRtc, Box::new(transport));
}
#[cfg(feature = "websocket")]
if let Some(config) = litep2p_config.websocket.take() {
let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
let (transport, transport_listen_addresses) =
<WebSocketTransport as TransportBuilder>::new(handle, config)?;
for address in transport_listen_addresses {
transport_manager.register_listen_address(address.clone());
listen_addresses.push(address.with(Protocol::P2p(
Multihash::from_bytes(&local_peer_id.to_bytes()).unwrap(),
)));
}
transport_manager
.register_transport(SupportedTransport::WebSocket, Box::new(transport));
}
// Optional mDNS: receives the handle created together with the transport manager and a copy
// of the listen addresses collected above.
if let Some(config) = litep2p_config.mdns.take() {
let mdns = Mdns::new(transport_handle, config, listen_addresses.clone())?;
litep2p_config.executor.run(Box::pin(async move {
let _ = mdns.start().await;
}));
}
// Start identify last so that its protocol list reflects everything registered above.
if let Some((service, mut identify_config)) = identify_info.take() {
identify_config.protocols = transport_manager.protocols().cloned().collect();
let identify = Identify::new(service, identify_config);
litep2p_config.executor.run(Box::pin(async move {
let _ = identify.run().await;
}));
}
// At least one transport must have been installed.
if transport_manager.installed_transports().count() == 0 {
return Err(Error::Other("No transport specified".to_string()));
}
// Having no listen address is allowed but worth a warning.
if listen_addresses.is_empty() {
tracing::warn!(
target: LOG_TARGET,
"litep2p started with no listen addresses, cannot accept inbound connections",
);
}
Ok(Self {
local_peer_id,
bandwidth_sink,
listen_addresses,
transport_manager,
})
}
/// Collect the set of transports that are enabled in `config`.
///
/// A transport is considered enabled when its configuration entry is `Some`. QUIC, WebSocket
/// and WebRTC are only inspected when the corresponding cargo feature is compiled in; TCP is
/// always inspected.
fn supported_transports(config: &Litep2pConfig) -> HashSet<SupportedTransport> {
    let mut supported_transports = HashSet::new();

    // Plain `if` statements are used instead of `bool::then`: the insertion is a pure side
    // effect and the `Option<bool>` produced by `then` would only be thrown away.
    if config.tcp.is_some() {
        supported_transports.insert(SupportedTransport::Tcp);
    }

    #[cfg(feature = "quic")]
    if config.quic.is_some() {
        supported_transports.insert(SupportedTransport::Quic);
    }

    #[cfg(feature = "websocket")]
    if config.websocket.is_some() {
        supported_transports.insert(SupportedTransport::WebSocket);
    }

    #[cfg(feature = "webrtc")]
    if config.webrtc.is_some() {
        supported_transports.insert(SupportedTransport::WebRtc);
    }

    supported_transports
}
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
/// Get the public-address handle kept by the transport manager.
pub fn public_addresses(&self) -> PublicAddresses {
    let manager = &self.transport_manager;
    manager.public_addresses()
}
pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
self.listen_addresses.iter()
}
/// Get a clone of the bandwidth sink.
pub fn bandwidth_sink(&self) -> BandwidthSink {
    Clone::clone(&self.bandwidth_sink)
}
/// Dial `peer` through the transport manager.
///
/// The returned `Result` is whatever `TransportManager::dial` reports for the request.
pub async fn dial(&mut self, peer: &PeerId) -> crate::Result<()> {
    let target = *peer;
    self.transport_manager.dial(target).await
}
/// Dial `address` through the transport manager.
///
/// The returned `Result` is whatever `TransportManager::dial_address` reports for the
/// request. A dial that later fails can still be reported as an event by `next_event`.
pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> {
    let manager = &mut self.transport_manager;
    manager.dial_address(address).await
}
pub fn add_known_address(
&mut self,
peer: PeerId,
address: impl Iterator<Item = Multiaddr>,
) -> usize {
self.transport_manager.add_known_address(peer, address)
}
pub async fn next_event(&mut self) -> Option<Litep2pEvent> {
loop {
match self.transport_manager.next().await? {
TransportEvent::ConnectionEstablished { peer, endpoint, .. } =>
return Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }),
TransportEvent::ConnectionClosed {
peer,
connection_id,
} =>
return Some(Litep2pEvent::ConnectionClosed {
peer,
connection_id,
}),
TransportEvent::DialFailure { address, error, .. } =>
return Some(Litep2pEvent::DialFailure { address, error }),
TransportEvent::OpenFailure { errors, .. } => {
return Some(Litep2pEvent::ListDialFailures { errors });
}
_ => {}
}
}
}
}
#[cfg(test)]
mod tests {
use crate::{
config::ConfigBuilder,
protocol::{libp2p::ping, notification::Config as NotificationConfig},
types::protocol::ProtocolName,
Litep2p, Litep2pEvent, PeerId,
};
use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;
use std::net::Ipv4Addr;
/// A configuration with TCP, two notification protocols and ping must initialize successfully.
#[tokio::test]
async fn initialize_litep2p() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    // Both notification protocols share every setting except the protocol name.
    let notification = |name: &'static str| {
        NotificationConfig::new(
            ProtocolName::from(name),
            1337usize,
            vec![1, 2, 3, 4],
            Vec::new(),
            false,
            64,
            64,
            true,
        )
    };
    let (config1, _service1) = notification("/notificaton/1");
    let (config2, _service2) = notification("/notificaton/2");
    let (ping_config, _ping_event_stream) = ping::Config::default();

    let builder = ConfigBuilder::new()
        .with_tcp(Default::default())
        .with_notification_protocol(config1)
        .with_notification_protocol(config2)
        .with_libp2p_ping(ping_config);

    let _litep2p = Litep2p::new(builder.build()).unwrap();
}
/// Building `Litep2p` without any transport configured must fail.
#[tokio::test]
async fn no_transport_given() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    // Both notification protocols share every setting except the protocol name.
    let notification = |name: &'static str| {
        NotificationConfig::new(
            ProtocolName::from(name),
            1337usize,
            vec![1, 2, 3, 4],
            Vec::new(),
            false,
            64,
            64,
            true,
        )
    };
    let (config1, _service1) = notification("/notificaton/1");
    let (config2, _service2) = notification("/notificaton/2");
    let (ping_config, _ping_event_stream) = ping::Config::default();

    // No `with_tcp` (or any other transport) call on purpose.
    let builder = ConfigBuilder::new()
        .with_notification_protocol(config1)
        .with_notification_protocol(config2)
        .with_libp2p_ping(ping_config);

    let outcome = Litep2p::new(builder.build());
    assert!(outcome.is_err());
}
/// Dialing the same address twice must produce exactly one `DialFailure` event.
#[tokio::test]
async fn dial_same_address_twice() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    // Both notification protocols share every setting except the protocol name.
    let notification = |name: &'static str| {
        NotificationConfig::new(
            ProtocolName::from(name),
            1337usize,
            vec![1, 2, 3, 4],
            Vec::new(),
            false,
            64,
            64,
            true,
        )
    };
    let (config1, _service1) = notification("/notificaton/1");
    let (config2, _service2) = notification("/notificaton/2");
    let (ping_config, _ping_event_stream) = ping::Config::default();

    let config = ConfigBuilder::new()
        .with_tcp(Default::default())
        .with_notification_protocol(config1)
        .with_notification_protocol(config2)
        .with_libp2p_ping(ping_config)
        .build();

    // Address the test dials twice, tagged with a random peer ID.
    let peer = PeerId::random();
    let peer_hash = Multihash::from_bytes(&peer.to_bytes()).unwrap();
    let address = Multiaddr::empty()
        .with(Protocol::Ip4(Ipv4Addr::new(255, 254, 253, 252)))
        .with(Protocol::Tcp(8888))
        .with(Protocol::P2p(peer_hash));

    let mut litep2p = Litep2p::new(config).unwrap();
    for _ in 0..2 {
        litep2p.dial_address(address.clone()).await.unwrap();
    }

    // The first event must be a dial failure ...
    let first = litep2p.next_event().await;
    assert!(
        matches!(first, Some(Litep2pEvent::DialFailure { .. })),
        "invalid event received"
    );

    // ... and no further event may arrive within 20 seconds.
    let second =
        tokio::time::timeout(std::time::Duration::from_secs(20), litep2p.next_event()).await;
    assert!(second.is_err(), "invalid event received");
}
}