litep2p/protocol/notification/
connection.rs1use crate::{
22 protocol::notification::handle::NotificationEventHandle, substream::Substream, PeerId,
23};
24
25use bytes::BytesMut;
26use futures::{FutureExt, SinkExt, Stream, StreamExt};
27use tokio::sync::{
28 mpsc::{Receiver, Sender},
29 oneshot,
30};
31use tokio_util::sync::PollSender;
32
33use std::{
34 pin::Pin,
35 task::{Context, Poll},
36};
37
/// Logging target passed as `target:` to the `tracing` macros in this file.
const LOG_TARGET: &str = "litep2p::notification::connection";
40
41pub(crate) struct Connection {
43 peer: PeerId,
45
46 inbound: Substream,
48
49 outbound: Substream,
51
52 event_handle: NotificationEventHandle,
54
55 conn_closed_tx: Sender<PeerId>,
58
59 notif_tx: PollSender<(PeerId, BytesMut)>,
61
62 async_rx: Receiver<Vec<u8>>,
64
65 sync_rx: Receiver<Vec<u8>>,
67
68 rx: oneshot::Receiver<()>,
71
72 next_notification: Option<Vec<u8>>,
74}
75
/// Whether the closing of a connection must be reported over the connection-closed channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NotifyProtocol {
    /// Report the closed connection.
    Yes,

    /// Close the connection without reporting it.
    No,
}
85
86impl Connection {
87 pub(crate) fn new(
89 peer: PeerId,
90 inbound: Substream,
91 outbound: Substream,
92 event_handle: NotificationEventHandle,
93 conn_closed_tx: Sender<PeerId>,
94 notif_tx: Sender<(PeerId, BytesMut)>,
95 async_rx: Receiver<Vec<u8>>,
96 sync_rx: Receiver<Vec<u8>>,
97 ) -> (Self, oneshot::Sender<()>) {
98 let (tx, rx) = oneshot::channel();
99
100 (
101 Self {
102 rx,
103 peer,
104 sync_rx,
105 async_rx,
106 inbound,
107 outbound,
108 event_handle,
109 conn_closed_tx,
110 next_notification: None,
111 notif_tx: PollSender::new(notif_tx),
112 },
113 tx,
114 )
115 }
116
117 async fn close_connection(self, notify_protocol: NotifyProtocol) {
122 tracing::trace!(
123 target: LOG_TARGET,
124 peer = ?self.peer,
125 ?notify_protocol,
126 "close notification protocol",
127 );
128
129 let _ = self.inbound.close().await;
130 let _ = self.outbound.close().await;
131
132 if std::matches!(notify_protocol, NotifyProtocol::Yes) {
133 let _ = self.conn_closed_tx.send(self.peer).await;
134 }
135
136 self.event_handle.report_notification_stream_closed(self.peer).await;
137 }
138
139 pub async fn start(mut self) {
140 tracing::debug!(
141 target: LOG_TARGET,
142 peer = ?self.peer,
143 "start connection event loop",
144 );
145
146 loop {
147 match self.next().await {
148 None
149 | Some(ConnectionEvent::CloseConnection {
150 notify: NotifyProtocol::Yes,
151 }) => return self.close_connection(NotifyProtocol::Yes).await,
152 Some(ConnectionEvent::CloseConnection {
153 notify: NotifyProtocol::No,
154 }) => return self.close_connection(NotifyProtocol::No).await,
155 Some(ConnectionEvent::NotificationReceived { notification }) => {
156 if let Err(_) = self.notif_tx.send_item((self.peer, notification)) {
157 return self.close_connection(NotifyProtocol::Yes).await;
158 }
159 }
160 }
161 }
162 }
163}
164
165pub enum ConnectionEvent {
167 CloseConnection {
174 notify: NotifyProtocol,
176 },
177
178 NotificationReceived {
186 notification: BytesMut,
188 },
189}
190
191impl Stream for Connection {
192 type Item = ConnectionEvent;
193
194 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195 let this = Pin::into_inner(self);
196
197 if let Poll::Ready(_) = this.rx.poll_unpin(cx) {
198 return Poll::Ready(Some(ConnectionEvent::CloseConnection {
199 notify: NotifyProtocol::No,
200 }));
201 }
202
203 loop {
204 let notification = match this.next_notification.take() {
205 Some(notification) => Some(notification),
206 None => {
207 let future = async {
208 tokio::select! {
209 notification = this.async_rx.recv() => notification,
210 notification = this.sync_rx.recv() => notification,
211 }
212 };
213 futures::pin_mut!(future);
214
215 match future.poll_unpin(cx) {
216 Poll::Pending => None,
217 Poll::Ready(None) =>
218 return Poll::Ready(Some(ConnectionEvent::CloseConnection {
219 notify: NotifyProtocol::Yes,
220 })),
221 Poll::Ready(Some(notification)) => Some(notification),
222 }
223 }
224 };
225
226 let Some(notification) = notification else {
227 break;
228 };
229
230 match this.outbound.poll_ready_unpin(cx) {
231 Poll::Ready(Ok(())) => {}
232 Poll::Pending => {
233 this.next_notification = Some(notification);
234 break;
235 }
236 Poll::Ready(Err(_)) =>
237 return Poll::Ready(Some(ConnectionEvent::CloseConnection {
238 notify: NotifyProtocol::Yes,
239 })),
240 }
241
242 if let Err(_) = this.outbound.start_send_unpin(notification.into()) {
243 return Poll::Ready(Some(ConnectionEvent::CloseConnection {
244 notify: NotifyProtocol::Yes,
245 }));
246 }
247 }
248
249 match this.outbound.poll_flush_unpin(cx) {
250 Poll::Ready(Err(_)) =>
251 return Poll::Ready(Some(ConnectionEvent::CloseConnection {
252 notify: NotifyProtocol::Yes,
253 })),
254 Poll::Ready(Ok(())) | Poll::Pending => {}
255 }
256
257 if let Err(_) = futures::ready!(this.notif_tx.poll_reserve(cx)) {
258 return Poll::Ready(Some(ConnectionEvent::CloseConnection {
259 notify: NotifyProtocol::Yes,
260 }));
261 }
262
263 match futures::ready!(this.inbound.poll_next_unpin(cx)) {
264 None | Some(Err(_)) => Poll::Ready(Some(ConnectionEvent::CloseConnection {
265 notify: NotifyProtocol::Yes,
266 })),
267 Some(Ok(notification)) =>
268 Poll::Ready(Some(ConnectionEvent::NotificationReceived { notification })),
269 }
270 }
271}