1use crate::connection::{Connection, ConnectionId, PendingPoint};
22use crate::{
23 connection::{
24 Connected, ConnectionError, IncomingInfo, PendingConnectionError,
25 PendingInboundConnectionError, PendingOutboundConnectionError,
26 },
27 transport::TransportError,
28 ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
29};
30use concurrent_dial::ConcurrentDial;
31use fnv::FnvHashMap;
32use futures::prelude::*;
33use futures::stream::SelectAll;
34use futures::{
35 channel::{mpsc, oneshot},
36 future::{poll_fn, BoxFuture, Either},
37 ready,
38 stream::FuturesUnordered,
39};
40use instant::{Duration, Instant};
41use libp2p_core::connection::Endpoint;
42use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
43use std::task::Waker;
44use std::{
45 collections::{hash_map, HashMap},
46 fmt,
47 num::{NonZeroU8, NonZeroUsize},
48 pin::Pin,
49 task::Context,
50 task::Poll,
51};
52use void::Void;
53
54mod concurrent_dial;
55mod task;
56
57enum ExecSwitch {
58 Executor(Box<dyn Executor + Send>),
59 LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
60}
61
62impl ExecSwitch {
63 fn advance_local(&mut self, cx: &mut Context) {
64 match self {
65 ExecSwitch::Executor(_) => {}
66 ExecSwitch::LocalSpawn(local) => {
67 while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
68 }
69 }
70 }
71
72 fn spawn(&mut self, task: impl Future<Output = ()> + Send + 'static) {
73 let task = task.boxed();
74
75 match self {
76 Self::Executor(executor) => executor.exec(task),
77 Self::LocalSpawn(local) => local.push(task),
78 }
79 }
80}
81
82pub(crate) struct Pool<THandler>
84where
85 THandler: ConnectionHandler,
86{
87 local_id: PeerId,
88
89 counters: ConnectionCounters,
91
92 established: FnvHashMap<
94 PeerId,
95 FnvHashMap<ConnectionId, EstablishedConnection<THandler::FromBehaviour>>,
96 >,
97
98 pending: HashMap<ConnectionId, PendingConnection>,
100
101 task_command_buffer_size: usize,
103
104 dial_concurrency_factor: NonZeroU8,
106
107 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
109
110 max_negotiating_inbound_streams: usize,
114
115 per_connection_event_buffer_size: usize,
117
118 executor: ExecSwitch,
121
122 pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,
125
126 pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,
128
129 no_established_connections_waker: Option<Waker>,
131
132 established_connection_events:
134 SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler>>>,
135
136 new_connection_dropped_listeners: FuturesUnordered<oneshot::Receiver<StreamMuxerBox>>,
138
139 idle_connection_timeout: Duration,
141}
142
143#[derive(Debug)]
144pub(crate) struct EstablishedConnection<TInEvent> {
145 endpoint: ConnectedPoint,
146 sender: mpsc::Sender<task::Command<TInEvent>>,
148}
149
150impl<TInEvent> EstablishedConnection<TInEvent> {
151 pub(crate) fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
162 let cmd = task::Command::NotifyHandler(event);
163 self.sender.try_send(cmd).map_err(|e| match e.into_inner() {
164 task::Command::NotifyHandler(event) => event,
165 _ => unreachable!("Expect failed send to return initial event."),
166 })
167 }
168
169 pub(crate) fn poll_ready_notify_handler(
176 &mut self,
177 cx: &mut Context<'_>,
178 ) -> Poll<Result<(), ()>> {
179 self.sender.poll_ready(cx).map_err(|_| ())
180 }
181
182 pub(crate) fn start_close(&mut self) {
186 match self.sender.clone().try_send(task::Command::Close) {
189 Ok(()) => {}
190 Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
191 };
192 }
193}
194
195struct PendingConnection {
196 peer_id: Option<PeerId>,
198 endpoint: PendingPoint,
199 abort_notifier: Option<oneshot::Sender<Void>>,
201 accepted_at: Instant,
203}
204
205impl PendingConnection {
206 fn is_for_same_remote_as(&self, other: PeerId) -> bool {
207 self.peer_id.map_or(false, |peer| peer == other)
208 }
209
210 fn abort(&mut self) {
212 if let Some(notifier) = self.abort_notifier.take() {
213 drop(notifier);
214 }
215 }
216}
217
218impl<THandler: ConnectionHandler> fmt::Debug for Pool<THandler> {
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
220 f.debug_struct("Pool")
221 .field("counters", &self.counters)
222 .finish()
223 }
224}
225
226#[derive(Debug)]
228#[allow(deprecated)]
229pub(crate) enum PoolEvent<THandler: ConnectionHandler> {
230 ConnectionEstablished {
232 id: ConnectionId,
233 peer_id: PeerId,
234 endpoint: ConnectedPoint,
235 connection: NewConnection,
236 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
240 established_in: std::time::Duration,
242 },
243
244 ConnectionClosed {
256 id: ConnectionId,
257 connected: Connected,
259 error: Option<ConnectionError<THandler::Error>>,
262 remaining_established_connection_ids: Vec<ConnectionId>,
264 handler: THandler,
265 },
266
267 PendingOutboundConnectionError {
269 id: ConnectionId,
271 error: PendingOutboundConnectionError,
273 peer: Option<PeerId>,
275 },
276
277 PendingInboundConnectionError {
279 id: ConnectionId,
281 send_back_addr: Multiaddr,
283 local_addr: Multiaddr,
285 error: PendingInboundConnectionError,
287 },
288
289 ConnectionEvent {
291 id: ConnectionId,
292 peer_id: PeerId,
293 event: THandler::ToBehaviour,
295 },
296
297 AddressChange {
299 id: ConnectionId,
300 peer_id: PeerId,
301 new_endpoint: ConnectedPoint,
303 old_endpoint: ConnectedPoint,
305 },
306}
307
308impl<THandler> Pool<THandler>
309where
310 THandler: ConnectionHandler,
311{
312 pub(crate) fn new(local_id: PeerId, config: PoolConfig) -> Self {
314 let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
315 let executor = match config.executor {
316 Some(exec) => ExecSwitch::Executor(exec),
317 None => ExecSwitch::LocalSpawn(Default::default()),
318 };
319 Pool {
320 local_id,
321 counters: ConnectionCounters::new(),
322 established: Default::default(),
323 pending: Default::default(),
324 task_command_buffer_size: config.task_command_buffer_size,
325 dial_concurrency_factor: config.dial_concurrency_factor,
326 substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
327 max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
328 per_connection_event_buffer_size: config.per_connection_event_buffer_size,
329 idle_connection_timeout: config.idle_connection_timeout,
330 executor,
331 pending_connection_events_tx,
332 pending_connection_events_rx,
333 no_established_connections_waker: None,
334 established_connection_events: Default::default(),
335 new_connection_dropped_listeners: Default::default(),
336 }
337 }
338
339 pub(crate) fn counters(&self) -> &ConnectionCounters {
341 &self.counters
342 }
343
344 pub(crate) fn get_established(
346 &mut self,
347 id: ConnectionId,
348 ) -> Option<&mut EstablishedConnection<THandler::FromBehaviour>> {
349 self.established
350 .values_mut()
351 .find_map(|connections| connections.get_mut(&id))
352 }
353
354 pub(crate) fn is_connected(&self, id: PeerId) -> bool {
358 self.established.contains_key(&id)
359 }
360
361 pub(crate) fn num_peers(&self) -> usize {
364 self.established.len()
365 }
366
367 pub(crate) fn disconnect(&mut self, peer: PeerId) {
373 if let Some(conns) = self.established.get_mut(&peer) {
374 for (_, conn) in conns.iter_mut() {
375 conn.start_close();
376 }
377 }
378
379 for connection in self
380 .pending
381 .iter_mut()
382 .filter_map(|(_, info)| info.is_for_same_remote_as(peer).then_some(info))
383 {
384 connection.abort()
385 }
386 }
387
388 pub(crate) fn iter_established_connections_of_peer(
390 &mut self,
391 peer: &PeerId,
392 ) -> impl Iterator<Item = ConnectionId> + '_ {
393 match self.established.get(peer) {
394 Some(conns) => either::Either::Left(conns.iter().map(|(id, _)| *id)),
395 None => either::Either::Right(std::iter::empty()),
396 }
397 }
398
399 pub(crate) fn is_dialing(&self, peer: PeerId) -> bool {
401 self.pending.iter().any(|(_, info)| {
402 matches!(info.endpoint, PendingPoint::Dialer { .. }) && info.is_for_same_remote_as(peer)
403 })
404 }
405
406 pub(crate) fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
409 self.established.keys()
410 }
411
412 pub(crate) fn add_outgoing(
415 &mut self,
416 dials: Vec<
417 BoxFuture<
418 'static,
419 (
420 Multiaddr,
421 Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
422 ),
423 >,
424 >,
425 peer: Option<PeerId>,
426 role_override: Endpoint,
427 dial_concurrency_factor_override: Option<NonZeroU8>,
428 connection_id: ConnectionId,
429 ) {
430 let dial = ConcurrentDial::new(
431 dials,
432 dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
433 );
434
435 let (abort_notifier, abort_receiver) = oneshot::channel();
436
437 self.executor
438 .spawn(task::new_for_pending_outgoing_connection(
439 connection_id,
440 dial,
441 abort_receiver,
442 self.pending_connection_events_tx.clone(),
443 ));
444
445 let endpoint = PendingPoint::Dialer { role_override };
446
447 self.counters.inc_pending(&endpoint);
448 self.pending.insert(
449 connection_id,
450 PendingConnection {
451 peer_id: peer,
452 endpoint,
453 abort_notifier: Some(abort_notifier),
454 accepted_at: Instant::now(),
455 },
456 );
457 }
458
459 pub(crate) fn add_incoming<TFut>(
462 &mut self,
463 future: TFut,
464 info: IncomingInfo<'_>,
465 connection_id: ConnectionId,
466 ) where
467 TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
468 {
469 let endpoint = info.create_connected_point();
470
471 let (abort_notifier, abort_receiver) = oneshot::channel();
472
473 self.executor
474 .spawn(task::new_for_pending_incoming_connection(
475 connection_id,
476 future,
477 abort_receiver,
478 self.pending_connection_events_tx.clone(),
479 ));
480
481 self.counters.inc_pending_incoming();
482 self.pending.insert(
483 connection_id,
484 PendingConnection {
485 peer_id: None,
486 endpoint: endpoint.into(),
487 abort_notifier: Some(abort_notifier),
488 accepted_at: Instant::now(),
489 },
490 );
491 }
492
493 pub(crate) fn spawn_connection(
494 &mut self,
495 id: ConnectionId,
496 obtained_peer_id: PeerId,
497 endpoint: &ConnectedPoint,
498 connection: NewConnection,
499 handler: THandler,
500 ) {
501 let connection = connection.extract();
502
503 let conns = self.established.entry(obtained_peer_id).or_default();
504 self.counters.inc_established(endpoint);
505
506 let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size);
507 let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size);
508
509 conns.insert(
510 id,
511 EstablishedConnection {
512 endpoint: endpoint.clone(),
513 sender: command_sender,
514 },
515 );
516 self.established_connection_events.push(event_receiver);
517 if let Some(waker) = self.no_established_connections_waker.take() {
518 waker.wake();
519 }
520
521 let connection = Connection::new(
522 connection,
523 handler,
524 self.substream_upgrade_protocol_override,
525 self.max_negotiating_inbound_streams,
526 self.idle_connection_timeout,
527 );
528
529 self.executor.spawn(task::new_for_established_connection(
530 id,
531 obtained_peer_id,
532 connection,
533 command_receiver,
534 event_sender,
535 ))
536 }
537
/// Polls the pool for the next [`PoolEvent`].
///
/// Work is done in this order:
/// 1. one poll of the merged event streams of established connections,
/// 2. a loop that closes muxers handed back by dropped [`NewConnection`]s and
///    processes events of pending connections,
/// 3. advancing locally spawned tasks (only relevant without an external executor).
///
/// Returns `Poll::Pending` when none of the steps produced an event.
///
/// # Panics
///
/// Panics if an event refers to a connection that the pool does not know about, or
/// if the direction of a pending connection does not match the direction of its event.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler>>
where
    THandler: ConnectionHandler + 'static,
    <THandler as ConnectionHandler>::OutboundOpenInfo: Send,
{
    // Step 1: events coming from the tasks of established connections.
    match self.established_connection_events.poll_next_unpin(cx) {
        Poll::Pending => {}
        Poll::Ready(None) => {
            // There is currently no event stream to poll. Remember the waker so that
            // `spawn_connection` can wake us once it adds one.
            self.no_established_connections_waker = Some(cx.waker().clone());
        }

        Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
            return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
        }
        Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
            id,
            peer_id,
            new_address,
        })) => {
            let connection = self
                .established
                .get_mut(&peer_id)
                .expect("Receive `AddressChange` event for established peer.")
                .get_mut(&id)
                .expect("Receive `AddressChange` event from established connection");
            // Build the updated endpoint from the stored one, then swap it in and
            // keep the previous value for the event.
            let mut new_endpoint = connection.endpoint.clone();
            new_endpoint.set_remote_address(new_address);
            let old_endpoint =
                std::mem::replace(&mut connection.endpoint, new_endpoint.clone());

            return Poll::Ready(PoolEvent::AddressChange {
                peer_id,
                id,
                new_endpoint,
                old_endpoint,
            });
        }
        Poll::Ready(Some(task::EstablishedConnectionEvent::Closed {
            id,
            peer_id,
            error,
            handler,
        })) => {
            let connections = self
                .established
                .get_mut(&peer_id)
                .expect("`Closed` event for established connection");
            let EstablishedConnection { endpoint, .. } =
                connections.remove(&id).expect("Connection to be present");
            self.counters.dec_established(&endpoint);
            let remaining_established_connection_ids: Vec<ConnectionId> =
                connections.keys().cloned().collect();
            // Drop the peer's entry entirely once its last connection is gone.
            if remaining_established_connection_ids.is_empty() {
                self.established.remove(&peer_id);
            }
            return Poll::Ready(PoolEvent::ConnectionClosed {
                id,
                connected: Connected { endpoint, peer_id },
                error,
                remaining_established_connection_ids,
                handler,
            });
        }
    }

    // Step 2: dropped new connections and events of pending connections.
    loop {
        // A `NewConnection` that is dropped without being extracted sends its muxer
        // here so that it can be closed in a background task. An `Err` result means
        // the sender went away without sending anything, so there is nothing to close.
        if let Poll::Ready(Some(result)) =
            self.new_connection_dropped_listeners.poll_next_unpin(cx)
        {
            if let Ok(dropped_connection) = result {
                self.executor.spawn(async move {
                    let _ = dropped_connection.close().await;
                });
            }
            continue;
        }

        let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {
            Poll::Ready(Some(event)) => event,
            Poll::Pending => break,
            Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
        };

        match event {
            task::PendingConnectionEvent::ConnectionEstablished {
                id,
                output: (obtained_peer_id, mut muxer),
                outgoing,
            } => {
                let PendingConnection {
                    peer_id: expected_peer_id,
                    endpoint,
                    abort_notifier: _,
                    accepted_at,
                } = self
                    .pending
                    .remove(&id)
                    .expect("Entry in `self.pending` for previously pending connection.");

                self.counters.dec_pending(&endpoint);

                // Combine the pending endpoint with the data carried by the event.
                // `outgoing` is `Some` exactly for dialed connections.
                let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
                    (PendingPoint::Dialer { role_override }, Some((address, errors))) => (
                        ConnectedPoint::Dialer {
                            address,
                            role_override,
                        },
                        Some(errors),
                    ),
                    (
                        PendingPoint::Listener {
                            local_addr,
                            send_back_addr,
                        },
                        None,
                    ) => (
                        ConnectedPoint::Listener {
                            local_addr,
                            send_back_addr,
                        },
                        None,
                    ),
                    (PendingPoint::Dialer { .. }, None) => unreachable!(
                        "Established incoming connection via pending outgoing connection."
                    ),
                    (PendingPoint::Listener { .. }, Some(_)) => unreachable!(
                        "Established outgoing connection via pending incoming connection."
                    ),
                };

                // Reject the connection if the remote is not the peer we expected,
                // or if the remote claims to be the local peer.
                let check_peer_id = || {
                    if let Some(peer) = expected_peer_id {
                        if peer != obtained_peer_id {
                            return Err(PendingConnectionError::WrongPeerId {
                                obtained: obtained_peer_id,
                                endpoint: endpoint.clone(),
                            });
                        }
                    }

                    if self.local_id == obtained_peer_id {
                        return Err(PendingConnectionError::LocalPeerId {
                            endpoint: endpoint.clone(),
                        });
                    }

                    Ok(())
                };

                if let Err(error) = check_peer_id() {
                    // Close the rejected muxer in the background; a failure to close
                    // is only logged.
                    self.executor.spawn(poll_fn(move |cx| {
                        if let Err(e) = ready!(muxer.poll_close_unpin(cx)) {
                            log::debug!(
                                "Failed to close connection {:?} to peer {}: {:?}",
                                id,
                                obtained_peer_id,
                                e
                            );
                        }
                        Poll::Ready(())
                    }));

                    match endpoint {
                        ConnectedPoint::Dialer { .. } => {
                            return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
                                id,
                                error: error
                                    .map(|t| vec![(endpoint.get_remote_address().clone(), t)]),
                                // Report the id the remote presented if none was expected.
                                peer: expected_peer_id.or(Some(obtained_peer_id)),
                            })
                        }
                        ConnectedPoint::Listener {
                            send_back_addr,
                            local_addr,
                        } => {
                            return Poll::Ready(PoolEvent::PendingInboundConnectionError {
                                id,
                                error,
                                send_back_addr,
                                local_addr,
                            })
                        }
                    };
                }

                let established_in = accepted_at.elapsed();

                // Wrap the muxer so that it is handed back and closed if the caller
                // drops the connection instead of passing it to `spawn_connection`.
                let (connection, drop_listener) = NewConnection::new(muxer);
                self.new_connection_dropped_listeners.push(drop_listener);

                return Poll::Ready(PoolEvent::ConnectionEstablished {
                    peer_id: obtained_peer_id,
                    endpoint,
                    id,
                    connection,
                    concurrent_dial_errors,
                    established_in,
                });
            }
            task::PendingConnectionEvent::PendingFailed { id, error } => {
                // A failure for an id without a pending entry is silently skipped.
                if let Some(PendingConnection {
                    peer_id,
                    endpoint,
                    abort_notifier: _,
                    accepted_at: _,
                }) = self.pending.remove(&id)
                {
                    self.counters.dec_pending(&endpoint);

                    match (endpoint, error) {
                        (PendingPoint::Dialer { .. }, Either::Left(error)) => {
                            return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
                                id,
                                error,
                                peer: peer_id,
                            });
                        }
                        (
                            PendingPoint::Listener {
                                send_back_addr,
                                local_addr,
                            },
                            Either::Right(error),
                        ) => {
                            return Poll::Ready(PoolEvent::PendingInboundConnectionError {
                                id,
                                error,
                                send_back_addr,
                                local_addr,
                            });
                        }
                        (PendingPoint::Dialer { .. }, Either::Right(_)) => {
                            unreachable!("Inbound error for outbound connection.")
                        }
                        (PendingPoint::Listener { .. }, Either::Left(_)) => {
                            unreachable!("Outbound error for inbound connection.")
                        }
                    }
                }
            }
        }
    }

    // Step 3: drive tasks that are stored locally because no executor was configured.
    self.executor.advance_local(cx);

    Poll::Pending
}
791}
792
793#[derive(Debug)]
800pub(crate) struct NewConnection {
801 connection: Option<StreamMuxerBox>,
802 drop_sender: Option<oneshot::Sender<StreamMuxerBox>>,
803}
804
805impl NewConnection {
806 fn new(conn: StreamMuxerBox) -> (Self, oneshot::Receiver<StreamMuxerBox>) {
807 let (sender, receiver) = oneshot::channel();
808
809 (
810 Self {
811 connection: Some(conn),
812 drop_sender: Some(sender),
813 },
814 receiver,
815 )
816 }
817
818 fn extract(mut self) -> StreamMuxerBox {
819 self.connection.take().unwrap()
820 }
821}
822
823impl Drop for NewConnection {
824 fn drop(&mut self) {
825 if let Some(connection) = self.connection.take() {
826 let _ = self
827 .drop_sender
828 .take()
829 .expect("`drop_sender` to always be `Some`")
830 .send(connection);
831 }
832 }
833}
834
835#[derive(Debug, Clone)]
837pub struct ConnectionCounters {
838 pending_incoming: u32,
840 pending_outgoing: u32,
842 established_incoming: u32,
844 established_outgoing: u32,
846}
847
848impl ConnectionCounters {
849 fn new() -> Self {
850 Self {
851 pending_incoming: 0,
852 pending_outgoing: 0,
853 established_incoming: 0,
854 established_outgoing: 0,
855 }
856 }
857
858 pub fn num_connections(&self) -> u32 {
860 self.num_pending() + self.num_established()
861 }
862
863 pub fn num_pending(&self) -> u32 {
865 self.pending_incoming + self.pending_outgoing
866 }
867
868 pub fn num_pending_incoming(&self) -> u32 {
870 self.pending_incoming
871 }
872
873 pub fn num_pending_outgoing(&self) -> u32 {
875 self.pending_outgoing
876 }
877
878 pub fn num_established_incoming(&self) -> u32 {
880 self.established_incoming
881 }
882
883 pub fn num_established_outgoing(&self) -> u32 {
885 self.established_outgoing
886 }
887
888 pub fn num_established(&self) -> u32 {
890 self.established_outgoing + self.established_incoming
891 }
892
893 fn inc_pending(&mut self, endpoint: &PendingPoint) {
894 match endpoint {
895 PendingPoint::Dialer { .. } => {
896 self.pending_outgoing += 1;
897 }
898 PendingPoint::Listener { .. } => {
899 self.pending_incoming += 1;
900 }
901 }
902 }
903
904 fn inc_pending_incoming(&mut self) {
905 self.pending_incoming += 1;
906 }
907
908 fn dec_pending(&mut self, endpoint: &PendingPoint) {
909 match endpoint {
910 PendingPoint::Dialer { .. } => {
911 self.pending_outgoing -= 1;
912 }
913 PendingPoint::Listener { .. } => {
914 self.pending_incoming -= 1;
915 }
916 }
917 }
918
919 fn inc_established(&mut self, endpoint: &ConnectedPoint) {
920 match endpoint {
921 ConnectedPoint::Dialer { .. } => {
922 self.established_outgoing += 1;
923 }
924 ConnectedPoint::Listener { .. } => {
925 self.established_incoming += 1;
926 }
927 }
928 }
929
930 fn dec_established(&mut self, endpoint: &ConnectedPoint) {
931 match endpoint {
932 ConnectedPoint::Dialer { .. } => {
933 self.established_outgoing -= 1;
934 }
935 ConnectedPoint::Listener { .. } => {
936 self.established_incoming -= 1;
937 }
938 }
939 }
940}
941
942pub(crate) struct PoolConfig {
947 pub(crate) executor: Option<Box<dyn Executor + Send>>,
949 pub(crate) task_command_buffer_size: usize,
951 pub(crate) per_connection_event_buffer_size: usize,
954 pub(crate) dial_concurrency_factor: NonZeroU8,
956 pub(crate) idle_connection_timeout: Duration,
958 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
960
961 max_negotiating_inbound_streams: usize,
965}
966
967impl PoolConfig {
968 pub(crate) fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
969 Self {
970 executor,
971 task_command_buffer_size: 32,
972 per_connection_event_buffer_size: 7,
973 dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
974 idle_connection_timeout: Duration::ZERO,
975 substream_upgrade_protocol_override: None,
976 max_negotiating_inbound_streams: 128,
977 }
978 }
979
980 pub(crate) fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
988 self.task_command_buffer_size = n.get() - 1;
989 self
990 }
991
992 pub(crate) fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
999 self.per_connection_event_buffer_size = n;
1000 self
1001 }
1002
1003 pub(crate) fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1005 self.dial_concurrency_factor = factor;
1006 self
1007 }
1008
1009 pub(crate) fn with_substream_upgrade_protocol_override(
1011 mut self,
1012 v: libp2p_core::upgrade::Version,
1013 ) -> Self {
1014 self.substream_upgrade_protocol_override = Some(v);
1015 self
1016 }
1017
1018 pub(crate) fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1022 self.max_negotiating_inbound_streams = v;
1023 self
1024 }
1025}
1026
/// Extension for [`hash_map::Entry`] for call sites that require the entry to exist.
trait EntryExt<'a, K, V> {
    /// Returns the occupied entry.
    ///
    /// # Panics
    ///
    /// Panics with `msg` if the entry is vacant.
    fn expect_occupied(self, msg: &'static str) -> hash_map::OccupiedEntry<'a, K, V>;
}

impl<'a, K: 'a, V: 'a> EntryExt<'a, K, V> for hash_map::Entry<'a, K, V> {
    fn expect_occupied(self, msg: &'static str) -> hash_map::OccupiedEntry<'a, K, V> {
        if let hash_map::Entry::Occupied(occupied) = self {
            occupied
        } else {
            panic!("{}", msg)
        }
    }
}
1039
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::Future;

    /// Executor stand-in for tests: accepts every task and drops it without running it.
    struct Dummy;

    impl Executor for Dummy {
        fn exec(&self, _task: Pin<Box<dyn Future<Output = ()> + Send>>) {}
    }
}
1050}