1pub mod either;
42mod map_in;
43mod map_out;
44pub mod multi;
45mod one_shot;
46mod pending;
47mod select;
48
49pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
50pub use map_in::MapInEvent;
51pub use map_out::MapOutEvent;
52pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
53pub use pending::PendingConnectionHandler;
54pub use select::ConnectionHandlerSelect;
55
56use crate::StreamProtocol;
57use ::either::Either;
58use instant::Instant;
59use libp2p_core::Multiaddr;
60use once_cell::sync::Lazy;
61use smallvec::SmallVec;
62use std::collections::hash_map::RandomState;
63use std::collections::hash_set::{Difference, Intersection};
64use std::collections::HashSet;
65use std::iter::Peekable;
66use std::{cmp::Ordering, error, fmt, io, task::Context, task::Poll, time::Duration};
67
68pub trait ConnectionHandler: Send + 'static {
102 type FromBehaviour: fmt::Debug + Send + 'static;
104 type ToBehaviour: fmt::Debug + Send + 'static;
106 #[deprecated(
108 note = "Will be removed together with `ConnectionHandlerEvent::Close`. See <https://github.com/libp2p/rust-libp2p/issues/3591> for details."
109 )]
110 type Error: error::Error + fmt::Debug + Send + 'static;
111 type InboundProtocol: InboundUpgradeSend;
113 type OutboundProtocol: OutboundUpgradeSend;
115 type InboundOpenInfo: Send + 'static;
117 type OutboundOpenInfo: Send + 'static;
119
120 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;
128
129 fn connection_keep_alive(&self) -> KeepAlive;
150
151 #[allow(deprecated)]
153 fn poll(
154 &mut self,
155 cx: &mut Context<'_>,
156 ) -> Poll<
157 ConnectionHandlerEvent<
158 Self::OutboundProtocol,
159 Self::OutboundOpenInfo,
160 Self::ToBehaviour,
161 Self::Error,
162 >,
163 >;
164
165 fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
167 where
168 Self: Sized,
169 TMap: Fn(&TNewIn) -> Option<&Self::FromBehaviour>,
170 {
171 MapInEvent::new(self, map)
172 }
173
174 fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
176 where
177 Self: Sized,
178 TMap: FnMut(Self::ToBehaviour) -> TNewOut,
179 {
180 MapOutEvent::new(self, map)
181 }
182
183 fn select<TProto2>(self, other: TProto2) -> ConnectionHandlerSelect<Self, TProto2>
190 where
191 Self: Sized,
192 {
193 ConnectionHandlerSelect::new(self, other)
194 }
195
196 fn on_behaviour_event(&mut self, _event: Self::FromBehaviour);
198
199 fn on_connection_event(
200 &mut self,
201 event: ConnectionEvent<
202 Self::InboundProtocol,
203 Self::OutboundProtocol,
204 Self::InboundOpenInfo,
205 Self::OutboundOpenInfo,
206 >,
207 );
208}
209
210pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
213 FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
215 FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
217 AddressChange(AddressChange<'a>),
219 DialUpgradeError(DialUpgradeError<OOI, OP>),
221 ListenUpgradeError(ListenUpgradeError<IOI, IP>),
223 LocalProtocolsChange(ProtocolsChange<'a>),
225 RemoteProtocolsChange(ProtocolsChange<'a>),
227}
228
229impl<'a, IP, OP, IOI, OOI> fmt::Debug for ConnectionEvent<'a, IP, OP, IOI, OOI>
230where
231 IP: InboundUpgradeSend + fmt::Debug,
232 IP::Output: fmt::Debug,
233 IP::Error: fmt::Debug,
234 OP: OutboundUpgradeSend + fmt::Debug,
235 OP::Output: fmt::Debug,
236 OP::Error: fmt::Debug,
237 IOI: fmt::Debug,
238 OOI: fmt::Debug,
239{
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 match self {
242 ConnectionEvent::FullyNegotiatedInbound(v) => {
243 f.debug_tuple("FullyNegotiatedInbound").field(v).finish()
244 }
245 ConnectionEvent::FullyNegotiatedOutbound(v) => {
246 f.debug_tuple("FullyNegotiatedOutbound").field(v).finish()
247 }
248 ConnectionEvent::AddressChange(v) => f.debug_tuple("AddressChange").field(v).finish(),
249 ConnectionEvent::DialUpgradeError(v) => {
250 f.debug_tuple("DialUpgradeError").field(v).finish()
251 }
252 ConnectionEvent::ListenUpgradeError(v) => {
253 f.debug_tuple("ListenUpgradeError").field(v).finish()
254 }
255 ConnectionEvent::LocalProtocolsChange(v) => {
256 f.debug_tuple("LocalProtocolsChange").field(v).finish()
257 }
258 ConnectionEvent::RemoteProtocolsChange(v) => {
259 f.debug_tuple("RemoteProtocolsChange").field(v).finish()
260 }
261 }
262 }
263}
264
265impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
266 ConnectionEvent<'a, IP, OP, IOI, OOI>
267{
268 pub fn is_outbound(&self) -> bool {
270 match self {
271 ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => {
272 true
273 }
274 ConnectionEvent::FullyNegotiatedInbound(_)
275 | ConnectionEvent::AddressChange(_)
276 | ConnectionEvent::LocalProtocolsChange(_)
277 | ConnectionEvent::RemoteProtocolsChange(_)
278 | ConnectionEvent::ListenUpgradeError(_) => false,
279 }
280 }
281
282 pub fn is_inbound(&self) -> bool {
284 match self {
285 ConnectionEvent::FullyNegotiatedInbound(_) | ConnectionEvent::ListenUpgradeError(_) => {
286 true
287 }
288 ConnectionEvent::FullyNegotiatedOutbound(_)
289 | ConnectionEvent::AddressChange(_)
290 | ConnectionEvent::LocalProtocolsChange(_)
291 | ConnectionEvent::RemoteProtocolsChange(_)
292 | ConnectionEvent::DialUpgradeError(_) => false,
293 }
294 }
295}
296
297#[derive(Debug)]
306pub struct FullyNegotiatedInbound<IP: InboundUpgradeSend, IOI> {
307 pub protocol: IP::Output,
308 pub info: IOI,
309}
310
311#[derive(Debug)]
316pub struct FullyNegotiatedOutbound<OP: OutboundUpgradeSend, OOI> {
317 pub protocol: OP::Output,
318 pub info: OOI,
319}
320
321#[derive(Debug)]
323pub struct AddressChange<'a> {
324 pub new_address: &'a Multiaddr,
325}
326
327#[derive(Debug, Clone)]
329pub enum ProtocolsChange<'a> {
330 Added(ProtocolsAdded<'a>),
331 Removed(ProtocolsRemoved<'a>),
332}
333
334impl<'a> ProtocolsChange<'a> {
335 pub(crate) fn add(
339 existing_protocols: &'a HashSet<StreamProtocol>,
340 to_add: &'a HashSet<StreamProtocol>,
341 ) -> Option<Self> {
342 let mut actually_added_protocols = to_add.difference(existing_protocols).peekable();
343
344 actually_added_protocols.peek()?;
345
346 Some(ProtocolsChange::Added(ProtocolsAdded {
347 protocols: actually_added_protocols,
348 }))
349 }
350
351 pub(crate) fn remove(
355 existing_protocols: &'a HashSet<StreamProtocol>,
356 to_remove: &'a HashSet<StreamProtocol>,
357 ) -> Option<Self> {
358 let mut actually_removed_protocols = existing_protocols.intersection(to_remove).peekable();
359
360 actually_removed_protocols.peek()?;
361
362 Some(ProtocolsChange::Removed(ProtocolsRemoved {
363 protocols: Either::Right(actually_removed_protocols),
364 }))
365 }
366
367 pub(crate) fn from_full_sets(
369 existing_protocols: &'a HashSet<StreamProtocol>,
370 new_protocols: &'a HashSet<StreamProtocol>,
371 ) -> SmallVec<[Self; 2]> {
372 if existing_protocols == new_protocols {
373 return SmallVec::new();
374 }
375
376 let mut changes = SmallVec::new();
377
378 let mut added_protocols = new_protocols.difference(existing_protocols).peekable();
379 let mut removed_protocols = existing_protocols.difference(new_protocols).peekable();
380
381 if added_protocols.peek().is_some() {
382 changes.push(ProtocolsChange::Added(ProtocolsAdded {
383 protocols: added_protocols,
384 }));
385 }
386
387 if removed_protocols.peek().is_some() {
388 changes.push(ProtocolsChange::Removed(ProtocolsRemoved {
389 protocols: Either::Left(removed_protocols),
390 }));
391 }
392
393 changes
394 }
395}
396
397#[derive(Debug, Clone)]
399pub struct ProtocolsAdded<'a> {
400 protocols: Peekable<Difference<'a, StreamProtocol, RandomState>>,
401}
402
403impl<'a> ProtocolsAdded<'a> {
404 pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
405 ProtocolsAdded {
406 protocols: protocols.difference(&EMPTY_HASHSET).peekable(),
407 }
408 }
409}
410
411#[derive(Debug, Clone)]
413pub struct ProtocolsRemoved<'a> {
414 protocols: Either<
415 Peekable<Difference<'a, StreamProtocol, RandomState>>,
416 Peekable<Intersection<'a, StreamProtocol, RandomState>>,
417 >,
418}
419
420impl<'a> ProtocolsRemoved<'a> {
421 #[cfg(test)]
422 pub(crate) fn from_set(protocols: &'a HashSet<StreamProtocol, RandomState>) -> Self {
423 ProtocolsRemoved {
424 protocols: Either::Left(protocols.difference(&EMPTY_HASHSET).peekable()),
425 }
426 }
427}
428
429impl<'a> Iterator for ProtocolsAdded<'a> {
430 type Item = &'a StreamProtocol;
431 fn next(&mut self) -> Option<Self::Item> {
432 self.protocols.next()
433 }
434}
435
436impl<'a> Iterator for ProtocolsRemoved<'a> {
437 type Item = &'a StreamProtocol;
438 fn next(&mut self) -> Option<Self::Item> {
439 self.protocols.next()
440 }
441}
442
443#[derive(Debug)]
446pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
447 pub info: OOI,
448 pub error: StreamUpgradeError<OP::Error>,
449}
450
451#[derive(Debug)]
454pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
455 pub info: IOI,
456 pub error: IP::Error,
457}
458
/// An upgrade bundled with a user-defined info value and a timeout.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SubstreamProtocol<TUpgrade, TInfo> {
    upgrade: TUpgrade,
    info: TInfo,
    timeout: Duration,
}

impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
    /// Creates a `SubstreamProtocol` from `upgrade` and `info` with a
    /// timeout of 10 seconds.
    pub fn new(upgrade: TUpgrade, info: TInfo) -> Self {
        let timeout = Duration::from_secs(10);
        Self {
            upgrade,
            info,
            timeout,
        }
    }

    /// Transforms the upgrade with `f`; info and timeout are carried over.
    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
    where
        F: FnOnce(TUpgrade) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade: f(upgrade),
            info,
            timeout,
        }
    }

    /// Transforms the info value with `f`; upgrade and timeout are carried over.
    pub fn map_info<U, F>(self, f: F) -> SubstreamProtocol<TUpgrade, U>
    where
        F: FnOnce(TInfo) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;

        SubstreamProtocol {
            upgrade,
            info: f(info),
            timeout,
        }
    }

    /// Returns `self` with the timeout replaced by `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Borrows the upgrade.
    pub fn upgrade(&self) -> &TUpgrade {
        let Self { upgrade, .. } = self;
        upgrade
    }

    /// Borrows the info value.
    pub fn info(&self) -> &TInfo {
        let Self { info, .. } = self;
        info
    }

    /// Borrows the timeout.
    pub fn timeout(&self) -> &Duration {
        let Self { timeout, .. } = self;
        timeout
    }

    /// Consumes `self` and returns `(upgrade, info)`; the timeout is dropped.
    pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
        let Self { upgrade, info, .. } = self;
        (upgrade, info)
    }
}
534
535#[derive(Debug, Clone, PartialEq, Eq)]
537pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr> {
538 OutboundSubstreamRequest {
540 protocol: SubstreamProtocol<TConnectionUpgrade, TOutboundOpenInfo>,
542 },
543
544 #[deprecated(
553 note = "To close a connection, use `ToSwarm::CloseConnection` or `Swarm::close_connection`. See <https://github.com/libp2p/rust-libp2p/issues/3591> for more details."
554 )]
555 Close(TErr),
556 ReportRemoteProtocols(ProtocolSupport),
558
559 NotifyBehaviour(TCustom),
561}
562
563#[derive(Debug, Clone, PartialEq, Eq)]
564pub enum ProtocolSupport {
565 Added(HashSet<StreamProtocol>),
567 Removed(HashSet<StreamProtocol>),
569}
570
571impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
573 ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
574{
575 pub fn map_outbound_open_info<F, I>(
578 self,
579 map: F,
580 ) -> ConnectionHandlerEvent<TConnectionUpgrade, I, TCustom, TErr>
581 where
582 F: FnOnce(TOutboundOpenInfo) -> I,
583 {
584 match self {
585 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
586 ConnectionHandlerEvent::OutboundSubstreamRequest {
587 protocol: protocol.map_info(map),
588 }
589 }
590 ConnectionHandlerEvent::NotifyBehaviour(val) => {
591 ConnectionHandlerEvent::NotifyBehaviour(val)
592 }
593 #[allow(deprecated)]
594 ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
595 ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
596 ConnectionHandlerEvent::ReportRemoteProtocols(support)
597 }
598 }
599 }
600
601 pub fn map_protocol<F, I>(
604 self,
605 map: F,
606 ) -> ConnectionHandlerEvent<I, TOutboundOpenInfo, TCustom, TErr>
607 where
608 F: FnOnce(TConnectionUpgrade) -> I,
609 {
610 match self {
611 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
612 ConnectionHandlerEvent::OutboundSubstreamRequest {
613 protocol: protocol.map_upgrade(map),
614 }
615 }
616 ConnectionHandlerEvent::NotifyBehaviour(val) => {
617 ConnectionHandlerEvent::NotifyBehaviour(val)
618 }
619 #[allow(deprecated)]
620 ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
621 ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
622 ConnectionHandlerEvent::ReportRemoteProtocols(support)
623 }
624 }
625 }
626
627 pub fn map_custom<F, I>(
629 self,
630 map: F,
631 ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, I, TErr>
632 where
633 F: FnOnce(TCustom) -> I,
634 {
635 match self {
636 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
637 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
638 }
639 ConnectionHandlerEvent::NotifyBehaviour(val) => {
640 ConnectionHandlerEvent::NotifyBehaviour(map(val))
641 }
642 #[allow(deprecated)]
643 ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val),
644 ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
645 ConnectionHandlerEvent::ReportRemoteProtocols(support)
646 }
647 }
648 }
649
650 pub fn map_close<F, I>(
652 self,
653 map: F,
654 ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, I>
655 where
656 F: FnOnce(TErr) -> I,
657 {
658 match self {
659 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
660 ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
661 }
662 ConnectionHandlerEvent::NotifyBehaviour(val) => {
663 ConnectionHandlerEvent::NotifyBehaviour(val)
664 }
665 #[allow(deprecated)]
666 ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)),
667 ConnectionHandlerEvent::ReportRemoteProtocols(support) => {
668 ConnectionHandlerEvent::ReportRemoteProtocols(support)
669 }
670 }
671 }
672}
673
674#[deprecated(note = "Renamed to `StreamUpgradeError`")]
675pub type ConnectionHandlerUpgrErr<TUpgrErr> = StreamUpgradeError<TUpgrErr>;
676
/// Reasons a stream upgrade can fail.
#[derive(Debug)]
pub enum StreamUpgradeError<TUpgrErr> {
    /// The upgrade timed out.
    Timeout,
    /// The upgrade itself returned an error of type `TUpgrErr`.
    Apply(TUpgrErr),
    /// No protocol could be agreed upon.
    NegotiationFailed,
    /// An I/O error occurred.
    Io(io::Error),
}

impl<TUpgrErr> StreamUpgradeError<TUpgrErr> {
    /// Converts the [`Apply`](Self::Apply) payload with `f`; all other
    /// variants are passed through unchanged.
    pub fn map_upgrade_err<F, E>(self, f: F) -> StreamUpgradeError<E>
    where
        F: FnOnce(TUpgrErr) -> E,
    {
        match self {
            Self::Apply(inner) => StreamUpgradeError::Apply(f(inner)),
            Self::Timeout => StreamUpgradeError::Timeout,
            Self::NegotiationFailed => StreamUpgradeError::NegotiationFailed,
            Self::Io(io_err) => StreamUpgradeError::Io(io_err),
        }
    }
}
704
705impl<TUpgrErr> fmt::Display for StreamUpgradeError<TUpgrErr>
706where
707 TUpgrErr: error::Error + 'static,
708{
709 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
710 match self {
711 StreamUpgradeError::Timeout => {
712 write!(f, "Timeout error while opening a substream")
713 }
714 StreamUpgradeError::Apply(err) => {
715 write!(f, "Apply: ")?;
716 crate::print_error_chain(f, err)
717 }
718 StreamUpgradeError::NegotiationFailed => {
719 write!(f, "no protocols could be agreed upon")
720 }
721 StreamUpgradeError::Io(e) => {
722 write!(f, "IO error: ")?;
723 crate::print_error_chain(f, e)
724 }
725 }
726 }
727}
728
729impl<TUpgrErr> error::Error for StreamUpgradeError<TUpgrErr>
730where
731 TUpgrErr: error::Error + 'static,
732{
733 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
734 None
735 }
736}
737
/// A handler's preference for keeping its connection alive.
///
/// Values are totally ordered as `No < Until(_) < Yes`; two `Until` values
/// compare by their instants.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KeepAlive {
    /// Keep the connection alive until the given instant.
    #[deprecated(
        note = "Use `swarm::Config::with_idle_connection_timeout` instead. See <https://github.com/libp2p/rust-libp2p/issues/3844> for details."
    )]
    Until(Instant),
    /// Keep the connection alive.
    Yes,
    /// Do not keep the connection alive.
    No,
}

impl KeepAlive {
    /// Returns `true` if `self` is [`KeepAlive::Yes`].
    pub fn is_yes(&self) -> bool {
        *self == Self::Yes
    }
}

impl PartialOrd for KeepAlive {
    /// Delegates to [`Ord::cmp`], so the result is always `Some`.
    fn partial_cmp(&self, other: &KeepAlive) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

// `Until` is deprecated but must still take part in the ordering.
#[allow(deprecated)]
impl Ord for KeepAlive {
    fn cmp(&self, other: &KeepAlive) -> Ordering {
        /// Position of a variant in the `No < Until(_) < Yes` ordering.
        fn tier(value: &KeepAlive) -> u8 {
            match value {
                KeepAlive::No => 0,
                KeepAlive::Until(_) => 1,
                KeepAlive::Yes => 2,
            }
        }

        match (self, other) {
            // Same tier with payloads: the instants decide.
            (KeepAlive::Until(lhs), KeepAlive::Until(rhs)) => lhs.cmp(rhs),
            // Otherwise the tier alone decides (equal tiers compare equal).
            _ => tier(self).cmp(&tier(other)),
        }
    }
}
778
/// Test-only generator producing each [`KeepAlive`] variant.
#[cfg(test)]
impl quickcheck::Arbitrary for KeepAlive {
    /// Picks one of the three variants from a value drawn in `1..4`.
    ///
    /// For `Until`, the instant is "now" plus an arbitrary duration, falling
    /// back to "now" itself when the addition overflows.
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        match quickcheck::GenRange::gen_range(g, 1u8..4) {
            1 => {
                // Read the clock once and reuse it for the fallback.
                // `unwrap_or(Instant::now())` would evaluate its argument
                // eagerly, i.e. take a second clock reading on every call.
                let now = Instant::now();
                let offset = <Duration as quickcheck::Arbitrary>::arbitrary(g);

                #[allow(deprecated)]
                KeepAlive::Until(now.checked_add(offset).unwrap_or(now))
            }
            2 => KeepAlive::Yes,
            3 => KeepAlive::No,
            // `gen_range(1u8..4)` is half-open, so only 1, 2 and 3 can occur.
            _ => unreachable!(),
        }
    }
}
798
799static EMPTY_HASHSET: Lazy<HashSet<StreamProtocol>> = Lazy::new(HashSet::new);