1use crate::protocol::{Info, PushInfo, UpgradeError};
22use crate::{protocol, PROTOCOL_NAME, PUSH_PROTOCOL_NAME};
23use either::Either;
24use futures::prelude::*;
25use futures_bounded::Timeout;
26use futures_timer::Delay;
27use libp2p_core::upgrade::{ReadyUpgrade, SelectUpgrade};
28use libp2p_core::Multiaddr;
29use libp2p_identity::PeerId;
30use libp2p_identity::PublicKey;
31use libp2p_swarm::handler::{
32 ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
33 ProtocolSupport,
34};
35use libp2p_swarm::{
36 ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
37 SubstreamProtocol, SupportedProtocols,
38};
39use log::{warn, Level};
40use smallvec::SmallVec;
41use std::collections::HashSet;
42use std::{io, task::Context, task::Poll, time::Duration};
43
/// Time budget handed to the bounded future set in [`Handler::new`]; it is the
/// timeout argument of `futures_bounded::FuturesSet::new`.
const STREAM_TIMEOUT: Duration = Duration::from_secs(60);

/// Capacity handed to the bounded future set in [`Handler::new`]. Streams that
/// arrive while the set is full are dropped with a warning.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
46
47pub struct Handler {
53 remote_peer_id: PeerId,
54 events: SmallVec<
56 [ConnectionHandlerEvent<
57 Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
58 (),
59 Event,
60 io::Error,
61 >; 4],
62 >,
63
64 active_streams: futures_bounded::FuturesSet<Result<Success, UpgradeError>>,
65
66 trigger_next_identify: Delay,
68
69 exchanged_one_periodic_identify: bool,
71
72 interval: Duration,
74
75 public_key: PublicKey,
77
78 protocol_version: String,
81
82 agent_version: String,
85
86 observed_addr: Multiaddr,
88
89 remote_info: Option<Info>,
91
92 local_supported_protocols: SupportedProtocols,
93 remote_supported_protocols: HashSet<StreamProtocol>,
94 external_addresses: HashSet<Multiaddr>,
95}
96
97#[derive(Debug)]
99pub enum InEvent {
100 AddressesChanged(HashSet<Multiaddr>),
101 Push,
102}
103
104#[derive(Debug)]
106#[allow(clippy::large_enum_variant)]
107pub enum Event {
108 Identified(Info),
110 Identification,
112 IdentificationPushed,
114 IdentificationError(StreamUpgradeError<UpgradeError>),
116}
117
118impl Handler {
119 #[allow(clippy::too_many_arguments)]
121 pub fn new(
122 initial_delay: Duration,
123 interval: Duration,
124 remote_peer_id: PeerId,
125 public_key: PublicKey,
126 protocol_version: String,
127 agent_version: String,
128 observed_addr: Multiaddr,
129 external_addresses: HashSet<Multiaddr>,
130 ) -> Self {
131 Self {
132 remote_peer_id,
133 events: SmallVec::new(),
134 active_streams: futures_bounded::FuturesSet::new(
135 STREAM_TIMEOUT,
136 MAX_CONCURRENT_STREAMS_PER_CONNECTION,
137 ),
138 trigger_next_identify: Delay::new(initial_delay),
139 exchanged_one_periodic_identify: false,
140 interval,
141 public_key,
142 protocol_version,
143 agent_version,
144 observed_addr,
145 local_supported_protocols: SupportedProtocols::default(),
146 remote_supported_protocols: HashSet::default(),
147 remote_info: Default::default(),
148 external_addresses,
149 }
150 }
151
152 fn on_fully_negotiated_inbound(
153 &mut self,
154 FullyNegotiatedInbound {
155 protocol: output, ..
156 }: FullyNegotiatedInbound<
157 <Self as ConnectionHandler>::InboundProtocol,
158 <Self as ConnectionHandler>::InboundOpenInfo,
159 >,
160 ) {
161 match output {
162 future::Either::Left(stream) => {
163 let info = self.build_info();
164
165 if self
166 .active_streams
167 .try_push(
168 protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify),
169 )
170 .is_err()
171 {
172 warn!("Dropping inbound stream because we are at capacity");
173 } else {
174 self.exchanged_one_periodic_identify = true;
175 }
176 }
177 future::Either::Right(stream) => {
178 if self
179 .active_streams
180 .try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush))
181 .is_err()
182 {
183 warn!("Dropping inbound identify push stream because we are at capacity");
184 }
185 }
186 }
187 }
188
189 fn on_fully_negotiated_outbound(
190 &mut self,
191 FullyNegotiatedOutbound {
192 protocol: output, ..
193 }: FullyNegotiatedOutbound<
194 <Self as ConnectionHandler>::OutboundProtocol,
195 <Self as ConnectionHandler>::OutboundOpenInfo,
196 >,
197 ) {
198 match output {
199 future::Either::Left(stream) => {
200 if self
201 .active_streams
202 .try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify))
203 .is_err()
204 {
205 warn!("Dropping outbound identify stream because we are at capacity");
206 }
207 }
208 future::Either::Right(stream) => {
209 let info = self.build_info();
210
211 if self
212 .active_streams
213 .try_push(
214 protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush),
215 )
216 .is_err()
217 {
218 warn!("Dropping outbound identify push stream because we are at capacity");
219 }
220 }
221 }
222 }
223
224 fn build_info(&mut self) -> Info {
225 Info {
226 public_key: self.public_key.clone(),
227 protocol_version: self.protocol_version.clone(),
228 agent_version: self.agent_version.clone(),
229 listen_addrs: Vec::from_iter(self.external_addresses.iter().cloned()),
230 protocols: Vec::from_iter(self.local_supported_protocols.iter().cloned()),
231 observed_addr: self.observed_addr.clone(),
232 }
233 }
234
235 fn handle_incoming_info(&mut self, info: &Info) {
236 self.remote_info.replace(info.clone());
237
238 self.update_supported_protocols_for_remote(info);
239 }
240
241 fn update_supported_protocols_for_remote(&mut self, remote_info: &Info) {
242 let new_remote_protocols = HashSet::from_iter(remote_info.protocols.clone());
243
244 let remote_added_protocols = new_remote_protocols
245 .difference(&self.remote_supported_protocols)
246 .cloned()
247 .collect::<HashSet<_>>();
248 let remote_removed_protocols = self
249 .remote_supported_protocols
250 .difference(&new_remote_protocols)
251 .cloned()
252 .collect::<HashSet<_>>();
253
254 if !remote_added_protocols.is_empty() {
255 self.events
256 .push(ConnectionHandlerEvent::ReportRemoteProtocols(
257 ProtocolSupport::Added(remote_added_protocols),
258 ));
259 }
260
261 if !remote_removed_protocols.is_empty() {
262 self.events
263 .push(ConnectionHandlerEvent::ReportRemoteProtocols(
264 ProtocolSupport::Removed(remote_removed_protocols),
265 ));
266 }
267
268 self.remote_supported_protocols = new_remote_protocols;
269 }
270
271 fn local_protocols_to_string(&mut self) -> String {
272 self.local_supported_protocols
273 .iter()
274 .map(|p| p.to_string())
275 .collect::<Vec<_>>()
276 .join(", ")
277 }
278}
279
280impl ConnectionHandler for Handler {
281 type FromBehaviour = InEvent;
282 type ToBehaviour = Event;
283 type Error = io::Error;
284 type InboundProtocol =
285 SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
286 type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
287 type OutboundOpenInfo = ();
288 type InboundOpenInfo = ();
289
290 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
291 SubstreamProtocol::new(
292 SelectUpgrade::new(
293 ReadyUpgrade::new(PROTOCOL_NAME),
294 ReadyUpgrade::new(PUSH_PROTOCOL_NAME),
295 ),
296 (),
297 )
298 }
299
300 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
301 match event {
302 InEvent::AddressesChanged(addresses) => {
303 self.external_addresses = addresses;
304 }
305 InEvent::Push => {
306 self.events
307 .push(ConnectionHandlerEvent::OutboundSubstreamRequest {
308 protocol: SubstreamProtocol::new(
309 Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
310 (),
311 ),
312 });
313 }
314 }
315 }
316
317 fn connection_keep_alive(&self) -> KeepAlive {
318 if !self.active_streams.is_empty() {
319 return KeepAlive::Yes;
320 }
321
322 KeepAlive::No
323 }
324
325 fn poll(
326 &mut self,
327 cx: &mut Context<'_>,
328 ) -> Poll<
329 ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event, Self::Error>,
330 > {
331 if let Some(event) = self.events.pop() {
332 return Poll::Ready(event);
333 }
334
335 if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) {
337 self.trigger_next_identify.reset(self.interval);
338 let event = ConnectionHandlerEvent::OutboundSubstreamRequest {
339 protocol: SubstreamProtocol::new(
340 Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)),
341 (),
342 ),
343 };
344 return Poll::Ready(event);
345 }
346
347 match self.active_streams.poll_unpin(cx) {
348 Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => {
349 self.handle_incoming_info(&remote_info);
350
351 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
352 remote_info,
353 )));
354 }
355 Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => {
356 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
357 Event::IdentificationPushed,
358 ));
359 }
360 Poll::Ready(Ok(Ok(Success::SentIdentify))) => {
361 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
362 Event::Identification,
363 ));
364 }
365 Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => {
366 if let Some(mut info) = self.remote_info.clone() {
367 info.merge(remote_push_info);
368 self.handle_incoming_info(&info);
369
370 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
371 Event::Identified(info),
372 ));
373 };
374 }
375 Poll::Ready(Ok(Err(e))) => {
376 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
377 Event::IdentificationError(StreamUpgradeError::Apply(e)),
378 ));
379 }
380 Poll::Ready(Err(Timeout { .. })) => {
381 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
382 Event::IdentificationError(StreamUpgradeError::Timeout),
383 ));
384 }
385 Poll::Pending => {}
386 }
387
388 Poll::Pending
389 }
390
391 fn on_connection_event(
392 &mut self,
393 event: ConnectionEvent<
394 Self::InboundProtocol,
395 Self::OutboundProtocol,
396 Self::InboundOpenInfo,
397 Self::OutboundOpenInfo,
398 >,
399 ) {
400 match event {
401 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
402 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
403 }
404 ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
405 self.on_fully_negotiated_outbound(fully_negotiated_outbound)
406 }
407 ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
408 self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
409 Event::IdentificationError(
410 error.map_upgrade_err(|e| void::unreachable(e.into_inner())),
411 ),
412 ));
413 self.trigger_next_identify.reset(self.interval);
414 }
415 ConnectionEvent::AddressChange(_)
416 | ConnectionEvent::ListenUpgradeError(_)
417 | ConnectionEvent::RemoteProtocolsChange(_) => {}
418 ConnectionEvent::LocalProtocolsChange(change) => {
419 let before = log::log_enabled!(Level::Debug)
420 .then(|| self.local_protocols_to_string())
421 .unwrap_or_default();
422 let protocols_changed = self.local_supported_protocols.on_protocols_change(change);
423 let after = log::log_enabled!(Level::Debug)
424 .then(|| self.local_protocols_to_string())
425 .unwrap_or_default();
426
427 if protocols_changed && self.exchanged_one_periodic_identify {
428 log::debug!(
429 "Supported listen protocols changed from [{before}] to [{after}], pushing to {}",
430 self.remote_peer_id
431 );
432
433 self.events
434 .push(ConnectionHandlerEvent::OutboundSubstreamRequest {
435 protocol: SubstreamProtocol::new(
436 Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
437 (),
438 ),
439 });
440 }
441 }
442 }
443 }
444}
445
446enum Success {
447 SentIdentify,
448 ReceivedIdentify(Info),
449 SentIdentifyPush,
450 ReceivedIdentifyPush(PushInfo),
451}