libp2p_identify/
handler.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::protocol::{Info, PushInfo, UpgradeError};
22use crate::{protocol, PROTOCOL_NAME, PUSH_PROTOCOL_NAME};
23use either::Either;
24use futures::prelude::*;
25use futures_bounded::Timeout;
26use futures_timer::Delay;
27use libp2p_core::upgrade::{ReadyUpgrade, SelectUpgrade};
28use libp2p_core::Multiaddr;
29use libp2p_identity::PeerId;
30use libp2p_identity::PublicKey;
31use libp2p_swarm::handler::{
32    ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
33    ProtocolSupport,
34};
35use libp2p_swarm::{
36    ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
37    SubstreamProtocol, SupportedProtocols,
38};
39use log::{warn, Level};
40use smallvec::SmallVec;
41use std::collections::HashSet;
42use std::{io, task::Context, task::Poll, time::Duration};
43
/// Timeout passed to the `FuturesSet` that `Handler::new` creates for tracking
/// in-flight identify exchanges.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
/// Capacity passed to the same `FuturesSet`; once `try_push` fails because of
/// it, newly negotiated streams are dropped with a warning.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
46
/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
// NOTE(review): the keep-alive logic visible in this file only checks whether
// `active_streams` is non-empty; confirm that the sentence above about
// keeping the connection open still describes the intended behaviour.
pub struct Handler {
    /// Peer ID of the remote. In this file it is only read when logging that
    /// a protocol change is being pushed.
    remote_peer_id: PeerId,
    /// Pending events to yield.
    events: SmallVec<
        [ConnectionHandlerEvent<
            Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
            (),
            Event,
            io::Error,
        >; 4],
    >,

    /// In-flight identify and identify-push exchanges, inbound and outbound
    /// alike. Created with `STREAM_TIMEOUT` and
    /// `MAX_CONCURRENT_STREAMS_PER_CONNECTION`.
    active_streams: futures_bounded::FuturesSet<Result<Success, UpgradeError>>,

    /// Future that fires when we need to identify the node again.
    trigger_next_identify: Delay,

    /// Whether we have exchanged at least one periodic identify.
    ///
    /// Set once a reply to an inbound identify request has been queued;
    /// local protocol changes are only pushed to the remote when this is set.
    exchanged_one_periodic_identify: bool,

    /// The interval of `trigger_next_identify`, i.e. the recurrent delay.
    interval: Duration,

    /// The public key of the local peer.
    public_key: PublicKey,

    /// Application-specific version of the protocol family used by the peer,
    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
    protocol_version: String,

    /// Name and version of the peer, similar to the `User-Agent` header in
    /// the HTTP protocol.
    agent_version: String,

    /// Address observed by or for the remote.
    observed_addr: Multiaddr,

    /// Identify information about the remote peer.
    ///
    /// `None` until the first identify response has been received; identify
    /// pushes are merged into the stored value.
    remote_info: Option<Info>,

    /// Locally supported protocols, updated on
    /// `ConnectionEvent::LocalProtocolsChange` and sent as `Info::protocols`.
    local_supported_protocols: SupportedProtocols,
    /// Protocols from the most recently processed remote info; used to work
    /// out which protocols the remote added or removed.
    remote_supported_protocols: HashSet<StreamProtocol>,
    /// Addresses sent as `Info::listen_addrs`; replaced wholesale on
    /// `InEvent::AddressesChanged`.
    external_addresses: HashSet<Multiaddr>,
}
96
/// An event from `Behaviour` with the information requested by the `Handler`.
#[derive(Debug)]
pub enum InEvent {
    /// Replaces the handler's set of external addresses with the given one.
    AddressesChanged(HashSet<Multiaddr>),
    /// Asks the handler to open an outbound identify-push stream.
    Push,
}
103
/// Event produced by the `Handler`.
#[derive(Debug)]
// NOTE(review): presumably allowed because `Identified(Info)` is much larger
// than the other variants — confirm.
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// We obtained identification information from the remote.
    ///
    /// Also emitted after an identify push has been merged into the
    /// previously stored remote info.
    Identified(Info),
    /// We replied to an identification request from the remote.
    Identification,
    /// We actively pushed our identification information to the remote.
    IdentificationPushed,
    /// Failed to identify the remote, or to reply to an identification request.
    IdentificationError(StreamUpgradeError<UpgradeError>),
}
117
118impl Handler {
119    /// Creates a new `Handler`.
120    #[allow(clippy::too_many_arguments)]
121    pub fn new(
122        initial_delay: Duration,
123        interval: Duration,
124        remote_peer_id: PeerId,
125        public_key: PublicKey,
126        protocol_version: String,
127        agent_version: String,
128        observed_addr: Multiaddr,
129        external_addresses: HashSet<Multiaddr>,
130    ) -> Self {
131        Self {
132            remote_peer_id,
133            events: SmallVec::new(),
134            active_streams: futures_bounded::FuturesSet::new(
135                STREAM_TIMEOUT,
136                MAX_CONCURRENT_STREAMS_PER_CONNECTION,
137            ),
138            trigger_next_identify: Delay::new(initial_delay),
139            exchanged_one_periodic_identify: false,
140            interval,
141            public_key,
142            protocol_version,
143            agent_version,
144            observed_addr,
145            local_supported_protocols: SupportedProtocols::default(),
146            remote_supported_protocols: HashSet::default(),
147            remote_info: Default::default(),
148            external_addresses,
149        }
150    }
151
152    fn on_fully_negotiated_inbound(
153        &mut self,
154        FullyNegotiatedInbound {
155            protocol: output, ..
156        }: FullyNegotiatedInbound<
157            <Self as ConnectionHandler>::InboundProtocol,
158            <Self as ConnectionHandler>::InboundOpenInfo,
159        >,
160    ) {
161        match output {
162            future::Either::Left(stream) => {
163                let info = self.build_info();
164
165                if self
166                    .active_streams
167                    .try_push(
168                        protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify),
169                    )
170                    .is_err()
171                {
172                    warn!("Dropping inbound stream because we are at capacity");
173                } else {
174                    self.exchanged_one_periodic_identify = true;
175                }
176            }
177            future::Either::Right(stream) => {
178                if self
179                    .active_streams
180                    .try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush))
181                    .is_err()
182                {
183                    warn!("Dropping inbound identify push stream because we are at capacity");
184                }
185            }
186        }
187    }
188
189    fn on_fully_negotiated_outbound(
190        &mut self,
191        FullyNegotiatedOutbound {
192            protocol: output, ..
193        }: FullyNegotiatedOutbound<
194            <Self as ConnectionHandler>::OutboundProtocol,
195            <Self as ConnectionHandler>::OutboundOpenInfo,
196        >,
197    ) {
198        match output {
199            future::Either::Left(stream) => {
200                if self
201                    .active_streams
202                    .try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify))
203                    .is_err()
204                {
205                    warn!("Dropping outbound identify stream because we are at capacity");
206                }
207            }
208            future::Either::Right(stream) => {
209                let info = self.build_info();
210
211                if self
212                    .active_streams
213                    .try_push(
214                        protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush),
215                    )
216                    .is_err()
217                {
218                    warn!("Dropping outbound identify push stream because we are at capacity");
219                }
220            }
221        }
222    }
223
224    fn build_info(&mut self) -> Info {
225        Info {
226            public_key: self.public_key.clone(),
227            protocol_version: self.protocol_version.clone(),
228            agent_version: self.agent_version.clone(),
229            listen_addrs: Vec::from_iter(self.external_addresses.iter().cloned()),
230            protocols: Vec::from_iter(self.local_supported_protocols.iter().cloned()),
231            observed_addr: self.observed_addr.clone(),
232        }
233    }
234
235    fn handle_incoming_info(&mut self, info: &Info) {
236        self.remote_info.replace(info.clone());
237
238        self.update_supported_protocols_for_remote(info);
239    }
240
241    fn update_supported_protocols_for_remote(&mut self, remote_info: &Info) {
242        let new_remote_protocols = HashSet::from_iter(remote_info.protocols.clone());
243
244        let remote_added_protocols = new_remote_protocols
245            .difference(&self.remote_supported_protocols)
246            .cloned()
247            .collect::<HashSet<_>>();
248        let remote_removed_protocols = self
249            .remote_supported_protocols
250            .difference(&new_remote_protocols)
251            .cloned()
252            .collect::<HashSet<_>>();
253
254        if !remote_added_protocols.is_empty() {
255            self.events
256                .push(ConnectionHandlerEvent::ReportRemoteProtocols(
257                    ProtocolSupport::Added(remote_added_protocols),
258                ));
259        }
260
261        if !remote_removed_protocols.is_empty() {
262            self.events
263                .push(ConnectionHandlerEvent::ReportRemoteProtocols(
264                    ProtocolSupport::Removed(remote_removed_protocols),
265                ));
266        }
267
268        self.remote_supported_protocols = new_remote_protocols;
269    }
270
271    fn local_protocols_to_string(&mut self) -> String {
272        self.local_supported_protocols
273            .iter()
274            .map(|p| p.to_string())
275            .collect::<Vec<_>>()
276            .join(", ")
277    }
278}
279
280impl ConnectionHandler for Handler {
    /// Commands the behaviour sends to this handler.
    type FromBehaviour = InEvent;
    /// Events this handler reports to the behaviour.
    type ToBehaviour = Event;
    /// Error type carried by the `ConnectionHandlerEvent`s this handler yields.
    type Error = io::Error;
    /// Inbound streams are negotiated as identify (first upgrade) or
    /// identify push (second upgrade); see `listen_protocol`.
    type InboundProtocol =
        SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
    /// Outbound requests use `Left` for identify and `Right` for identify push.
    type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
    /// No extra data is attached to outbound stream requests.
    type OutboundOpenInfo = ();
    /// No extra data is attached to inbound streams.
    type InboundOpenInfo = ();
289
290    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
291        SubstreamProtocol::new(
292            SelectUpgrade::new(
293                ReadyUpgrade::new(PROTOCOL_NAME),
294                ReadyUpgrade::new(PUSH_PROTOCOL_NAME),
295            ),
296            (),
297        )
298    }
299
300    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
301        match event {
302            InEvent::AddressesChanged(addresses) => {
303                self.external_addresses = addresses;
304            }
305            InEvent::Push => {
306                self.events
307                    .push(ConnectionHandlerEvent::OutboundSubstreamRequest {
308                        protocol: SubstreamProtocol::new(
309                            Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
310                            (),
311                        ),
312                    });
313            }
314        }
315    }
316
317    fn connection_keep_alive(&self) -> KeepAlive {
318        if !self.active_streams.is_empty() {
319            return KeepAlive::Yes;
320        }
321
322        KeepAlive::No
323    }
324
325    fn poll(
326        &mut self,
327        cx: &mut Context<'_>,
328    ) -> Poll<
329        ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event, Self::Error>,
330    > {
331        if let Some(event) = self.events.pop() {
332            return Poll::Ready(event);
333        }
334
335        // Poll the future that fires when we need to identify the node again.
336        if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) {
337            self.trigger_next_identify.reset(self.interval);
338            let event = ConnectionHandlerEvent::OutboundSubstreamRequest {
339                protocol: SubstreamProtocol::new(
340                    Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)),
341                    (),
342                ),
343            };
344            return Poll::Ready(event);
345        }
346
347        match self.active_streams.poll_unpin(cx) {
348            Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => {
349                self.handle_incoming_info(&remote_info);
350
351                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
352                    remote_info,
353                )));
354            }
355            Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => {
356                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
357                    Event::IdentificationPushed,
358                ));
359            }
360            Poll::Ready(Ok(Ok(Success::SentIdentify))) => {
361                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
362                    Event::Identification,
363                ));
364            }
365            Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => {
366                if let Some(mut info) = self.remote_info.clone() {
367                    info.merge(remote_push_info);
368                    self.handle_incoming_info(&info);
369
370                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
371                        Event::Identified(info),
372                    ));
373                };
374            }
375            Poll::Ready(Ok(Err(e))) => {
376                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
377                    Event::IdentificationError(StreamUpgradeError::Apply(e)),
378                ));
379            }
380            Poll::Ready(Err(Timeout { .. })) => {
381                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
382                    Event::IdentificationError(StreamUpgradeError::Timeout),
383                ));
384            }
385            Poll::Pending => {}
386        }
387
388        Poll::Pending
389    }
390
391    fn on_connection_event(
392        &mut self,
393        event: ConnectionEvent<
394            Self::InboundProtocol,
395            Self::OutboundProtocol,
396            Self::InboundOpenInfo,
397            Self::OutboundOpenInfo,
398        >,
399    ) {
400        match event {
401            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
402                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
403            }
404            ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
405                self.on_fully_negotiated_outbound(fully_negotiated_outbound)
406            }
407            ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
408                self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
409                    Event::IdentificationError(
410                        error.map_upgrade_err(|e| void::unreachable(e.into_inner())),
411                    ),
412                ));
413                self.trigger_next_identify.reset(self.interval);
414            }
415            ConnectionEvent::AddressChange(_)
416            | ConnectionEvent::ListenUpgradeError(_)
417            | ConnectionEvent::RemoteProtocolsChange(_) => {}
418            ConnectionEvent::LocalProtocolsChange(change) => {
419                let before = log::log_enabled!(Level::Debug)
420                    .then(|| self.local_protocols_to_string())
421                    .unwrap_or_default();
422                let protocols_changed = self.local_supported_protocols.on_protocols_change(change);
423                let after = log::log_enabled!(Level::Debug)
424                    .then(|| self.local_protocols_to_string())
425                    .unwrap_or_default();
426
427                if protocols_changed && self.exchanged_one_periodic_identify {
428                    log::debug!(
429                        "Supported listen protocols changed from [{before}] to [{after}], pushing to {}",
430                        self.remote_peer_id
431                    );
432
433                    self.events
434                        .push(ConnectionHandlerEvent::OutboundSubstreamRequest {
435                            protocol: SubstreamProtocol::new(
436                                Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
437                                (),
438                            ),
439                        });
440                }
441            }
442        }
443    }
444}
445
/// Outcome of a successfully completed exchange tracked in
/// `Handler::active_streams`.
enum Success {
    /// Our identify info was sent on an inbound identify stream.
    SentIdentify,
    /// The remote's identify info was read from an outbound identify stream.
    ReceivedIdentify(Info),
    /// Our identify info was sent on an outbound identify-push stream.
    SentIdentifyPush,
    /// The remote's push was read from an inbound identify-push stream.
    ReceivedIdentifyPush(PushInfo),
}