libp2p_swarm/connection/pool/
task.rs1use super::concurrent_dial::ConcurrentDial;
25use crate::{
26 connection::{
27 self, ConnectionError, ConnectionId, PendingInboundConnectionError,
28 PendingOutboundConnectionError,
29 },
30 transport::TransportError,
31 ConnectionHandler, Multiaddr, PeerId,
32};
33use futures::{
34 channel::{mpsc, oneshot},
35 future::{poll_fn, Either, Future},
36 SinkExt, StreamExt,
37};
38use libp2p_core::muxing::StreamMuxerBox;
39use std::pin::Pin;
40use void::Void;
41
/// Instructions that can be delivered to the task driving an established
/// connection.
#[derive(Debug)]
pub(crate) enum Command<T> {
    /// Hand the wrapped event to the connection via `on_behaviour_event`.
    NotifyHandler(T),
    /// Close the connection; the task reports a `Closed` event and then ends.
    Close,
}
51
52pub(crate) enum PendingConnectionEvent {
53 ConnectionEstablished {
54 id: ConnectionId,
55 output: (PeerId, StreamMuxerBox),
56 outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
60 },
61 PendingFailed {
63 id: ConnectionId,
64 error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
65 },
66}
67
68#[derive(Debug)]
69#[allow(deprecated)]
70pub(crate) enum EstablishedConnectionEvent<THandler: ConnectionHandler> {
71 AddressChange {
73 id: ConnectionId,
74 peer_id: PeerId,
75 new_address: Multiaddr,
76 },
77 Notify {
79 id: ConnectionId,
80 peer_id: PeerId,
81 event: THandler::ToBehaviour,
82 },
83 Closed {
88 id: ConnectionId,
89 peer_id: PeerId,
90 error: Option<ConnectionError<THandler::Error>>,
91 handler: THandler,
92 },
93}
94
95pub(crate) async fn new_for_pending_outgoing_connection(
96 connection_id: ConnectionId,
97 dial: ConcurrentDial,
98 abort_receiver: oneshot::Receiver<Void>,
99 mut events: mpsc::Sender<PendingConnectionEvent>,
100) {
101 match futures::future::select(abort_receiver, Box::pin(dial)).await {
102 Either::Left((Err(oneshot::Canceled), _)) => {
103 let _ = events
104 .send(PendingConnectionEvent::PendingFailed {
105 id: connection_id,
106 error: Either::Left(PendingOutboundConnectionError::Aborted),
107 })
108 .await;
109 }
110 Either::Left((Ok(v), _)) => void::unreachable(v),
111 Either::Right((Ok((address, output, errors)), _)) => {
112 let _ = events
113 .send(PendingConnectionEvent::ConnectionEstablished {
114 id: connection_id,
115 output,
116 outgoing: Some((address, errors)),
117 })
118 .await;
119 }
120 Either::Right((Err(e), _)) => {
121 let _ = events
122 .send(PendingConnectionEvent::PendingFailed {
123 id: connection_id,
124 error: Either::Left(PendingOutboundConnectionError::Transport(e)),
125 })
126 .await;
127 }
128 }
129}
130
131pub(crate) async fn new_for_pending_incoming_connection<TFut>(
132 connection_id: ConnectionId,
133 future: TFut,
134 abort_receiver: oneshot::Receiver<Void>,
135 mut events: mpsc::Sender<PendingConnectionEvent>,
136) where
137 TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
138{
139 match futures::future::select(abort_receiver, Box::pin(future)).await {
140 Either::Left((Err(oneshot::Canceled), _)) => {
141 let _ = events
142 .send(PendingConnectionEvent::PendingFailed {
143 id: connection_id,
144 error: Either::Right(PendingInboundConnectionError::Aborted),
145 })
146 .await;
147 }
148 Either::Left((Ok(v), _)) => void::unreachable(v),
149 Either::Right((Ok(output), _)) => {
150 let _ = events
151 .send(PendingConnectionEvent::ConnectionEstablished {
152 id: connection_id,
153 output,
154 outgoing: None,
155 })
156 .await;
157 }
158 Either::Right((Err(e), _)) => {
159 let _ = events
160 .send(PendingConnectionEvent::PendingFailed {
161 id: connection_id,
162 error: Either::Right(PendingInboundConnectionError::Transport(
163 TransportError::Other(e),
164 )),
165 })
166 .await;
167 }
168 }
169}
170
171pub(crate) async fn new_for_established_connection<THandler>(
172 connection_id: ConnectionId,
173 peer_id: PeerId,
174 mut connection: crate::connection::Connection<THandler>,
175 mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
176 mut events: mpsc::Sender<EstablishedConnectionEvent<THandler>>,
177) where
178 THandler: ConnectionHandler,
179{
180 loop {
181 match futures::future::select(
182 command_receiver.next(),
183 poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
184 )
185 .await
186 {
187 Either::Left((Some(command), _)) => match command {
188 Command::NotifyHandler(event) => connection.on_behaviour_event(event),
189 Command::Close => {
190 command_receiver.close();
191 let (handler, closing_muxer) = connection.close();
192
193 let error = closing_muxer.await.err().map(ConnectionError::IO);
194 let _ = events
195 .send(EstablishedConnectionEvent::Closed {
196 id: connection_id,
197 peer_id,
198 error,
199 handler,
200 })
201 .await;
202 return;
203 }
204 },
205
206 Either::Left((None, _)) => return,
208
209 Either::Right((event, _)) => {
210 match event {
211 Ok(connection::Event::Handler(event)) => {
212 let _ = events
213 .send(EstablishedConnectionEvent::Notify {
214 id: connection_id,
215 peer_id,
216 event,
217 })
218 .await;
219 }
220 Ok(connection::Event::AddressChange(new_address)) => {
221 let _ = events
222 .send(EstablishedConnectionEvent::AddressChange {
223 id: connection_id,
224 peer_id,
225 new_address,
226 })
227 .await;
228 }
229 Err(error) => {
230 command_receiver.close();
231 let (handler, _closing_muxer) = connection.close();
232 let _ = events
234 .send(EstablishedConnectionEvent::Closed {
235 id: connection_id,
236 peer_id,
237 error: Some(error),
238 handler,
239 })
240 .await;
241 return;
242 }
243 }
244 }
245 }
246 }
247}