litep2p/protocol/notification/
connection.rs

1// Copyright 2023 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    protocol::notification::handle::NotificationEventHandle, substream::Substream, PeerId,
23};
24
25use bytes::BytesMut;
26use futures::{FutureExt, SinkExt, Stream, StreamExt};
27use tokio::sync::{
28    mpsc::{Receiver, Sender},
29    oneshot,
30};
31use tokio_util::sync::PollSender;
32
33use std::{
34    pin::Pin,
35    task::{Context, Poll},
36};
37
/// `tracing` target under which every log line emitted from this file is recorded.
const LOG_TARGET: &str = "litep2p::notification::connection";
40
41/// Bidirectional substream pair representing a connection to a remote peer.
42pub(crate) struct Connection {
43    /// Remote peer ID.
44    peer: PeerId,
45
46    /// Inbound substreams for receiving notifications.
47    inbound: Substream,
48
49    /// Outbound substream for sending notifications.
50    outbound: Substream,
51
52    /// Handle for sending notification events to user.
53    event_handle: NotificationEventHandle,
54
55    /// TX channel used to notify [`NotificationProtocol`](super::NotificationProtocol)
56    /// that the connection has been closed.
57    conn_closed_tx: Sender<PeerId>,
58
59    /// TX channel for sending notifications.
60    notif_tx: PollSender<(PeerId, BytesMut)>,
61
62    /// Receiver for asynchronously sent notifications.
63    async_rx: Receiver<Vec<u8>>,
64
65    /// Receiver for synchronously sent notifications.
66    sync_rx: Receiver<Vec<u8>>,
67
68    /// Oneshot receiver used by [`NotificationProtocol`](super::NotificationProtocol)
69    /// to signal that local node wishes the close the connection.
70    rx: oneshot::Receiver<()>,
71
72    /// Next notification to send, if any.
73    next_notification: Option<Vec<u8>>,
74}
75
/// Whether [`NotificationProtocol`](super::NotificationProtocol) should be notified that the
/// connection was closed.
///
/// The type is a plain two-state flag, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NotifyProtocol {
    /// Notify the protocol handler.
    Yes,

    /// Do not notify the protocol handler.
    No,
}
85
86impl Connection {
87    /// Create new [`Connection`].
88    pub(crate) fn new(
89        peer: PeerId,
90        inbound: Substream,
91        outbound: Substream,
92        event_handle: NotificationEventHandle,
93        conn_closed_tx: Sender<PeerId>,
94        notif_tx: Sender<(PeerId, BytesMut)>,
95        async_rx: Receiver<Vec<u8>>,
96        sync_rx: Receiver<Vec<u8>>,
97    ) -> (Self, oneshot::Sender<()>) {
98        let (tx, rx) = oneshot::channel();
99
100        (
101            Self {
102                rx,
103                peer,
104                sync_rx,
105                async_rx,
106                inbound,
107                outbound,
108                event_handle,
109                conn_closed_tx,
110                next_notification: None,
111                notif_tx: PollSender::new(notif_tx),
112            },
113            tx,
114        )
115    }
116
117    /// Connection closed, clean up state.
118    ///
119    /// If [`NotificationProtocol`](super::NotificationProtocol) was the one that initiated
120    /// shut down, it's not notified of connection getting closed.
121    async fn close_connection(self, notify_protocol: NotifyProtocol) {
122        tracing::trace!(
123            target: LOG_TARGET,
124            peer = ?self.peer,
125            ?notify_protocol,
126            "close notification protocol",
127        );
128
129        let _ = self.inbound.close().await;
130        let _ = self.outbound.close().await;
131
132        if std::matches!(notify_protocol, NotifyProtocol::Yes) {
133            let _ = self.conn_closed_tx.send(self.peer).await;
134        }
135
136        self.event_handle.report_notification_stream_closed(self.peer).await;
137    }
138
139    pub async fn start(mut self) {
140        tracing::debug!(
141            target: LOG_TARGET,
142            peer = ?self.peer,
143            "start connection event loop",
144        );
145
146        loop {
147            match self.next().await {
148                None
149                | Some(ConnectionEvent::CloseConnection {
150                    notify: NotifyProtocol::Yes,
151                }) => return self.close_connection(NotifyProtocol::Yes).await,
152                Some(ConnectionEvent::CloseConnection {
153                    notify: NotifyProtocol::No,
154                }) => return self.close_connection(NotifyProtocol::No).await,
155                Some(ConnectionEvent::NotificationReceived { notification }) => {
156                    if let Err(_) = self.notif_tx.send_item((self.peer, notification)) {
157                        return self.close_connection(NotifyProtocol::Yes).await;
158                    }
159                }
160            }
161        }
162    }
163}
164
165/// Connection events.
166pub enum ConnectionEvent {
167    /// Close connection.
168    ///
169    /// If `NotificationProtocol` requested [`Connection`] to be closed, it doesn't need to be
170    /// notified. If, on the other hand, connection closes because it encountered an error or one
171    /// of the substreams was closed, `NotificationProtocol` must be informed so it can inform the
172    /// user.
173    CloseConnection {
174        /// Whether to notify `NotificationProtocol` or not.
175        notify: NotifyProtocol,
176    },
177
178    /// Notification read from the inbound substream.
179    ///
180    /// NOTE: [`Connection`] uses `PollSender::send_item()` to send the notification to user.
181    /// `PollSender::poll_reserve()` must be called before calling `PollSender::send_item()` or it
182    /// will panic. `PollSender::poll_reserve()` is called in the `Stream` implementation below
183    /// before polling the inbound substream to ensure the channel has capacity to receive a
184    /// notification.
185    NotificationReceived {
186        /// Notification.
187        notification: BytesMut,
188    },
189}
190
191impl Stream for Connection {
192    type Item = ConnectionEvent;
193
194    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195        let this = Pin::into_inner(self);
196
197        if let Poll::Ready(_) = this.rx.poll_unpin(cx) {
198            return Poll::Ready(Some(ConnectionEvent::CloseConnection {
199                notify: NotifyProtocol::No,
200            }));
201        }
202
203        loop {
204            let notification = match this.next_notification.take() {
205                Some(notification) => Some(notification),
206                None => {
207                    let future = async {
208                        tokio::select! {
209                            notification = this.async_rx.recv() => notification,
210                            notification = this.sync_rx.recv() => notification,
211                        }
212                    };
213                    futures::pin_mut!(future);
214
215                    match future.poll_unpin(cx) {
216                        Poll::Pending => None,
217                        Poll::Ready(None) =>
218                            return Poll::Ready(Some(ConnectionEvent::CloseConnection {
219                                notify: NotifyProtocol::Yes,
220                            })),
221                        Poll::Ready(Some(notification)) => Some(notification),
222                    }
223                }
224            };
225
226            let Some(notification) = notification else {
227                break;
228            };
229
230            match this.outbound.poll_ready_unpin(cx) {
231                Poll::Ready(Ok(())) => {}
232                Poll::Pending => {
233                    this.next_notification = Some(notification);
234                    break;
235                }
236                Poll::Ready(Err(_)) =>
237                    return Poll::Ready(Some(ConnectionEvent::CloseConnection {
238                        notify: NotifyProtocol::Yes,
239                    })),
240            }
241
242            if let Err(_) = this.outbound.start_send_unpin(notification.into()) {
243                return Poll::Ready(Some(ConnectionEvent::CloseConnection {
244                    notify: NotifyProtocol::Yes,
245                }));
246            }
247        }
248
249        match this.outbound.poll_flush_unpin(cx) {
250            Poll::Ready(Err(_)) =>
251                return Poll::Ready(Some(ConnectionEvent::CloseConnection {
252                    notify: NotifyProtocol::Yes,
253                })),
254            Poll::Ready(Ok(())) | Poll::Pending => {}
255        }
256
257        if let Err(_) = futures::ready!(this.notif_tx.poll_reserve(cx)) {
258            return Poll::Ready(Some(ConnectionEvent::CloseConnection {
259                notify: NotifyProtocol::Yes,
260            }));
261        }
262
263        match futures::ready!(this.inbound.poll_next_unpin(cx)) {
264            None | Some(Err(_)) => Poll::Ready(Some(ConnectionEvent::CloseConnection {
265                notify: NotifyProtocol::Yes,
266            })),
267            Some(Ok(notification)) =>
268                Poll::Ready(Some(ConnectionEvent::NotificationReceived { notification })),
269        }
270    }
271}