libp2p_identify/
protocol.rs1use crate::proto;
22use asynchronous_codec::{FramedRead, FramedWrite};
23use futures::prelude::*;
24use libp2p_core::{multiaddr, Multiaddr};
25use libp2p_identity as identity;
26use libp2p_identity::PublicKey;
27use libp2p_swarm::StreamProtocol;
28use std::io;
29use thiserror::Error;
30
31const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
32
33pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");
34
35pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
36
37#[derive(Debug, Clone)]
39pub struct Info {
40 pub public_key: PublicKey,
42 pub protocol_version: String,
45 pub agent_version: String,
48 pub listen_addrs: Vec<Multiaddr>,
50 pub protocols: Vec<StreamProtocol>,
52 pub observed_addr: Multiaddr,
54}
55
56impl Info {
57 pub fn merge(&mut self, info: PushInfo) {
58 if let Some(public_key) = info.public_key {
59 self.public_key = public_key;
60 }
61 if let Some(protocol_version) = info.protocol_version {
62 self.protocol_version = protocol_version;
63 }
64 if let Some(agent_version) = info.agent_version {
65 self.agent_version = agent_version;
66 }
67 if !info.listen_addrs.is_empty() {
68 self.listen_addrs = info.listen_addrs;
69 }
70 if !info.protocols.is_empty() {
71 self.protocols = info.protocols;
72 }
73 if let Some(observed_addr) = info.observed_addr {
74 self.observed_addr = observed_addr;
75 }
76 }
77}
78
79#[derive(Debug, Clone)]
82pub struct PushInfo {
83 pub public_key: Option<PublicKey>,
84 pub protocol_version: Option<String>,
85 pub agent_version: Option<String>,
86 pub listen_addrs: Vec<Multiaddr>,
87 pub protocols: Vec<StreamProtocol>,
88 pub observed_addr: Option<Multiaddr>,
89}
90
91pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<Info, UpgradeError>
92where
93 T: AsyncWrite + Unpin,
94{
95 tracing::trace!("Sending: {:?}", info);
96
97 let listen_addrs = info.listen_addrs.iter().map(|addr| addr.to_vec()).collect();
98
99 let pubkey_bytes = info.public_key.encode_protobuf();
100
101 let message = proto::Identify {
102 agentVersion: Some(info.agent_version.clone()),
103 protocolVersion: Some(info.protocol_version.clone()),
104 publicKey: Some(pubkey_bytes),
105 listenAddrs: listen_addrs,
106 observedAddr: Some(info.observed_addr.to_vec()),
107 protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
108 };
109
110 let mut framed_io = FramedWrite::new(
111 io,
112 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
113 );
114
115 framed_io.send(message).await?;
116 framed_io.close().await?;
117
118 Ok(info)
119}
120
121pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
122where
123 T: AsyncRead + AsyncWrite + Unpin,
124{
125 let info = recv(socket).await?.try_into()?;
126
127 tracing::trace!(?info, "Received");
128
129 Ok(info)
130}
131
132pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
133where
134 T: AsyncRead + AsyncWrite + Unpin,
135{
136 let info = recv(socket).await?.try_into()?;
137
138 tracing::trace!(?info, "Received");
139
140 Ok(info)
141}
142
143async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
144where
145 T: AsyncRead + AsyncWrite + Unpin,
146{
147 let info = FramedRead::new(
153 socket,
154 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
155 )
156 .next()
157 .await
158 .ok_or(UpgradeError::StreamClosed)??;
159
160 Ok(info)
161}
162
163fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
164 listen_addrs
165 .into_iter()
166 .filter_map(|bytes| match Multiaddr::try_from(bytes) {
167 Ok(a) => Some(a),
168 Err(e) => {
169 tracing::debug!("Unable to parse multiaddr: {e:?}");
170 None
171 }
172 })
173 .collect()
174}
175
176fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
177 protocols
178 .into_iter()
179 .filter_map(|p| match StreamProtocol::try_from_owned(p) {
180 Ok(p) => Some(p),
181 Err(e) => {
182 tracing::debug!("Received invalid protocol from peer: {e}");
183 None
184 }
185 })
186 .collect()
187}
188
189fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
190 public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
191 Ok(k) => Some(k),
192 Err(e) => {
193 tracing::debug!("Unable to decode public key: {e:?}");
194 None
195 }
196 })
197}
198
199fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
200 observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
201 Ok(a) => Some(a),
202 Err(e) => {
203 tracing::debug!("Unable to parse observed multiaddr: {e:?}");
204 None
205 }
206 })
207}
208
209impl TryFrom<proto::Identify> for Info {
210 type Error = UpgradeError;
211
212 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
213 let public_key = {
214 match parse_public_key(msg.publicKey) {
215 Some(key) => key,
216 None => PublicKey::try_decode_protobuf(Default::default())?,
218 }
219 };
220
221 let info = Info {
222 public_key,
223 protocol_version: msg.protocolVersion.unwrap_or_default(),
224 agent_version: msg.agentVersion.unwrap_or_default(),
225 listen_addrs: parse_listen_addrs(msg.listenAddrs),
226 protocols: parse_protocols(msg.protocols),
227 observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
228 };
229
230 Ok(info)
231 }
232}
233
234impl TryFrom<proto::Identify> for PushInfo {
235 type Error = UpgradeError;
236
237 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
238 let info = PushInfo {
239 public_key: parse_public_key(msg.publicKey),
240 protocol_version: msg.protocolVersion,
241 agent_version: msg.agentVersion,
242 listen_addrs: parse_listen_addrs(msg.listenAddrs),
243 protocols: parse_protocols(msg.protocols),
244 observed_addr: parse_observed_addr(msg.observedAddr),
245 };
246
247 Ok(info)
248 }
249}
250
251#[derive(Debug, Error)]
252pub enum UpgradeError {
253 #[error(transparent)]
254 Codec(#[from] quick_protobuf_codec::Error),
255 #[error("I/O interaction failed")]
256 Io(#[from] io::Error),
257 #[error("Stream closed")]
258 StreamClosed,
259 #[error("Failed decoding multiaddr")]
260 Multiaddr(#[from] multiaddr::Error),
261 #[error("Failed decoding public key")]
262 PublicKey(#[from] identity::DecodingError),
263}
264
265#[cfg(test)]
266mod tests {
267 use super::*;
268 use libp2p_identity as identity;
269
270 #[test]
271 fn skip_invalid_multiaddr() {
272 let valid_multiaddr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();
273 let valid_multiaddr_bytes = valid_multiaddr.to_vec();
274
275 let invalid_multiaddr = {
276 let a = vec![255; 8];
277 assert!(Multiaddr::try_from(a.clone()).is_err());
278 a
279 };
280
281 let payload = proto::Identify {
282 agentVersion: None,
283 listenAddrs: vec![valid_multiaddr_bytes, invalid_multiaddr],
284 observedAddr: None,
285 protocolVersion: None,
286 protocols: vec![],
287 publicKey: Some(
288 identity::Keypair::generate_ed25519()
289 .public()
290 .encode_protobuf(),
291 ),
292 };
293
294 let info = PushInfo::try_from(payload).expect("not to fail");
295
296 assert_eq!(info.listen_addrs, vec![valid_multiaddr])
297 }
298}