libp2p_identify/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::proto;
22use asynchronous_codec::{FramedRead, FramedWrite};
23use futures::prelude::*;
24use libp2p_core::{multiaddr, Multiaddr};
25use libp2p_identity as identity;
26use libp2p_identity::PublicKey;
27use libp2p_swarm::StreamProtocol;
28use log::{debug, trace};
29use std::convert::TryFrom;
30use std::io;
31use thiserror::Error;
32
/// Size limit, in bytes, passed to the protobuf codec used for sending and
/// receiving identify messages.
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;

/// Protocol name of the identify protocol.
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0");

/// Protocol name of the identify push protocol.
pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0");
38
/// Identify information of a peer sent in protocol messages.
///
/// The same type is used for the information handed to `send_identify` and
/// for the information decoded from a received message by `recv_identify`.
#[derive(Debug, Clone)]
pub struct Info {
    /// The public key of the peer this information describes: the local peer
    /// when sending, the remote peer when decoded from a received message.
    pub public_key: PublicKey,
    /// Application-specific version of the protocol family used by the peer,
    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
    ///
    /// Empty if a received message did not carry the field.
    pub protocol_version: String,
    /// Name and version of the peer, similar to the `User-Agent` header in
    /// the HTTP protocol.
    ///
    /// Empty if a received message did not carry the field.
    pub agent_version: String,
    /// The addresses that the peer is listening on.
    ///
    /// When decoding a received message, entries that fail to parse are skipped.
    pub listen_addrs: Vec<Multiaddr>,
    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
    ///
    /// When decoding a received message, invalid protocol names are skipped.
    pub protocols: Vec<StreamProtocol>,
    /// Address observed by or for the remote.
    ///
    /// Set to the empty multiaddr if a received message carried no observed
    /// address or the address could not be parsed.
    pub observed_addr: Multiaddr,
}
57
58impl Info {
59    pub fn merge(&mut self, info: PushInfo) {
60        if let Some(public_key) = info.public_key {
61            self.public_key = public_key;
62        }
63        if let Some(protocol_version) = info.protocol_version {
64            self.protocol_version = protocol_version;
65        }
66        if let Some(agent_version) = info.agent_version {
67            self.agent_version = agent_version;
68        }
69        if !info.listen_addrs.is_empty() {
70            self.listen_addrs = info.listen_addrs;
71        }
72        if !info.protocols.is_empty() {
73            self.protocols = info.protocols;
74        }
75        if let Some(observed_addr) = info.observed_addr {
76            self.observed_addr = observed_addr;
77        }
78    }
79}
80
/// Identify push information of a peer sent in protocol messages.
///
/// Note that missing fields should be ignored, as peers may choose to send
/// partial updates containing only the fields whose values have changed.
#[derive(Debug, Clone)]
pub struct PushInfo {
    /// Public key carried by the message; `None` if the field was absent or
    /// could not be decoded.
    pub public_key: Option<PublicKey>,
    /// Protocol version carried by the message, if any.
    pub protocol_version: Option<String>,
    /// Agent version carried by the message, if any.
    pub agent_version: Option<String>,
    /// Listen addresses carried by the message, with unparseable entries
    /// skipped. `Info::merge` treats an empty list as "no update".
    pub listen_addrs: Vec<Multiaddr>,
    /// Protocols carried by the message, with invalid names skipped.
    /// `Info::merge` treats an empty list as "no update".
    pub protocols: Vec<StreamProtocol>,
    /// Observed address carried by the message; `None` if the field was
    /// absent or could not be parsed.
    pub observed_addr: Option<Multiaddr>,
}
92
93pub(crate) async fn send_identify<T>(io: T, info: Info) -> Result<(), UpgradeError>
94where
95    T: AsyncWrite + Unpin,
96{
97    trace!("Sending: {:?}", info);
98
99    let listen_addrs = info
100        .listen_addrs
101        .into_iter()
102        .map(|addr| addr.to_vec())
103        .collect();
104
105    let pubkey_bytes = info.public_key.encode_protobuf();
106
107    let message = proto::Identify {
108        agentVersion: Some(info.agent_version),
109        protocolVersion: Some(info.protocol_version),
110        publicKey: Some(pubkey_bytes),
111        listenAddrs: listen_addrs,
112        observedAddr: Some(info.observed_addr.to_vec()),
113        protocols: info.protocols.into_iter().map(|p| p.to_string()).collect(),
114    };
115
116    let mut framed_io = FramedWrite::new(
117        io,
118        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
119    );
120
121    framed_io.send(message).await?;
122    framed_io.close().await?;
123
124    Ok(())
125}
126
127pub(crate) async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
128where
129    T: AsyncRead + AsyncWrite + Unpin,
130{
131    let info = recv(socket).await?.try_into()?;
132
133    trace!("Received {:?}", info);
134
135    Ok(info)
136}
137
138pub(crate) async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
139where
140    T: AsyncRead + AsyncWrite + Unpin,
141{
142    let info = recv(socket).await?.try_into()?;
143
144    trace!("Received {:?}", info);
145
146    Ok(info)
147}
148
149async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
150where
151    T: AsyncRead + AsyncWrite + Unpin,
152{
153    // Even though we won't write to the stream anymore we don't close it here.
154    // The reason for this is that the `close` call on some transport's require the
155    // remote's ACK, but it could be that the remote already dropped the stream
156    // after finishing their write.
157
158    let info = FramedRead::new(
159        socket,
160        quick_protobuf_codec::Codec::<proto::Identify>::new(MAX_MESSAGE_SIZE_BYTES),
161    )
162    .next()
163    .await
164    .ok_or(UpgradeError::StreamClosed)??;
165
166    Ok(info)
167}
168
169fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
170    listen_addrs
171        .into_iter()
172        .filter_map(|bytes| match Multiaddr::try_from(bytes) {
173            Ok(a) => Some(a),
174            Err(e) => {
175                debug!("Unable to parse multiaddr: {e:?}");
176                None
177            }
178        })
179        .collect()
180}
181
182fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
183    protocols
184        .into_iter()
185        .filter_map(|p| match StreamProtocol::try_from_owned(p) {
186            Ok(p) => Some(p),
187            Err(e) => {
188                debug!("Received invalid protocol from peer: {e}");
189                None
190            }
191        })
192        .collect()
193}
194
195fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
196    public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
197        Ok(k) => Some(k),
198        Err(e) => {
199            debug!("Unable to decode public key: {e:?}");
200            None
201        }
202    })
203}
204
205fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
206    observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
207        Ok(a) => Some(a),
208        Err(e) => {
209            debug!("Unable to parse observed multiaddr: {e:?}");
210            None
211        }
212    })
213}
214
215impl TryFrom<proto::Identify> for Info {
216    type Error = UpgradeError;
217
218    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
219        let public_key = {
220            match parse_public_key(msg.publicKey) {
221                Some(key) => key,
222                // This will always produce a DecodingError if the public key is missing.
223                None => PublicKey::try_decode_protobuf(Default::default())?,
224            }
225        };
226
227        let info = Info {
228            public_key,
229            protocol_version: msg.protocolVersion.unwrap_or_default(),
230            agent_version: msg.agentVersion.unwrap_or_default(),
231            listen_addrs: parse_listen_addrs(msg.listenAddrs),
232            protocols: parse_protocols(msg.protocols),
233            observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
234        };
235
236        Ok(info)
237    }
238}
239
240impl TryFrom<proto::Identify> for PushInfo {
241    type Error = UpgradeError;
242
243    fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
244        let info = PushInfo {
245            public_key: parse_public_key(msg.publicKey),
246            protocol_version: msg.protocolVersion,
247            agent_version: msg.agentVersion,
248            listen_addrs: parse_listen_addrs(msg.listenAddrs),
249            protocols: parse_protocols(msg.protocols),
250            observed_addr: parse_observed_addr(msg.observedAddr),
251        };
252
253        Ok(info)
254    }
255}
256
/// Errors that can occur while sending or receiving identify messages.
#[derive(Debug, Error)]
pub enum UpgradeError {
    /// Error reported by the protobuf codec; displayed transparently as the
    /// underlying error.
    #[error(transparent)]
    Codec(#[from] quick_protobuf_codec::Error),
    /// An I/O error.
    #[error("I/O interaction failed")]
    Io(#[from] io::Error),
    /// The stream ended before an identify message was received.
    #[error("Stream closed")]
    StreamClosed,
    /// A multiaddr could not be decoded.
    #[error("Failed decoding multiaddr")]
    Multiaddr(#[from] multiaddr::Error),
    /// The public key could not be decoded; also returned when a received
    /// message that must carry a public key does not contain one.
    #[error("Failed decoding public key")]
    PublicKey(#[from] identity::DecodingError),
}
270
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p_identity as identity;

    /// Listen addresses that fail to decode are dropped, valid ones are kept.
    #[test]
    fn skip_invalid_multiaddr() {
        let valid_multiaddr = "/ip6/2001:db8::/tcp/1234"
            .parse::<Multiaddr>()
            .unwrap();

        // Make sure the fixture really is rejected by the multiaddr parser
        // before relying on it below.
        let invalid_multiaddr = vec![255u8; 8];
        assert!(Multiaddr::try_from(invalid_multiaddr.clone()).is_err());

        let public_key = identity::Keypair::generate_ed25519().public();

        let payload = proto::Identify {
            agentVersion: None,
            listenAddrs: vec![valid_multiaddr.to_vec(), invalid_multiaddr],
            observedAddr: None,
            protocolVersion: None,
            protocols: vec![],
            publicKey: Some(public_key.encode_protobuf()),
        };

        let info = PushInfo::try_from(payload).expect("not to fail");

        assert_eq!(info.listen_addrs, vec![valid_multiaddr]);
    }
}
304}