litep2p/yamux/
connection.rs

1// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd.
2//
3// Licensed under the Apache License, Version 2.0 or MIT license, at your option.
4//
5// A copy of the Apache License, Version 2.0 is included in the software as
6// LICENSE-APACHE and a copy of the MIT license is included in the software
7// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0
8// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
9// at https://opensource.org/licenses/MIT.
10
11// This module contains the `Connection` type and associated helpers.
12// A `Connection` wraps an underlying (async) I/O resource and multiplexes
13// `Stream`s over it.
14//
15// The overall idea is as follows: The `Connection` makes progress via calls
16// to its `next_stream` method which polls several futures, one that decodes
17// `Frame`s from the I/O resource, one that consumes `ControlCommand`s
18// from an MPSC channel and another one that consumes `StreamCommand`s from
19// yet another MPSC channel. The latter channel is shared with every `Stream`
20// created and whenever a `Stream` wishes to send a `Frame` to the remote end,
21// it enqueues it into this channel (waiting if the channel is full). The
22// former is shared with every `Control` clone and used to open new outbound
23// streams or to trigger a connection close.
24//
// The `Connection` updates the `Stream` state based on incoming frames, e.g.
// it pushes incoming data to the `Stream`'s buffer or increases the sending
// credit if the remote has sent us a corresponding `Frame<WindowUpdate>`.
// Updating a `Stream`'s state acquires a `Mutex`, which every `Stream` has
// around its `Shared` state. Although this is a blocking lock, we make sure
// it is only held for brief moments and *never* while doing I/O. The only
// contention is between the `Connection` and a single `Stream`, which should
// resolve quickly. Ideally, we could use `futures::lock::Mutex` but it does
// not offer a poll-based API as of futures-preview 0.3.0-alpha.19, which
// makes it difficult to use in a `Stream`'s `AsyncRead` and `AsyncWrite`
// trait implementations.
36//
37// Closing a `Connection`
38// ----------------------
39//
40// Every `Control` may send a `ControlCommand::Close` at any time and then
41// waits on a `oneshot::Receiver` for confirmation that the connection is
42// closed. The closing proceeds as follows:
43//
44// 1. As soon as we receive the close command we close the MPSC receiver of `StreamCommand`s. We
45//    want to process any stream commands which are already enqueued at this point but no more.
46// 2. We change the internal shutdown state to `Shutdown::InProgress` which contains the
47//    `oneshot::Sender` of the `Control` which triggered the closure and which we need to notify
48//    eventually.
49// 3. Crucially -- while closing -- we no longer process further control commands, because opening
50//    new streams should no longer be allowed and further close commands would mean we need to save
51//    those `oneshot::Sender`s for later. On the other hand we also do not simply close the control
52//    channel as this would signal to `Control`s that try to send close commands, that the
53//    connection is already closed, which it is not. So we just pause processing control commands
54//    which means such `Control`s will wait.
55// 4. We keep processing I/O and stream commands until the remaining stream commands have all been
56//    consumed, at which point we transition the shutdown state to `Shutdown::Complete`, which
57//    entails sending the final termination frame to the remote, informing the `Control` and now
58//    also closing the control channel.
59// 5. Now that we are closed we go through all pending control commands and tell the `Control`s that
60//    we are closed and we are finally done.
61//
62// While all of this may look complicated, it ensures that `Control`s are
63// only informed about a closed connection when it really is closed.
64//
65// Potential improvements
66// ----------------------
67//
68// There is always more work that can be done to make this a better crate,
69// for example:
70//
71// - Instead of `futures::mpsc` a more efficient channel implementation could be used, e.g.
72//   `tokio-sync`. Unfortunately `tokio-sync` is about to be merged into `tokio` and depending on
73//   this large crate is not attractive, especially given the dire situation around cargo's flag
74//   resolution.
75// - Flushing could be optimised. This would also require adding a `StreamCommand::Flush` so that
76//   `Stream`s can trigger a flush, which they would have to when they run out of credit, or else a
77//   series of send operations might never finish.
78// - If Rust gets async destructors, the `garbage_collect()` method can be removed. Instead a
79//   `Stream` would send a `StreamCommand::Dropped(..)` or something similar and the removal logic
80//   could happen within regular command processing instead of having to scan the whole collection
81//   of `Stream`s on each loop iteration, which is not great.
82
83mod cleanup;
84mod closing;
85mod stream;
86
87use crate::yamux::{
88    error::ConnectionError,
89    frame::{
90        self,
91        header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
92        Frame,
93    },
94    tagged_stream::TaggedStream,
95    Config, Result, WindowUpdateMode, DEFAULT_CREDIT, MAX_ACK_BACKLOG,
96};
97use cleanup::Cleanup;
98use closing::Closing;
99use futures::{
100    channel::mpsc,
101    future::Either,
102    prelude::*,
103    sink::SinkExt,
104    stream::{Fuse, SelectAll},
105};
106use nohash_hasher::IntMap;
107use parking_lot::Mutex;
108use std::{
109    collections::VecDeque,
110    fmt,
111    sync::Arc,
112    task::{Context, Poll, Waker},
113};
114
115pub use stream::{Packet, State, Stream};
116
/// Target under which every `tracing` event emitted by this file is logged.
const LOG_TARGET: &str = "litep2p::yamux";
119
/// The role this endpoint plays on a connection.
///
/// The mode determines the first ID handed out to outbound streams: a client starts
/// counting at 1, a server at 2.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Mode {
    /// This endpoint is the client side of the connection.
    Client,
    /// This endpoint is the server side of the connection.
    Server,
}
128
/// Identifier of a single connection.
///
/// The value is picked at random and is mainly meant to tell connections apart in log output.
#[derive(Copy, Clone)]
pub(crate) struct Id(u32);
134
135impl Id {
136    /// Create a random connection ID.
137    pub(crate) fn random() -> Self {
138        Id(rand::random())
139    }
140}
141
142impl fmt::Debug for Id {
143    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144        write!(f, "{:08x}", self.0)
145    }
146}
147
148impl fmt::Display for Id {
149    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150        write!(f, "{:08x}", self.0)
151    }
152}
153
154#[derive(Debug)]
155pub struct Connection<T> {
156    inner: ConnectionState<T>,
157}
158
159impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
160    pub fn new(socket: T, cfg: Config, mode: Mode) -> Self {
161        Self {
162            inner: ConnectionState::Active(Active::new(socket, cfg, mode)),
163        }
164    }
165
166    /// Poll for a new outbound stream.
167    ///
168    /// This function will fail if the current state does not allow opening new outbound streams.
169    pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
170        loop {
171            match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
172                ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) {
173                    Poll::Ready(Ok(stream)) => {
174                        self.inner = ConnectionState::Active(active);
175                        return Poll::Ready(Ok(stream));
176                    }
177                    Poll::Pending => {
178                        self.inner = ConnectionState::Active(active);
179                        return Poll::Pending;
180                    }
181                    Poll::Ready(Err(e)) => {
182                        self.inner = ConnectionState::Cleanup(active.cleanup(e));
183                        continue;
184                    }
185                },
186                ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
187                    Poll::Ready(Ok(())) => {
188                        self.inner = ConnectionState::Closed;
189                        return Poll::Ready(Err(ConnectionError::Closed));
190                    }
191                    Poll::Ready(Err(e)) => {
192                        self.inner = ConnectionState::Closed;
193                        return Poll::Ready(Err(e));
194                    }
195                    Poll::Pending => {
196                        self.inner = ConnectionState::Closing(inner);
197                        return Poll::Pending;
198                    }
199                },
200                ConnectionState::Cleanup(mut inner) => match inner.poll_unpin(cx) {
201                    Poll::Ready(e) => {
202                        self.inner = ConnectionState::Closed;
203                        return Poll::Ready(Err(e));
204                    }
205                    Poll::Pending => {
206                        self.inner = ConnectionState::Cleanup(inner);
207                        return Poll::Pending;
208                    }
209                },
210                ConnectionState::Closed => {
211                    self.inner = ConnectionState::Closed;
212                    return Poll::Ready(Err(ConnectionError::Closed));
213                }
214                ConnectionState::Poisoned => unreachable!(),
215            }
216        }
217    }
218
219    /// Poll for the next inbound stream.
220    ///
221    /// If this function returns `None`, the underlying connection is closed.
222    pub fn poll_next_inbound(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
223        loop {
224            match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
225                ConnectionState::Active(mut active) => match active.poll(cx) {
226                    Poll::Ready(Ok(stream)) => {
227                        self.inner = ConnectionState::Active(active);
228                        return Poll::Ready(Some(Ok(stream)));
229                    }
230                    Poll::Ready(Err(e)) => {
231                        self.inner = ConnectionState::Cleanup(active.cleanup(e));
232                        continue;
233                    }
234                    Poll::Pending => {
235                        self.inner = ConnectionState::Active(active);
236                        return Poll::Pending;
237                    }
238                },
239                ConnectionState::Closing(mut closing) => match closing.poll_unpin(cx) {
240                    Poll::Ready(Ok(())) => {
241                        self.inner = ConnectionState::Closed;
242                        return Poll::Ready(None);
243                    }
244                    Poll::Ready(Err(e)) => {
245                        self.inner = ConnectionState::Closed;
246                        return Poll::Ready(Some(Err(e)));
247                    }
248                    Poll::Pending => {
249                        self.inner = ConnectionState::Closing(closing);
250                        return Poll::Pending;
251                    }
252                },
253                ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
254                    Poll::Ready(ConnectionError::Closed) => {
255                        self.inner = ConnectionState::Closed;
256                        return Poll::Ready(None);
257                    }
258                    Poll::Ready(other) => {
259                        self.inner = ConnectionState::Closed;
260                        return Poll::Ready(Some(Err(other)));
261                    }
262                    Poll::Pending => {
263                        self.inner = ConnectionState::Cleanup(cleanup);
264                        return Poll::Pending;
265                    }
266                },
267                ConnectionState::Closed => {
268                    self.inner = ConnectionState::Closed;
269                    return Poll::Ready(None);
270                }
271                ConnectionState::Poisoned => unreachable!(),
272            }
273        }
274    }
275
276    /// Close the connection.
277    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
278        loop {
279            match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
280                ConnectionState::Active(active) => {
281                    self.inner = ConnectionState::Closing(active.close());
282                }
283                ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx)? {
284                    Poll::Ready(()) => {
285                        self.inner = ConnectionState::Closed;
286                    }
287                    Poll::Pending => {
288                        self.inner = ConnectionState::Closing(inner);
289                        return Poll::Pending;
290                    }
291                },
292                ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
293                    Poll::Ready(reason) => {
294                        tracing::warn!(target: LOG_TARGET, "Failure while closing connection: {}", reason);
295                        self.inner = ConnectionState::Closed;
296                        return Poll::Ready(Ok(()));
297                    }
298                    Poll::Pending => {
299                        self.inner = ConnectionState::Cleanup(cleanup);
300                        return Poll::Pending;
301                    }
302                },
303                ConnectionState::Closed => {
304                    self.inner = ConnectionState::Closed;
305                    return Poll::Ready(Ok(()));
306                }
307                ConnectionState::Poisoned => {
308                    unreachable!()
309                }
310            }
311        }
312    }
313}
314
315impl<T> Drop for Connection<T> {
316    fn drop(&mut self) {
317        match &mut self.inner {
318            ConnectionState::Active(active) => active.drop_all_streams(),
319            ConnectionState::Closing(_) => {}
320            ConnectionState::Cleanup(_) => {}
321            ConnectionState::Closed => {}
322            ConnectionState::Poisoned => {}
323        }
324    }
325}
326
327enum ConnectionState<T> {
328    /// The connection is alive and healthy.
329    Active(Active<T>),
330    /// Our user requested to shutdown the connection, we are working on it.
331    Closing(Closing<T>),
332    /// An error occurred and we are cleaning up our resources.
333    Cleanup(Cleanup),
334    /// The connection is closed.
335    Closed,
336    /// Something went wrong during our state transitions. Should never happen unless there is a
337    /// bug.
338    Poisoned,
339}
340
341impl<T> fmt::Debug for ConnectionState<T> {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        match self {
344            ConnectionState::Active(_) => write!(f, "Active"),
345            ConnectionState::Closing(_) => write!(f, "Closing"),
346            ConnectionState::Cleanup(_) => write!(f, "Cleanup"),
347            ConnectionState::Closed => write!(f, "Closed"),
348            ConnectionState::Poisoned => write!(f, "Poisoned"),
349        }
350    }
351}
352
353/// A Yamux connection object.
354///
355/// Wraps the underlying I/O resource and makes progress via its
356/// [`Connection::poll_next_inbound`] method which must be called repeatedly
357/// until `Ok(None)` signals EOF or an error is encountered.
358struct Active<T> {
359    id: Id,
360    mode: Mode,
361    config: Arc<Config>,
362    socket: Fuse<frame::Io<T>>,
363    next_id: u32,
364
365    streams: IntMap<StreamId, Arc<Mutex<stream::Shared>>>,
366    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
367    no_streams_waker: Option<Waker>,
368
369    pending_frames: VecDeque<Frame<()>>,
370    new_outbound_stream_waker: Option<Waker>,
371}
372
373/// `Stream` to `Connection` commands.
374#[derive(Debug)]
375pub(crate) enum StreamCommand {
376    /// A new frame should be sent to the remote.
377    SendFrame(Frame<Either<Data, WindowUpdate>>),
378    /// Close a stream.
379    CloseStream { ack: bool },
380}
381
382/// Possible actions as a result of incoming frame handling.
383#[derive(Debug)]
384enum Action {
385    /// Nothing to be done.
386    None,
387    /// A new stream has been opened by the remote.
388    New(Stream, Option<Frame<WindowUpdate>>),
389    /// A window update should be sent to the remote.
390    Update(Frame<WindowUpdate>),
391    /// A ping should be answered.
392    Ping(Frame<Ping>),
393    /// A stream should be reset.
394    Reset(Frame<Data>),
395    /// The connection should be terminated.
396    Terminate(Frame<GoAway>),
397}
398
399impl<T> fmt::Debug for Active<T> {
400    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
401        f.debug_struct("Connection")
402            .field("id", &self.id)
403            .field("mode", &self.mode)
404            .field("streams", &self.streams.len())
405            .field("next_id", &self.next_id)
406            .finish()
407    }
408}
409
410impl<T> fmt::Display for Active<T> {
411    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
412        write!(
413            f,
414            "(Connection {} {:?} (streams {}))",
415            self.id,
416            self.mode,
417            self.streams.len()
418        )
419    }
420}
421
422impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
423    /// Create a new `Connection` from the given I/O resource.
424    fn new(socket: T, cfg: Config, mode: Mode) -> Self {
425        let id = Id::random();
426        tracing::debug!(target: LOG_TARGET, "new connection: {} ({:?})", id, mode);
427        let socket = frame::Io::new(id, socket, cfg.max_buffer_size).fuse();
428        Active {
429            id,
430            mode,
431            config: Arc::new(cfg),
432            socket,
433            streams: IntMap::default(),
434            stream_receivers: SelectAll::default(),
435            no_streams_waker: None,
436            next_id: match mode {
437                Mode::Client => 1,
438                Mode::Server => 2,
439            },
440            pending_frames: VecDeque::default(),
441            new_outbound_stream_waker: None,
442        }
443    }
444
445    /// Gracefully close the connection to the remote.
446    fn close(self) -> Closing<T> {
447        Closing::new(self.stream_receivers, self.pending_frames, self.socket)
448    }
449
450    /// Cleanup all our resources.
451    ///
452    /// This should be called in the context of an unrecoverable error on the connection.
453    fn cleanup(mut self, error: ConnectionError) -> Cleanup {
454        self.drop_all_streams();
455
456        Cleanup::new(self.stream_receivers, error)
457    }
458
459    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
460        loop {
461            if self.socket.poll_ready_unpin(cx).is_ready() {
462                if let Some(frame) = self.pending_frames.pop_front() {
463                    self.socket.start_send_unpin(frame)?;
464                    continue;
465                }
466            }
467
468            match self.socket.poll_flush_unpin(cx)? {
469                Poll::Ready(()) => {}
470                Poll::Pending => {}
471            }
472
473            match self.stream_receivers.poll_next_unpin(cx) {
474                Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
475                    self.on_send_frame(frame);
476                    continue;
477                }
478                Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
479                    self.on_close_stream(id, ack);
480                    continue;
481                }
482                Poll::Ready(Some((id, None))) => {
483                    self.on_drop_stream(id);
484                    continue;
485                }
486                Poll::Ready(None) => {
487                    self.no_streams_waker = Some(cx.waker().clone());
488                }
489                Poll::Pending => {}
490            }
491
492            match self.socket.poll_next_unpin(cx) {
493                Poll::Ready(Some(frame)) => {
494                    if let Some(stream) = self.on_frame(frame?)? {
495                        return Poll::Ready(Ok(stream));
496                    }
497                    continue;
498                }
499                Poll::Ready(None) => {
500                    return Poll::Ready(Err(ConnectionError::Closed));
501                }
502                Poll::Pending => {}
503            }
504
505            // If we make it this far, at least one of the above must have registered a waker.
506            return Poll::Pending;
507        }
508    }
509
510    fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
511        if self.streams.len() >= self.config.max_num_streams {
512            tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id);
513            return Poll::Ready(Err(ConnectionError::TooManyStreams));
514        }
515
516        if self.ack_backlog() >= MAX_ACK_BACKLOG {
517            tracing::debug!(target: LOG_TARGET, "{MAX_ACK_BACKLOG} streams waiting for ACK, registering task for wake-up until remote acknowledges at least one stream");
518            self.new_outbound_stream_waker = Some(cx.waker().clone());
519            return Poll::Pending;
520        }
521
522        tracing::trace!(target: LOG_TARGET, "{}: creating new outbound stream", self.id);
523
524        let id = self.next_stream_id()?;
525        let extra_credit = self.config.receive_window - DEFAULT_CREDIT;
526
527        if extra_credit > 0 {
528            let mut frame = Frame::window_update(id, extra_credit);
529            frame.header_mut().syn();
530            tracing::trace!(target: LOG_TARGET, "{}/{}: sending initial {}", self.id, id, frame.header());
531            self.pending_frames.push_back(frame.into());
532        }
533
534        let mut stream = self.make_new_outbound_stream(id, self.config.receive_window);
535
536        if extra_credit == 0 {
537            stream.set_flag(stream::Flag::Syn)
538        }
539
540        tracing::debug!(target: LOG_TARGET, "{}: new outbound {} of {}", self.id, stream, self);
541        self.streams.insert(id, stream.clone_shared());
542
543        Poll::Ready(Ok(stream))
544    }
545
546    fn on_send_frame(&mut self, frame: Frame<Either<Data, WindowUpdate>>) {
547        tracing::trace!(target: LOG_TARGET,
548            "{}/{}: sending: {}",
549            self.id,
550            frame.header().stream_id(),
551            frame.header()
552        );
553        self.pending_frames.push_back(frame.into());
554    }
555
556    fn on_close_stream(&mut self, id: StreamId, ack: bool) {
557        tracing::trace!(target: LOG_TARGET, "{}/{}: sending close", self.id, id);
558        self.pending_frames.push_back(Frame::close_stream(id, ack).into());
559    }
560
561    fn on_drop_stream(&mut self, stream_id: StreamId) {
562        let s = self.streams.remove(&stream_id).expect("stream not found");
563
564        tracing::trace!(target: LOG_TARGET, "{}: removing dropped stream {}", self.id, stream_id);
565        let frame = {
566            let mut shared = s.lock();
567            let frame = match shared.update_state(self.id, stream_id, State::Closed) {
568                // The stream was dropped without calling `poll_close`.
569                // We reset the stream to inform the remote of the closure.
570                State::Open { .. } => {
571                    let mut header = Header::data(stream_id, 0);
572                    header.rst();
573                    Some(Frame::new(header))
574                }
575                // The stream was dropped without calling `poll_close`.
576                // We have already received a FIN from remote and send one
577                // back which closes the stream for good.
578                State::RecvClosed => {
579                    let mut header = Header::data(stream_id, 0);
580                    header.fin();
581                    Some(Frame::new(header))
582                }
583                // The stream was properly closed. We already sent our FIN frame.
584                // The remote may be out of credit though and blocked on
585                // writing more data. We may need to reset the stream.
586                State::SendClosed => {
587                    if self.config.window_update_mode == WindowUpdateMode::OnRead
588                        && shared.window == 0
589                    {
590                        // The remote may be waiting for a window update
591                        // which we will never send, so reset the stream now.
592                        let mut header = Header::data(stream_id, 0);
593                        header.rst();
594                        Some(Frame::new(header))
595                    } else {
596                        // The remote has either still credit or will be given more
597                        // (due to an enqueued window update or because the update
598                        // mode is `OnReceive`) or we already have inbound frames in
599                        // the socket buffer which will be processed later. In any
600                        // case we will reply with an RST in `Connection::on_data`
601                        // because the stream will no longer be known.
602                        None
603                    }
604                }
605                // The stream was properly closed. We already have sent our FIN frame. The
606                // remote end has already done so in the past.
607                State::Closed => None,
608            };
609            if let Some(w) = shared.reader.take() {
610                w.wake()
611            }
612            if let Some(w) = shared.writer.take() {
613                w.wake()
614            }
615            frame
616        };
617        if let Some(f) = frame {
618            tracing::trace!(target: LOG_TARGET, "{}/{}: sending: {}", self.id, stream_id, f.header());
619            self.pending_frames.push_back(f.into());
620        }
621    }
622
623    /// Process the result of reading from the socket.
624    ///
625    /// Unless `frame` is `Ok(Some(_))` we will assume the connection got closed
626    /// and return a corresponding error, which terminates the connection.
627    /// Otherwise we process the frame and potentially return a new `Stream`
628    /// if one was opened by the remote.
629    fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
630        tracing::trace!(target: LOG_TARGET, "{}: received: {}", self.id, frame.header());
631
632        if frame.header().flags().contains(header::ACK) {
633            let id = frame.header().stream_id();
634            if let Some(stream) = self.streams.get(&id) {
635                stream.lock().update_state(self.id, id, State::Open { acknowledged: true });
636            }
637            if let Some(waker) = self.new_outbound_stream_waker.take() {
638                waker.wake();
639            }
640        }
641
642        let action = match frame.header().tag() {
643            Tag::Data => self.on_data(frame.into_data()),
644            Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
645            Tag::Ping => self.on_ping(&frame.into_ping()),
646            Tag::GoAway => return Err(ConnectionError::Closed),
647        };
648        match action {
649            Action::None => {}
650            Action::New(stream, update) => {
651                tracing::trace!(target: LOG_TARGET, "{}: new inbound {} of {}", self.id, stream, self);
652                if let Some(f) = update {
653                    tracing::trace!(target: LOG_TARGET, "{}/{}: sending update", self.id, f.header().stream_id());
654                    self.pending_frames.push_back(f.into());
655                }
656                return Ok(Some(stream));
657            }
658            Action::Update(f) => {
659                tracing::trace!(target: LOG_TARGET, "{}: sending update: {:?}", self.id, f.header());
660                self.pending_frames.push_back(f.into());
661            }
662            Action::Ping(f) => {
663                tracing::trace!(target: LOG_TARGET, "{}/{}: pong", self.id, f.header().stream_id());
664                self.pending_frames.push_back(f.into());
665            }
666            Action::Reset(f) => {
667                tracing::trace!(target: LOG_TARGET, "{}/{}: sending reset", self.id, f.header().stream_id());
668                self.pending_frames.push_back(f.into());
669            }
670            Action::Terminate(f) => {
671                tracing::trace!(target: LOG_TARGET, "{}: sending term", self.id);
672                self.pending_frames.push_back(f.into());
673            }
674        }
675
676        Ok(None)
677    }
678
679    fn on_data(&mut self, frame: Frame<Data>) -> Action {
680        let stream_id = frame.header().stream_id();
681
682        if frame.header().flags().contains(header::RST) {
683            // stream reset
684            if let Some(s) = self.streams.get_mut(&stream_id) {
685                let mut shared = s.lock();
686                shared.update_state(self.id, stream_id, State::Closed);
687                if let Some(w) = shared.reader.take() {
688                    w.wake()
689                }
690                if let Some(w) = shared.writer.take() {
691                    w.wake()
692                }
693            }
694            return Action::None;
695        }
696
697        let is_finish = frame.header().flags().contains(header::FIN); // half-close
698
699        if frame.header().flags().contains(header::SYN) {
700            // new stream
701            if !self.is_valid_remote_id(stream_id, Tag::Data) {
702                tracing::error!(target: LOG_TARGET, "{}: invalid stream id {}", self.id, stream_id);
703                return Action::Terminate(Frame::protocol_error());
704            }
705            if frame.body().len() > DEFAULT_CREDIT as usize {
706                tracing::error!(target: LOG_TARGET,
707                    "{}/{}: 1st body of stream exceeds default credit",
708                    self.id,
709                    stream_id
710                );
711                return Action::Terminate(Frame::protocol_error());
712            }
713            if self.streams.contains_key(&stream_id) {
714                tracing::error!(target: LOG_TARGET, "{}/{}: stream already exists", self.id, stream_id);
715                return Action::Terminate(Frame::protocol_error());
716            }
717            if self.streams.len() == self.config.max_num_streams {
718                tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id);
719                return Action::Terminate(Frame::internal_error());
720            }
721            let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
722            let mut window_update = None;
723            {
724                let mut shared = stream.shared();
725                if is_finish {
726                    shared.update_state(self.id, stream_id, State::RecvClosed);
727                }
728                shared.window = shared.window.saturating_sub(frame.body_len());
729                shared.buffer.push(frame.into_body());
730
731                if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
732                    if let Some(credit) = shared.next_window_update() {
733                        shared.window += credit;
734                        let mut frame = Frame::window_update(stream_id, credit);
735                        frame.header_mut().ack();
736                        window_update = Some(frame)
737                    }
738                }
739            }
740            if window_update.is_none() {
741                stream.set_flag(stream::Flag::Ack)
742            }
743            self.streams.insert(stream_id, stream.clone_shared());
744            return Action::New(stream, window_update);
745        }
746
747        if let Some(s) = self.streams.get_mut(&stream_id) {
748            let mut shared = s.lock();
749            if frame.body().len() > shared.window as usize {
750                tracing::error!(target: LOG_TARGET,
751                    "{}/{}: frame body larger than window of stream",
752                    self.id,
753                    stream_id
754                );
755                return Action::Terminate(Frame::protocol_error());
756            }
757            if is_finish {
758                shared.update_state(self.id, stream_id, State::RecvClosed);
759            }
760            let max_buffer_size = self.config.max_buffer_size;
761            if shared.buffer.len() >= max_buffer_size {
762                tracing::error!(target: LOG_TARGET,
763                    "{}/{}: buffer of stream grows beyond limit",
764                    self.id,
765                    stream_id
766                );
767                let mut header = Header::data(stream_id, 0);
768                header.rst();
769                return Action::Reset(Frame::new(header));
770            }
771            shared.window = shared.window.saturating_sub(frame.body_len());
772            shared.buffer.push(frame.into_body());
773            if let Some(w) = shared.reader.take() {
774                w.wake()
775            }
776            if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
777                if let Some(credit) = shared.next_window_update() {
778                    shared.window += credit;
779                    let frame = Frame::window_update(stream_id, credit);
780                    return Action::Update(frame);
781                }
782            }
783        } else {
784            tracing::trace!(target: LOG_TARGET,
785                "{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
786                self.id,
787                stream_id,
788                frame
789            );
790            // We do not consider this a protocol violation and thus do not send a stream reset
791            // because we may still be processing pending `StreamCommand`s of this stream that were
792            // sent before it has been dropped and "garbage collected". Such a stream reset would
793            // interfere with the frames that still need to be sent, causing premature stream
794            // termination for the remote.
795            //
796            // See https://github.com/paritytech/yamux/issues/110 for details.
797        }
798
799        Action::None
800    }
801
802    fn on_window_update(&mut self, frame: &Frame<WindowUpdate>) -> Action {
803        let stream_id = frame.header().stream_id();
804
805        if frame.header().flags().contains(header::RST) {
806            // stream reset
807            if let Some(s) = self.streams.get_mut(&stream_id) {
808                let mut shared = s.lock();
809                shared.update_state(self.id, stream_id, State::Closed);
810                if let Some(w) = shared.reader.take() {
811                    w.wake()
812                }
813                if let Some(w) = shared.writer.take() {
814                    w.wake()
815                }
816            }
817            return Action::None;
818        }
819
820        let is_finish = frame.header().flags().contains(header::FIN); // half-close
821
822        if frame.header().flags().contains(header::SYN) {
823            // new stream
824            if !self.is_valid_remote_id(stream_id, Tag::WindowUpdate) {
825                tracing::error!(target: LOG_TARGET, "{}: invalid stream id {}", self.id, stream_id);
826                return Action::Terminate(Frame::protocol_error());
827            }
828            if self.streams.contains_key(&stream_id) {
829                tracing::error!(target: LOG_TARGET, "{}/{}: stream already exists", self.id, stream_id);
830                return Action::Terminate(Frame::protocol_error());
831            }
832            if self.streams.len() == self.config.max_num_streams {
833                tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id);
834                return Action::Terminate(Frame::protocol_error());
835            }
836
837            let credit = frame.header().credit() + DEFAULT_CREDIT;
838            let mut stream = self.make_new_inbound_stream(stream_id, credit);
839            stream.set_flag(stream::Flag::Ack);
840
841            if is_finish {
842                stream.shared().update_state(self.id, stream_id, State::RecvClosed);
843            }
844            self.streams.insert(stream_id, stream.clone_shared());
845            return Action::New(stream, None);
846        }
847
848        if let Some(s) = self.streams.get_mut(&stream_id) {
849            let mut shared = s.lock();
850            shared.credit += frame.header().credit();
851            if is_finish {
852                shared.update_state(self.id, stream_id, State::RecvClosed);
853            }
854            if let Some(w) = shared.writer.take() {
855                w.wake()
856            }
857        } else {
858            tracing::trace!(target: LOG_TARGET,
859                "{}/{}: window update for unknown stream, possibly dropped earlier: {:?}",
860                self.id,
861                stream_id,
862                frame
863            );
864            // We do not consider this a protocol violation and thus do not send a stream reset
865            // because we may still be processing pending `StreamCommand`s of this stream that were
866            // sent before it has been dropped and "garbage collected". Such a stream reset would
867            // interfere with the frames that still need to be sent, causing premature stream
868            // termination for the remote.
869            //
870            // See https://github.com/paritytech/yamux/issues/110 for details.
871        }
872
873        Action::None
874    }
875
876    fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
877        let stream_id = frame.header().stream_id();
878        if frame.header().flags().contains(header::ACK) {
879            // pong
880            return Action::None;
881        }
882        if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
883            let mut hdr = Header::ping(frame.header().nonce());
884            hdr.ack();
885            return Action::Ping(Frame::new(hdr));
886        }
887        tracing::trace!(target: LOG_TARGET,
888            "{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
889            self.id,
890            stream_id,
891            frame
892        );
893        // We do not consider this a protocol violation and thus do not send a stream reset because
894        // we may still be processing pending `StreamCommand`s of this stream that were sent before
895        // it has been dropped and "garbage collected". Such a stream reset would interfere with the
896        // frames that still need to be sent, causing premature stream termination for the remote.
897        //
898        // See https://github.com/paritytech/yamux/issues/110 for details.
899
900        Action::None
901    }
902
903    fn make_new_inbound_stream(&mut self, id: StreamId, credit: u32) -> Stream {
904        let config = self.config.clone();
905
906        let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number.
907        self.stream_receivers.push(TaggedStream::new(id, receiver));
908        if let Some(waker) = self.no_streams_waker.take() {
909            waker.wake();
910        }
911
912        Stream::new_inbound(id, self.id, config, credit, sender)
913    }
914
915    fn make_new_outbound_stream(&mut self, id: StreamId, window: u32) -> Stream {
916        let config = self.config.clone();
917
918        let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number.
919        self.stream_receivers.push(TaggedStream::new(id, receiver));
920        if let Some(waker) = self.no_streams_waker.take() {
921            waker.wake();
922        }
923
924        Stream::new_outbound(id, self.id, config, window, sender)
925    }
926
927    fn next_stream_id(&mut self) -> Result<StreamId> {
928        let proposed = StreamId::new(self.next_id);
929        self.next_id = self.next_id.checked_add(2).ok_or(ConnectionError::NoMoreStreamIds)?;
930        match self.mode {
931            Mode::Client => assert!(proposed.is_client()),
932            Mode::Server => assert!(proposed.is_server()),
933        }
934        Ok(proposed)
935    }
936
937    /// The ACK backlog is defined as the number of outbound streams that have not yet been
938    /// acknowledged.
939    fn ack_backlog(&mut self) -> usize {
940        self.streams
941            .iter()
942            // Whether this is an outbound stream.
943            //
944            // Clients use odd IDs and servers use even IDs.
945            // A stream is outbound if:
946            //
947            // - Its ID is odd and we are the client.
948            // - Its ID is even and we are the server.
949            .filter(|(id, _)| match self.mode {
950                Mode::Client => id.is_client(),
951                Mode::Server => id.is_server(),
952            })
953            .filter(|(_, s)| s.lock().is_pending_ack())
954            .count()
955    }
956
957    // Check if the given stream ID is valid w.r.t. the provided tag and our connection mode.
958    fn is_valid_remote_id(&self, id: StreamId, tag: Tag) -> bool {
959        if tag == Tag::Ping || tag == Tag::GoAway {
960            return id.is_session();
961        }
962        match self.mode {
963            Mode::Client => id.is_server(),
964            Mode::Server => id.is_client(),
965        }
966    }
967}
968
969impl<T> Active<T> {
970    /// Close and drop all `Stream`s and wake any pending `Waker`s.
971    fn drop_all_streams(&mut self) {
972        for (id, s) in self.streams.drain() {
973            let mut shared = s.lock();
974            shared.update_state(self.id, id, State::Closed);
975            if let Some(w) = shared.reader.take() {
976                w.wake()
977            }
978            if let Some(w) = shared.writer.take() {
979                w.wake()
980            }
981        }
982    }
983}