use crate::{
codec::ProtocolCodec,
crypto::PublicKey,
error::{Error, SubstreamError},
protocol::{Direction, TransportEvent, TransportService},
substream::Substream,
transport::Endpoint,
types::{protocol::ProtocolName, SubstreamId},
PeerId, DEFAULT_CHANNEL_SIZE,
};
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use multiaddr::Multiaddr;
use prost::Message;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
const LOG_TARGET: &str = "litep2p::ipfs::identify";
const PROTOCOL_NAME: &str = "/ipfs/id/1.0.0";
const _PUSH_PROTOCOL_NAME: &str = "/ipfs/id/push/1.0.0";
const DEFAULT_AGENT: &str = "litep2p/1.0.0";
const IDENTIFY_PAYLOAD_SIZE: usize = 4096;
mod identify_schema {
include!(concat!(env!("OUT_DIR"), "/identify.rs"));
}
pub struct Config {
pub(crate) protocol: ProtocolName,
pub(crate) codec: ProtocolCodec,
tx_event: Sender<IdentifyEvent>,
pub(crate) public: Option<PublicKey>,
pub(crate) protocols: Vec<ProtocolName>,
pub(crate) protocol_version: String,
pub(crate) user_agent: Option<String>,
}
impl Config {
pub fn new(
protocol_version: String,
user_agent: Option<String>,
) -> (Self, Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>) {
let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);
(
Self {
tx_event,
public: None,
protocol_version,
user_agent,
codec: ProtocolCodec::UnsignedVarint(Some(IDENTIFY_PAYLOAD_SIZE)),
protocols: Vec::new(),
protocol: ProtocolName::from(PROTOCOL_NAME),
},
Box::new(ReceiverStream::new(rx_event)),
)
}
}
#[derive(Debug)]
pub enum IdentifyEvent {
PeerIdentified {
peer: PeerId,
protocol_version: Option<String>,
user_agent: Option<String>,
supported_protocols: HashSet<ProtocolName>,
observed_address: Multiaddr,
listen_addresses: Vec<Multiaddr>,
},
}
struct IdentifyResponse {
peer: PeerId,
protocol_version: Option<String>,
user_agent: Option<String>,
supported_protocols: HashSet<String>,
listen_addresses: Vec<Multiaddr>,
observed_address: Option<Multiaddr>,
}
pub(crate) struct Identify {
service: TransportService,
tx: Sender<IdentifyEvent>,
peers: HashMap<PeerId, Endpoint>,
public: PublicKey,
protocol_version: String,
user_agent: String,
protocols: Vec<String>,
pending_opens: HashMap<SubstreamId, PeerId>,
pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<IdentifyResponse>>>,
pending_inbound: FuturesUnordered<BoxFuture<'static, ()>>,
}
impl Identify {
pub(crate) fn new(service: TransportService, config: Config) -> Self {
Self {
service,
tx: config.tx_event,
peers: HashMap::new(),
public: config.public.expect("public key to be supplied"),
protocol_version: config.protocol_version,
user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()),
pending_opens: HashMap::new(),
pending_inbound: FuturesUnordered::new(),
pending_outbound: FuturesUnordered::new(),
protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(),
}
}
fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> {
tracing::trace!(target: LOG_TARGET, ?peer, ?endpoint, "connection established");
let substream_id = self.service.open_substream(peer)?;
self.pending_opens.insert(substream_id, peer);
self.peers.insert(peer, endpoint);
Ok(())
}
fn on_connection_closed(&mut self, peer: PeerId) {
tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
self.peers.remove(&peer);
}
fn on_inbound_substream(
&mut self,
peer: PeerId,
protocol: ProtocolName,
mut substream: Substream,
) {
tracing::trace!(
target: LOG_TARGET,
?peer,
?protocol,
"inbound substream opened"
);
let observed_addr = match self.peers.get(&peer) {
Some(endpoint) => Some(endpoint.address().to_vec()),
None => {
tracing::warn!(
target: LOG_TARGET,
?peer,
%protocol,
"inbound identify substream opened for peer who doesn't exist",
);
None
}
};
let mut listen_addr: HashSet<_> =
self.service.listen_addresses().into_iter().map(|addr| addr.to_vec()).collect();
listen_addr
.extend(self.service.public_addresses().inner.read().iter().map(|addr| addr.to_vec()));
let identify = identify_schema::Identify {
protocol_version: Some(self.protocol_version.clone()),
agent_version: Some(self.user_agent.clone()),
public_key: Some(self.public.to_protobuf_encoding()),
listen_addrs: listen_addr.into_iter().collect(),
observed_addr,
protocols: self.protocols.clone(),
};
tracing::trace!(
target: LOG_TARGET,
?peer,
?identify,
"sending identify response",
);
let mut msg = Vec::with_capacity(identify.encoded_len());
identify.encode(&mut msg).expect("`msg` to have enough capacity");
self.pending_inbound.push(Box::pin(async move {
match tokio::time::timeout(Duration::from_secs(10), substream.send_framed(msg.into()))
.await
{
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?error,
"timed out while sending ipfs identify response",
);
}
Ok(Err(error)) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?error,
"failed to send ipfs identify response",
);
}
Ok(_) => {}
}
}))
}
fn on_outbound_substream(
&mut self,
peer: PeerId,
protocol: ProtocolName,
substream_id: SubstreamId,
mut substream: Substream,
) {
tracing::trace!(
target: LOG_TARGET,
?peer,
?protocol,
?substream_id,
"outbound substream opened"
);
self.pending_outbound.push(Box::pin(async move {
let payload =
match tokio::time::timeout(Duration::from_secs(10), substream.next()).await {
Err(_) => return Err(Error::Timeout),
Ok(None) =>
return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
substream_id,
)))),
Ok(Some(Err(error))) => return Err(error.into()),
Ok(Some(Ok(payload))) => payload,
};
let info = identify_schema::Identify::decode(payload.to_vec().as_slice())?;
tracing::trace!(target: LOG_TARGET, ?peer, ?info, "peer identified");
let listen_addresses = info
.listen_addrs
.iter()
.filter_map(|address| Multiaddr::try_from(address.clone()).ok())
.collect();
let observed_address =
info.observed_addr.and_then(|address| Multiaddr::try_from(address).ok());
let protocol_version = info.protocol_version;
let user_agent = info.agent_version;
Ok(IdentifyResponse {
peer,
protocol_version,
user_agent,
supported_protocols: HashSet::from_iter(info.protocols),
observed_address,
listen_addresses,
})
}));
}
pub async fn run(mut self) {
tracing::debug!(target: LOG_TARGET, "starting identify event loop");
loop {
tokio::select! {
event = self.service.next() => match event {
None => return,
Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => {
let _ = self.on_connection_established(peer, endpoint);
}
Some(TransportEvent::ConnectionClosed { peer }) => {
self.on_connection_closed(peer);
}
Some(TransportEvent::SubstreamOpened {
peer,
protocol,
direction,
substream,
..
}) => match direction {
Direction::Inbound => self.on_inbound_substream(peer, protocol, substream),
Direction::Outbound(substream_id) => self.on_outbound_substream(peer, protocol, substream_id, substream),
},
_ => {}
},
_ = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => match event {
Some(Ok(response)) => {
let _ = self.tx
.send(IdentifyEvent::PeerIdentified {
peer: response.peer,
protocol_version: response.protocol_version,
user_agent: response.user_agent,
supported_protocols: response.supported_protocols.into_iter().map(From::from).collect(),
observed_address: response.observed_address.map_or(Multiaddr::empty(), |address| address),
listen_addresses: response.listen_addresses,
})
.await;
}
Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"),
None => return,
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{config::ConfigBuilder, transport::tcp::config::Config as TcpConfig, Litep2p};
use multiaddr::{Multiaddr, Protocol};
fn create_litep2p() -> (
Litep2p,
Box<dyn Stream<Item = IdentifyEvent> + Send + Unpin>,
PeerId,
) {
let (identify_config, identify) =
Config::new("1.0.0".to_string(), Some("litep2p/1.0.0".to_string()));
let keypair = crate::crypto::ed25519::Keypair::generate();
let peer = PeerId::from_public_key(&crate::crypto::PublicKey::Ed25519(keypair.public()));
let config = ConfigBuilder::new()
.with_keypair(keypair)
.with_tcp(TcpConfig {
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
..Default::default()
})
.with_libp2p_identify(identify_config)
.build();
(Litep2p::new(config).unwrap(), identify, peer)
}
#[tokio::test]
async fn update_identify_addresses() {
let (mut litep2p1, mut event_stream1, peer1) = create_litep2p();
let (mut litep2p2, mut event_stream2, _peer2) = create_litep2p();
let litep2p1_address = litep2p1.listen_addresses().into_iter().next().unwrap();
let multiaddr: Multiaddr = "/ip6/::9/tcp/111".parse().unwrap();
assert!(litep2p1.public_addresses().add_address(multiaddr.clone()).unwrap());
litep2p2.dial_address(litep2p1_address.clone()).await.unwrap();
let expected_multiaddr = multiaddr.with(Protocol::P2p(peer1.into()));
tokio::spawn(async move {
loop {
tokio::select! {
_ = litep2p1.next_event() => {}
_event = event_stream1.next() => {}
}
}
});
loop {
tokio::select! {
_ = litep2p2.next_event() => {}
event = event_stream2.next() => match event {
Some(IdentifyEvent::PeerIdentified {
listen_addresses,
..
}) => {
assert!(listen_addresses.iter().any(|address| address == &expected_multiaddr));
break;
}
_ => {}
}
}
}
}
}