libp2p_identify/
protocol.rs1use crate::proto;
22use asynchronous_codec::{FramedRead, FramedWrite};
23use futures::prelude::*;
24use libp2p_core::{multiaddr, Multiaddr};
25use libp2p_identity as identity;
26use libp2p_identity::PublicKey;
27use libp2p_swarm::StreamProtocol;
28use log::{debug, trace};
29use std::convert::TryFrom;
30use std::io;
31use thiserror::Error;
32
33const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
34
35pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");
36
37pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
38
39#[derive(Debug, Clone)]
41pub struct Info {
42 pub public_key: PublicKey,
44 pub protocol_version: String,
47 pub agent_version: String,
50 pub listen_addrs: Vec<Multiaddr>,
52 pub protocols: Vec<StreamProtocol>,
54 pub observed_addr: Multiaddr,
56}
57
58impl Info {
59 pub fn merge(&mut self, info: PushInfo) {
60 if let Some(public_key) = info.public_key {
61 self.public_key = public_key;
62 }
63 if let Some(protocol_version) = info.protocol_version {
64 self.protocol_version = protocol_version;
65 }
66 if let Some(agent_version) = info.agent_version {
67 self.agent_version = agent_version;
68 }
69 if !info.listen_addrs.is_empty() {
70 self.listen_addrs = info.listen_addrs;
71 }
72 if !info.protocols.is_empty() {
73 self.protocols = info.protocols;
74 }
75 if let Some(observed_addr) = info.observed_addr {
76 self.observed_addr = observed_addr;
77 }
78 }
79}
80
81#[derive(Debug, Clone)]
84pub struct PushInfo {
85 pub public_key: Option<PublicKey>,
86 pub protocol_version: Option<String>,
87 pub agent_version: Option<String>,
88 pub listen_addrs: Vec<Multiaddr>,
89 pub protocols: Vec<StreamProtocol>,
90 pub observed_addr: Option<Multiaddr>,
91}
92
93pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<(), UpgradeError>
94where
95 T: AsyncWrite + Unpin,
96{
97 trace!("Sending: {:?}", info);
98
99 let listen_addrs = info
100 .listen_addrs
101 .into_iter()
102 .map(|addr| addr.to_vec())
103 .collect();
104
105 let pubkey_bytes = info.public_key.encode_protobuf();
106
107 let message = proto::Identify {
108 agentVersion: Some(info.agent_version),
109 protocolVersion: Some(info.protocol_version),
110 publicKey: Some(pubkey_bytes),
111 listenAddrs: listen_addrs,
112 observedAddr: Some(info.observed_addr.to_vec()),
113 protocols: info.protocols.into_iter().map(|p| p.to_string()).collect(),
114 };
115
116 let mut framed_io = FramedWrite::new(
117 io,
118 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
119 );
120
121 framed_io.send(message).await?;
122 framed_io.close().await?;
123
124 Ok(())
125}
126
127pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
128where
129 T: AsyncRead + AsyncWrite + Unpin,
130{
131 let info = recv(socket).await?.try_into()?;
132
133 trace!("Received {:?}", info);
134
135 Ok(info)
136}
137
138pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
139where
140 T: AsyncRead + AsyncWrite + Unpin,
141{
142 let info = recv(socket).await?.try_into()?;
143
144 trace!("Received {:?}", info);
145
146 Ok(info)
147}
148
149async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
150where
151 T: AsyncRead + AsyncWrite + Unpin,
152{
153 let info = FramedRead::new(
159 socket,
160 quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
161 )
162 .next()
163 .await
164 .ok_or(UpgradeError::StreamClosed)??;
165
166 Ok(info)
167}
168
169fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
170 listen_addrs
171 .into_iter()
172 .filter_map(|bytes| match Multiaddr::try_from(bytes) {
173 Ok(a) => Some(a),
174 Err(e) => {
175 debug!("Unable to parse multiaddr: {e:?}");
176 None
177 }
178 })
179 .collect()
180}
181
182fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
183 protocols
184 .into_iter()
185 .filter_map(|p| match StreamProtocol::try_from_owned(p) {
186 Ok(p) => Some(p),
187 Err(e) => {
188 debug!("Received invalid protocol from peer: {e}");
189 None
190 }
191 })
192 .collect()
193}
194
195fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
196 public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
197 Ok(k) => Some(k),
198 Err(e) => {
199 debug!("Unable to decode public key: {e:?}");
200 None
201 }
202 })
203}
204
205fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
206 observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
207 Ok(a) => Some(a),
208 Err(e) => {
209 debug!("Unable to parse observed multiaddr: {e:?}");
210 None
211 }
212 })
213}
214
215impl TryFrom<proto::Identify> for Info {
216 type Error = UpgradeError;
217
218 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
219 let public_key = {
220 match parse_public_key(msg.publicKey) {
221 Some(key) => key,
222 None => PublicKey::try_decode_protobuf(Default::default())?,
224 }
225 };
226
227 let info = Info {
228 public_key,
229 protocol_version: msg.protocolVersion.unwrap_or_default(),
230 agent_version: msg.agentVersion.unwrap_or_default(),
231 listen_addrs: parse_listen_addrs(msg.listenAddrs),
232 protocols: parse_protocols(msg.protocols),
233 observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
234 };
235
236 Ok(info)
237 }
238}
239
240impl TryFrom<proto::Identify> for PushInfo {
241 type Error = UpgradeError;
242
243 fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
244 let info = PushInfo {
245 public_key: parse_public_key(msg.publicKey),
246 protocol_version: msg.protocolVersion,
247 agent_version: msg.agentVersion,
248 listen_addrs: parse_listen_addrs(msg.listenAddrs),
249 protocols: parse_protocols(msg.protocols),
250 observed_addr: parse_observed_addr(msg.observedAddr),
251 };
252
253 Ok(info)
254 }
255}
256
257#[derive(Debug, Error)]
258pub enum UpgradeError {
259 #[error(transparent)]
260 Codec(#[from] quick_protobuf_codec::Error),
261 #[error("I/O interaction failed")]
262 Io(#[from] io::Error),
263 #[error("Stream closed")]
264 StreamClosed,
265 #[error("Failed decoding multiaddr")]
266 Multiaddr(#[from] multiaddr::Error),
267 #[error("Failed decoding public key")]
268 PublicKey(#[from] identity::DecodingError),
269}
270
271#[cfg(test)]
272mod tests {
273 use super::*;
274 use libp2p_identity as identity;
275
276 #[test]
277 fn skip_invalid_multiaddr() {
278 let valid_multiaddr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();
279 let valid_multiaddr_bytes = valid_multiaddr.to_vec();
280
281 let invalid_multiaddr = {
282 let a = vec![255; 8];
283 assert!(Multiaddr::try_from(a.clone()).is_err());
284 a
285 };
286
287 let payload = proto::Identify {
288 agentVersion: None,
289 listenAddrs: vec![valid_multiaddr_bytes, invalid_multiaddr],
290 observedAddr: None,
291 protocolVersion: None,
292 protocols: vec![],
293 publicKey: Some(
294 identity::Keypair::generate_ed25519()
295 .public()
296 .encode_protobuf(),
297 ),
298 };
299
300 let info = PushInfo::try_from(payload).expect("not to fail");
301
302 assert_eq!(info.listen_addrs, vec![valid_multiaddr])
303 }
304}