1use crate::connection::{Connection, ConnectionId, PendingPoint};
22use crate::{
23 connection::{
24 Connected, ConnectionError, IncomingInfo, PendingConnectionError,
25 PendingInboundConnectionError, PendingOutboundConnectionError,
26 },
27 transport::TransportError,
28 ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
29};
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::prelude::*;
33use futures::stream::SelectAll;
34use futures::{
35 channel::{mpsc, oneshot},
36 future::{poll_fn, BoxFuture, Either},
37 ready,
38 stream::FuturesUnordered,
39};
40use libp2p_core::connection::Endpoint;
41use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
42use libp2p_core::transport::PortUse;
43use std::task::Waker;
44use std::{
45 collections::HashMap,
46 fmt,
47 num::{NonZeroU8, NonZeroUsize},
48 pin::Pin,
49 task::Context,
50 task::Poll,
51};
52use tracing::Instrument;
53use void::Void;
54use web_time::{Duration, Instant};
55
56mod concurrent_dial;
57mod task;
58
59enum ExecSwitch {
60 Executor(Box<dyn Executor + Send>),
61 LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
62}
63
64impl ExecSwitch {
65 fn advance_local(&mut self, cx: &mut Context) {
66 match self {
67 ExecSwitch::Executor(_) => {}
68 ExecSwitch::LocalSpawn(local) => {
69 while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
70 }
71 }
72 }
73
74 #[track_caller]
75 fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
76 let task = task.boxed();
77
78 match self {
79 Self::Executor(executor) => executor.exec(task),
80 Self::LocalSpawn(local) => local.push(task),
81 }
82 }
83}
84
85pub(crate) struct Pool<THandler>
87where
88 THandler: ConnectionHandler,
89{
90 local_id: PeerId,
91
92 counters: ConnectionCounters,
94
95 established: FnvHashMap<
97 PeerId,
98 FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
99 >,
100
101 pending: HashMap<ConnectionId, PendingConnection>,
103
104 task_command_buffer_size: usize,
106
107 dial_concurrency_factor: NonZeroU8,
109
110 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
112
113 max_negotiating_inbound_streams: usize,
117
118 per_connection_event_buffer_size: usize,
120
121 executor: ExecSwitch,
124
125 pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,
128
129 pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
131
132 no_established_connections_waker: Option<Waker>,
134
135 established_connection_events:
137 SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::ToBehaviour>>>,
138
139 new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,
141
142 idle_connection_timeout: Duration,
144}
145
146#[derive(Debug)]
147pub(crate) struct EstablishedConnection<TInEvent> {
148 endpoint: ConnectedPoint,
149 sender: mpsc::Sender<task::Command<TInEvent>>,
151}
152
153impl<TInEvent> EstablishedConnection<TInEvent> {
154 pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
165 let cmd = task::Command::NotifyHandler(event);
166 self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
167 task::Command::NotifyHandler(event) => event,
168 _ => unreachable!("Expect failed send to return initial event."),
169 })
170 }
171
172 pub(crate) fn poll_ready_notify_handler(
179 &mut self,
180 cx: &mut Context<'_>,
181 ) -> Poll<Result<(), ()>> {
182 self.sender.poll_ready(cx).map_err(|_| ())
183 }
184
185 pub(crate) fn start_close(&mut self) {
189 match self.sender.clone().try_send(task::Command::Close) {
192 Ok(()) => {}
193 Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
194 };
195 }
196}
197
198struct PendingConnection {
199 peer_id: Option<PeerId>,
201 endpoint: PendingPoint,
202 abort_notifier: Option<oneshot::Sender<Void>>,
204 accepted_at: Instant,
206}
207
208impl PendingConnection {
209 fn is_for_same_remote_as(&self, other: PeerId) -> bool {
210 self.peer_id.map_or(false, |peer| peer == other)
211 }
212
213 fn abort(&mut self) {
215 if let Some(notifier) = self.abort_notifier.take() {
216 drop(notifier);
217 }
218 }
219}
220
221impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
223 f.debug_struct("Pool")
224 .field("counters", &self.counters)
225 .finish()
226 }
227}
228
229#[derive(Debug)]
231pub(crate) enum PoolEvent<ToBehaviour> {
232 ConnectionEstablished {
234 id: ConnectionId,
235 peer_id: PeerId,
236 endpoint: ConnectedPoint,
237 connection: NewConnection,
238 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
242 established_in: std::time::Duration,
244 },
245
246 ConnectionClosed {
258 id: ConnectionId,
259 connected: Connected,
261 error: Option<ConnectionError>,
264 remaining_established_connection_ids: Vec<ConnectionId>,
266 },
267
268 PendingOutboundConnectionError {
270 id: ConnectionId,
272 error: PendingOutboundConnectionError,
274 peer: Option<PeerId>,
276 },
277
278 PendingInboundConnectionError {
280 id: ConnectionId,
282 send_back_addr: Multiaddr,
284 local_addr: Multiaddr,
286 error: PendingInboundConnectionError,
288 },
289
290 ConnectionEvent {
292 id: ConnectionId,
293 peer_id: PeerId,
294 event: ToBehaviour,
296 },
297
298 AddressChange {
300 id: ConnectionId,
301 peer_id: PeerId,
302 new_endpoint: ConnectedPoint,
304 old_endpoint: ConnectedPoint,
306 },
307}
308
309impl<THandler> Pool<THandler>
310where
311 THandler: ConnectionHandler,
312{
313 pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
315 let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
316 let executor = match config.executor {
317 Some(exec) => ExecSwitch::Executor(exec),
318 None => ExecSwitch::LocalSpawn(Default::default()),
319 };
320 Pool {
321 local_id,
322 counters: ConnectionCounters::new(),
323 established: Default::default(),
324 pending: Default::default(),
325 task_command_buffer_size: config.task_command_buffer_size,
326 dial_concurrency_factor: config.dial_concurrency_factor,
327 substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
328 max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
329 per_connection_event_buffer_size: config.per_connection_event_buffer_size,
330 idle_connection_timeout: config.idle_connection_timeout,
331 executor,
332 pending_connection_events_tx,
333 pending_connection_events_rx,
334 no_established_connections_waker: None,
335 established_connection_events: Default::default(),
336 new_connection_dropped_listeners: Default::default(),
337 }
338 }
339
340 pub(crate) fn counters(&self) -> &ConnectionCounters {
342 &self.counters
343 }
344
345 pub(crate) fn get_established(
347 &mut self,
348 id: ConnectionId,
349 ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
350 self.established
351 .values_mut()
352 .find_map(|connections| connections.get_mut(&id))
353 }
354
355 pub(crate) fn is_connected(&self, id: PeerId) -> bool {
359 self.established.contains_key(&id)
360 }
361
362 pub(crate) fn num_peers(&self) -> usize {
365 self.established.len()
366 }
367
368 pub(crate) fn disconnect(&mut self, peer: PeerId) {
374 if let Some(conns) = self.established.get_mut(&peer) {
375 for (_, conn) in conns.iter_mut() {
376 conn.start_close();
377 }
378 }
379
380 for connection in self
381 .pending
382 .iter_mut()
383 .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
384 {
385 connection.abort()
386 }
387 }
388
389 pub(crate) fn iter_established_connections_of_peer(
391 &mut self,
392 peer: &PeerId,
393 ) -> impl Iterator<Item = ConnectionId> + '_ {
394 match self.established.get(peer) {
395 Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
396 None => either::Either::Right(std::iter::empty()),
397 }
398 }
399
400 pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
402 self.pending.iter().any(|(_, info)| {
403 matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
404 })
405 }
406
407 pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
410 self.established.keys()
411 }
412
413 pub(crate) fn add_outgoing(
416 &mut self,
417 dials: Vec<
418 BoxFuture<
419 'static,
420 (
421 Multiaddr,
422 Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
423 ),
424 >,
425 >,
426 peer: Option<PeerId>,
427 role_override: Endpoint,
428 port_use: PortUse,
429 dial_concurrency_factor_override: Option<NonZeroU8>,
430 connection_id: ConnectionId,
431 ) {
432 let concurrency_factor =
433 dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor);
434 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_outgoing_connection", %concurrency_factor, num_dials=%dials.len(), id = %connection_id);
435 span.follows_from(tracing::Span::current());
436
437 let (abort_notifier, abort_receiver) = oneshot::channel();
438
439 self.executor.spawn(
440 task::new_for_pending_outgoing_connection(
441 connection_id,
442 ConcurrentDial::new(dials, concurrency_factor),
443 abort_receiver,
444 self.pending_connection_events_tx.clone(),
445 )
446 .instrument(span),
447 );
448
449 let endpoint = PendingPoint::Dialer {
450 role_override,
451 port_use,
452 };
453
454 self.counters.inc_pending(&endpoint);
455 self.pending.insert(
456 connection_id,
457 PendingConnection {
458 peer_id: peer,
459 endpoint,
460 abort_notifier: Some(abort_notifier),
461 accepted_at: Instant::now(),
462 },
463 );
464 }
465
466 pub(crate) fn add_incoming<TFut>(
469 &mut self,
470 future: TFut,
471 info: IncomingInfo<'_>,
472 connection_id: ConnectionId,
473 ) where
474 TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
475 {
476 let endpoint = info.create_connected_point();
477
478 let (abort_notifier, abort_receiver) = oneshot::channel();
479
480 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_incoming_connection", remote_addr = %info.send_back_addr, id = %connection_id);
481 span.follows_from(tracing::Span::current());
482
483 self.executor.spawn(
484 task::new_for_pending_incoming_connection(
485 connection_id,
486 future,
487 abort_receiver,
488 self.pending_connection_events_tx.clone(),
489 )
490 .instrument(span),
491 );
492
493 self.counters.inc_pending_incoming();
494 self.pending.insert(
495 connection_id,
496 PendingConnection {
497 peer_id: None,
498 endpoint: endpoint.into(),
499 abort_notifier: Some(abort_notifier),
500 accepted_at: Instant::now(),
501 },
502 );
503 }
504
505 pub(crate) fn spawn_connection(
506 &mut self,
507 id: ConnectionId,
508 obtained_peer_id: PeerId,
509 endpoint: &ConnectedPoint,
510 connection: NewConnection,
511 handler: THandler,
512 ) {
513 let connection = connection.extract();
514 let conns = self.established.entry(obtained_peer_id).or_default();
515 self.counters.inc_established(endpoint);
516
517 let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
518 let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
519
520 conns.insert(
521 id,
522 EstablishedConnection {
523 endpoint: endpoint.clone(),
524 sender: command_sender,
525 },
526 );
527 self.established_connection_events.push(event_receiver);
528 if let Some(waker) = self.no_established_connections_waker.take() {
529 waker.wake();
530 }
531
532 let connection = Connection::new(
533 connection,
534 handler,
535 self.substream_upgrade_protocol_override,
536 self.max_negotiating_inbound_streams,
537 self.idle_connection_timeout,
538 );
539
540 let span = tracing::debug_span!(parent: tracing::Span::none(), "new_established_connection", remote_addr = %endpoint.get_remote_address(), %id, peer = %obtained_peer_id);
541 span.follows_from(tracing::Span::current());
542
543 self.executor.spawn(
544 task::new_for_established_connection(
545 id,
546 obtained_peer_id,
547 connection,
548 command_receiver,
549 event_sender,
550 )
551 .instrument(span),
552 )
553 }
554
555 #[tracing::instrument(level = "debug", name = "Pool::poll", skip(self, cx))]
557 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler::ToBehaviour>>
558 where
559 THandler: ConnectionHandler + 'static,
560 <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
561 {
562 match self.established_connection_events.poll_next_unpin(cx) {
567 Poll::Pending => {}
568 Poll::Ready(None) => {
569 self.no_established_connections_waker = Some(cx.waker().clone());
570 }
571
572 Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
573 return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
574 }
575 Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
576 id,
577 peer_id,
578 new_address,
579 })) => {
580 let connection = self
581 .established
582 .get_mut(&peer_id)
583 .expect("Receive `AddressChange` event for established peer.")
584 .get_mut(&id)
585 .expect("Receive `AddressChange` event from established connection");
586 let mut new_endpoint = connection.endpoint.clone();
587 new_endpoint.set_remote_address(new_address);
588 let old_endpoint =
589 std::mem::replace(&mut connection.endpoint, new_endpoint.clone());
590
591 return Poll::Ready(PoolEvent::AddressChange {
592 peer_id,
593 id,
594 new_endpoint,
595 old_endpoint,
596 });
597 }
598 Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => {
599 let connections = self
600 .established
601 .get_mut(&peer_id)
602 .expect("`Closed` event for established connection");
603 let EstablishedConnection { endpoint, .. } =
604 connections.remove(&id).expect("Connection to be present");
605 self.counters.dec_established(&endpoint);
606 let remaining_established_connection_ids: Vec<ConnectionId> =
607 connections.keys().cloned().collect();
608 if remaining_established_connection_ids.is_empty() {
609 self.established.remove(&peer_id);
610 }
611 return Poll::Ready(PoolEvent::ConnectionClosed {
612 id,
613 connected: Connected { endpoint, peer_id },
614 error,
615 remaining_established_connection_ids,
616 });
617 }
618 }
619
620 loop {
622 if let Poll::Ready(Some(result)) =
623 self.new_connection_dropped_listeners.poll_next_unpin(cx)
624 {
625 if let Ok(dropped_connection) = result {
626 self.executor.spawn(async move {
627 let _ = dropped_connection.close().await;
628 });
629 }
630 continue;
631 }
632
633 let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
634 Poll::Ready(Some(event)) => event,
635 Poll::Pending => break,
636 Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
637 };
638
639 match event {
640 task::PendingConnectionEvent::ConnectionEstablished {
641 id,
642 output: (obtained_peer_id, mut muxer),
643 outgoing,
644 } => {
645 let PendingConnection {
646 peer_id: expected_peer_id,
647 endpoint,
648 abort_notifier: _,
649 accepted_at,
650 } = self
651 .pending
652 .remove(&id)
653 .expect("Entry in `self.pending` for previously pending connection.");
654
655 self.counters.dec_pending(&endpoint);
656
657 let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
658 (
659 PendingPoint::Dialer {
660 role_override,
661 port_use,
662 },
663 Some((address, errors)),
664 ) => (
665 ConnectedPoint::Dialer {
666 address,
667 role_override,
668 port_use,
669 },
670 Some(errors),
671 ),
672 (
673 PendingPoint::Listener {
674 local_addr,
675 send_back_addr,
676 },
677 None,
678 ) => (
679 ConnectedPoint::Listener {
680 local_addr,
681 send_back_addr,
682 },
683 None,
684 ),
685 (PendingPoint::Dialer { .. }, None) => unreachable!(
686 "Established incoming connection via pending outgoing connection."
687 ),
688 (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
689 "Established outgoing connection via pending incoming connection."
690 ),
691 };
692
693 let check_peer_id = || {
694 if let Some(peer) = expected_peer_id {
695 if peer != obtained_peer_id {
696 return Err(PendingConnectionError::WrongPeerId {
697 obtained: obtained_peer_id,
698 endpoint: endpoint.clone(),
699 });
700 }
701 }
702
703 if self.local_id == obtained_peer_id {
704 return Err(PendingConnectionError::LocalPeerId {
705 endpoint: endpoint.clone(),
706 });
707 }
708
709 Ok(())
710 };
711
712 if let Err(error) = check_peer_id() {
713 self.executor.spawn(poll_fn(move |cx| {
714 if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
715 tracing::debug!(
716 peer=%obtained_peer_id,
717 connection=%id,
718 "Failed to close connection to peer: {:?}",
719 e
720 );
721 }
722 Poll::Ready(())
723 }));
724
725 match endpoint {
726 ConnectedPoint::Dialer { .. } => {
727 return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
728 id,
729 error: error
730 .map(|t| vec![(endpoint.get_remote_address().clone(), t)]),
731 peer: expected_peer_id.or(Some(obtained_peer_id)),
732 })
733 }
734 ConnectedPoint::Listener {
735 send_back_addr,
736 local_addr,
737 } => {
738 return Poll::Ready(PoolEvent::PendingInboundConnectionError {
739 id,
740 error,
741 send_back_addr,
742 local_addr,
743 })
744 }
745 };
746 }
747
748 let established_in = accepted_at.elapsed();
749
750 let (connection, drop_listener) = NewConnection::new(muxer);
751 self.new_connection_dropped_listeners.push(drop_listener);
752
753 return Poll::Ready(PoolEvent::ConnectionEstablished {
754 peer_id: obtained_peer_id,
755 endpoint,
756 id,
757 connection,
758 concurrent_dial_errors,
759 established_in,
760 });
761 }
762 task::PendingConnectionEvent::PendingFailed { id, error } => {
763 if let Some(PendingConnection {
764 peer_id,
765 endpoint,
766 abort_notifier: _,
767 accepted_at: _, }) = self.pending.remove(&id)
769 {
770 self.counters.dec_pending(&endpoint);
771
772 match (endpoint, error) {
773 (PendingPoint::Dialer { .. }, Either::Left(error)) => {
774 return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
775 id,
776 error,
777 peer: peer_id,
778 });
779 }
780 (
781 PendingPoint::Listener {
782 send_back_addr,
783 local_addr,
784 },
785 Either::Right(error),
786 ) => {
787 return Poll::Ready(PoolEvent::PendingInboundConnectionError {
788 id,
789 error,
790 send_back_addr,
791 local_addr,
792 });
793 }
794 (PendingPoint::Dialer { .. }, Either::Right(_)) => {
795 unreachable!("Inbound error for outbound connection.")
796 }
797 (PendingPoint::Listener { .. }, Either::Left(_)) => {
798 unreachable!("Outbound error for inbound connection.")
799 }
800 }
801 }
802 }
803 }
804 }
805
806 self.executor.advance_local(cx);
807
808 Poll::Pending
809 }
810}
811
812#[derive(Debug)]
819pub(crate) struct NewConnection {
820 connection: Option<StreamMuxerBox>,
821 drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
822}
823
824impl NewConnection {
825 fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
826 let (sender, receiver) = oneshot::channel();
827
828 (
829 Self {
830 connection: Some(conn),
831 drop_sender: Some(sender),
832 },
833 receiver,
834 )
835 }
836
837 fn extract(mut self) -> StreamMuxerBox {
838 self.connection.take().unwrap()
839 }
840}
841
842impl Drop for NewConnection {
843 fn drop(&mut self) {
844 if let Some(connection) = self.connection.take() {
845 let _ = self
846 .drop_sender
847 .take()
848 .expect("`drop_sender` to always be `Some`")
849 .send(connection);
850 }
851 }
852}
853
854#[derive(Debug, Clone)]
856pub struct ConnectionCounters {
857 pending_incoming: u32,
859 pending_outgoing: u32,
861 established_incoming: u32,
863 established_outgoing: u32,
865}
866
867impl ConnectionCounters {
868 fn new() -> Self {
869 Self {
870 pending_incoming: 0,
871 pending_outgoing: 0,
872 established_incoming: 0,
873 established_outgoing: 0,
874 }
875 }
876
877 pub fn num_connections(&self) -> u32 {
879 self.num_pending() + self.num_established()
880 }
881
882 pub fn num_pending(&self) -> u32 {
884 self.pending_incoming + self.pending_outgoing
885 }
886
887 pub fn num_pending_incoming(&self) -> u32 {
889 self.pending_incoming
890 }
891
892 pub fn num_pending_outgoing(&self) -> u32 {
894 self.pending_outgoing
895 }
896
897 pub fn num_established_incoming(&self) -> u32 {
899 self.established_incoming
900 }
901
902 pub fn num_established_outgoing(&self) -> u32 {
904 self.established_outgoing
905 }
906
907 pub fn num_established(&self) -> u32 {
909 self.established_outgoing + self.established_incoming
910 }
911
912 fn inc_pending(&mut self, endpoint: &PendingPoint) {
913 match endpoint {
914 PendingPoint::Dialer { .. } => {
915 self.pending_outgoing += 1;
916 }
917 PendingPoint::Listener { .. } => {
918 self.pending_incoming += 1;
919 }
920 }
921 }
922
923 fn inc_pending_incoming(&mut self) {
924 self.pending_incoming += 1;
925 }
926
927 fn dec_pending(&mut self, endpoint: &PendingPoint) {
928 match endpoint {
929 PendingPoint::Dialer { .. } => {
930 self.pending_outgoing -= 1;
931 }
932 PendingPoint::Listener { .. } => {
933 self.pending_incoming -= 1;
934 }
935 }
936 }
937
938 fn inc_established(&mut self, endpoint: &ConnectedPoint) {
939 match endpoint {
940 ConnectedPoint::Dialer { .. } => {
941 self.established_outgoing += 1;
942 }
943 ConnectedPoint::Listener { .. } => {
944 self.established_incoming += 1;
945 }
946 }
947 }
948
949 fn dec_established(&mut self, endpoint: &ConnectedPoint) {
950 match endpoint {
951 ConnectedPoint::Dialer { .. } => {
952 self.established_outgoing -= 1;
953 }
954 ConnectedPoint::Listener { .. } => {
955 self.established_incoming -= 1;
956 }
957 }
958 }
959}
960
961pub(crate) struct PoolConfig {
966 pub(crate) executor: Option<Box<dyn Executor + Send>>,
968 pub(crate) task_command_buffer_size: usize,
970 pub(crate) per_connection_event_buffer_size: usize,
973 pub(crate) dial_concurrency_factor: NonZeroU8,
975 pub(crate) idle_connection_timeout: Duration,
977 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
979
980 max_negotiating_inbound_streams: usize,
984}
985
986impl PoolConfig {
987 pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
988 Self {
989 executor,
990 task_command_buffer_size: 32,
991 per_connection_event_buffer_size: 7,
992 dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
993 idle_connection_timeout: Duration::ZERO,
994 substream_upgrade_protocol_override: None,
995 max_negotiating_inbound_streams: 128,
996 }
997 }
998
999 pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1007 self.task_command_buffer_size = n.get() - 1;
1008 self
1009 }
1010
1011 pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1018 self.per_connection_event_buffer_size = n;
1019 self
1020 }
1021
1022 pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1024 self.dial_concurrency_factor = factor;
1025 self
1026 }
1027
1028 pub(crate) fn with_substream_upgrade_protocol_override(
1030 mut self,
1031 v: libp2p_core::upgrade::Version,
1032 ) -> Self {
1033 self.substream_upgrade_protocol_override = Some(v);
1034 self
1035 }
1036
1037 pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1041 self.max_negotiating_inbound_streams = v;
1042 self
1043 }
1044}