libp2p_identify/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::proto;
22use asynchronous_codec::{FramedRead, FramedWrite};
23use futures::prelude::*;
24use libp2p_core::{multiaddr, Multiaddr};
25use libp2p_identity as identity;
26use libp2p_identity::PublicKey;
27use libp2p_swarm::StreamProtocol;
28use std::io;
29use thiserror::Error;
30
31const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
32
33pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");
34
35pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
36
37/// Identify information of a peer sent in protocol messages.
38#[derive(Debug, Clone)]
39pub struct Info {
40    /// The public key of the local peer.
41    pub public_key: PublicKey,
42    /// Application-specific version of the protocol family used by the peer,
43    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
44    pub protocol_version: String,
45    /// Name and version of the peer, similar to the `User-Agent` header in
46    /// the HTTP protocol.
47    pub agent_version: String,
48    /// The addresses that the peer is listening on.
49    pub listen_addrs: Vec<Multiaddr>,
50    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
51    pub protocols: Vec<StreamProtocol>,
52    /// Address observed by or for the remote.
53    pub observed_addr: Multiaddr,
54}
55
56impl Info {
57    pub fn merge(&mut self, info: PushInfo) {
58        if let Some(public_key) = info.public_key {
59            self.public_key = public_key;
60        }
61        if let Some(protocol_version) = info.protocol_version {
62            self.protocol_version = protocol_version;
63        }
64        if let Some(agent_version) = info.agent_version {
65            self.agent_version = agent_version;
66        }
67        if !info.listen_addrs.is_empty() {
68            self.listen_addrs = info.listen_addrs;
69        }
70        if !info.protocols.is_empty() {
71            self.protocols = info.protocols;
72        }
73        if let Some(observed_addr) = info.observed_addr {
74            self.observed_addr = observed_addr;
75        }
76    }
77}
78
79/// Identify push information of a peer sent in protocol messages.
80/// Note that missing fields should be ignored, as peers may choose to send partial updates containing only the fields whose values have changed.
81#[derive(Debug, Clone)]
82pub struct PushInfo {
83    pub public_key: Option<PublicKey>,
84    pub protocol_version: Option<String>,
85    pub agent_version: Option<String>,
86    pub listen_addrs: Vec<Multiaddr>,
87    pub protocols: Vec<StreamProtocol>,
88    pub observed_addr: Option<Multiaddr>,
89}
90
91pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<Info, UpgradeError>
92where
93    T: AsyncWrite + Unpin,
94{
95    tracing::trace!("Sending: {:?}", info);
96
97    let listen_addrs = info.listen_addrs.iter().map(|addr| addr.to_vec()).collect();
98
99    let pubkey_bytes = info.public_key.encode_protobuf();
100
101    let message = proto::Identify {
102        agentVersion: Some(info.agent_version.clone()),
103        protocolVersion: Some(info.protocol_version.clone()),
104        publicKey: Some(pubkey_bytes),
105        listenAddrs: listen_addrs,
106        observedAddr: Some(info.observed_addr.to_vec()),
107        protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
108    };
109
110    let mut framed_io = FramedWrite::new(
111        io,
112        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
113    );
114
115    framed_io.send(message).await?;
116    framed_io.close().await?;
117
118    Ok(info)
119}
120
121pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
122where
123    T: AsyncRead + AsyncWrite + Unpin,
124{
125    let info = recv(socket).await?.try_into()?;
126
127    tracing::trace!(?info, "Received");
128
129    Ok(info)
130}
131
132pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
133where
134    T: AsyncRead + AsyncWrite + Unpin,
135{
136    let info = recv(socket).await?.try_into()?;
137
138    tracing::trace!(?info, "Received");
139
140    Ok(info)
141}
142
143async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
144where
145    T: AsyncRead + AsyncWrite + Unpin,
146{
147    // Even though we won't write to the stream anymore we don't close it here.
148    // The reason for this is that the `close` call on some transport's require the
149    // remote's ACK, but it could be that the remote already dropped the stream
150    // after finishing their write.
151
152    let info = FramedRead::new(
153        socket,
154        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
155    )
156    .next()
157    .await
158    .ok_or(UpgradeError::StreamClosed)??;
159
160    Ok(info)
161}
162
163fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
164    listen_addrs
165        .into_iter()
166        .filter_map(|bytes| match Multiaddr::try_from(bytes) {
167            Ok(a) => Some(a),
168            Err(e) => {
169                tracing::debug!("Unable to parse multiaddr: {e:?}");
170                None
171            }
172        })
173        .collect()
174}
175
176fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
177    protocols
178        .into_iter()
179        .filter_map(|p| match StreamProtocol::try_from_owned(p) {
180            Ok(p) => Some(p),
181            Err(e) => {
182                tracing::debug!("Received invalid protocol from peer: {e}");
183                None
184            }
185        })
186        .collect()
187}
188
189fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
190    public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
191        Ok(k) => Some(k),
192        Err(e) => {
193            tracing::debug!("Unable to decode public key: {e:?}");
194            None
195        }
196    })
197}
198
199fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
200    observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
201        Ok(a) => Some(a),
202        Err(e) => {
203            tracing::debug!("Unable to parse observed multiaddr: {e:?}");
204            None
205        }
206    })
207}
208
209impl TryFrom<proto::Identify> for Info {
210    type Error = UpgradeError;
211
212    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
213        let public_key = {
214            match parse_public_key(msg.publicKey) {
215                Some(key) => key,
216                // This will always produce a DecodingError if the public key is missing.
217                None => PublicKey::try_decode_protobuf(Default::default())?,
218            }
219        };
220
221        let info = Info {
222            public_key,
223            protocol_version: msg.protocolVersion.unwrap_or_default(),
224            agent_version: msg.agentVersion.unwrap_or_default(),
225            listen_addrs: parse_listen_addrs(msg.listenAddrs),
226            protocols: parse_protocols(msg.protocols),
227            observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
228        };
229
230        Ok(info)
231    }
232}
233
234impl TryFrom<proto::Identify> for PushInfo {
235    type Error = UpgradeError;
236
237    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
238        let info = PushInfo {
239            public_key: parse_public_key(msg.publicKey),
240            protocol_version: msg.protocolVersion,
241            agent_version: msg.agentVersion,
242            listen_addrs: parse_listen_addrs(msg.listenAddrs),
243            protocols: parse_protocols(msg.protocols),
244            observed_addr: parse_observed_addr(msg.observedAddr),
245        };
246
247        Ok(info)
248    }
249}
250
251#[derive(Debug, Error)]
252pub enum UpgradeError {
253    #[error(transparent)]
254    Codec(#[from] quick_protobuf_codec::Error),
255    #[error("I/O interaction failed")]
256    Io(#[from] io::Error),
257    #[error("Stream closed")]
258    StreamClosed,
259    #[error("Failed decoding multiaddr")]
260    Multiaddr(#[from] multiaddr::Error),
261    #[error("Failed decoding public key")]
262    PublicKey(#[from] identity::DecodingError),
263}
264
265#[cfg(test)]
266mod tests {
267    use super::*;
268    use libp2p_identity as identity;
269
270    #[test]
271    fn skip_invalid_multiaddr() {
272        let valid_multiaddr: Multiaddr = "/ip6/2001:db8::/tcp/1234".parse().unwrap();
273        let valid_multiaddr_bytes = valid_multiaddr.to_vec();
274
275        let invalid_multiaddr = {
276            let a = vec![255; 8];
277            assert!(Multiaddr::try_from(a.clone()).is_err());
278            a
279        };
280
281        let payload = proto::Identify {
282            agentVersion: None,
283            listenAddrs: vec![valid_multiaddr_bytes, invalid_multiaddr],
284            observedAddr: None,
285            protocolVersion: None,
286            protocols: vec![],
287            publicKey: Some(
288                identity::Keypair::generate_ed25519()
289                    .public()
290                    .encode_protobuf(),
291            ),
292        };
293
294        let info = PushInfo::try_from(payload).expect("not to fail");
295
296        assert_eq!(info.listen_addrs, vec![valid_multiaddr])
297    }
298}