yamux/
connection.rs

1// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd.
2//
3// Licensed under the Apache License, Version 2.0 or MIT license, at your option.
4//
5// A copy of the Apache License, Version 2.0 is included in the software as
6// LICENSE-APACHE and a copy of the MIT license is included in the software
7// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0
8// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
9// at https://opensource.org/licenses/MIT.
10
11//! This module contains the `Connection` type and associated helpers.
12//! A `Connection` wraps an underlying (async) I/O resource and multiplexes
13//! `Stream`s over it.
14
15mod cleanup;
16mod closing;
17mod rtt;
18mod stream;
19
20use crate::tagged_stream::TaggedStream;
21use crate::{
22    error::ConnectionError,
23    frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
24    frame::{self, Frame},
25    Config, DEFAULT_CREDIT,
26};
27use crate::{Result, MAX_ACK_BACKLOG};
28use cleanup::Cleanup;
29use closing::Closing;
30use futures::stream::SelectAll;
31use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse};
32use nohash_hasher::IntMap;
33use parking_lot::Mutex;
34use std::collections::VecDeque;
35use std::task::{Context, Waker};
36use std::{fmt, sync::Arc, task::Poll};
37
38pub use stream::{Packet, State, Stream};
39
40/// How the connection is used.
41#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
42pub enum Mode {
43    /// Client to server connection.
44    Client,
45    /// Server to client connection.
46    Server,
47}
48
49/// The connection identifier.
50///
51/// Randomly generated, this is mainly intended to improve log output.
52#[derive(Clone, Copy)]
53pub(crate) struct Id(u32);
54
55impl Id {
56    /// Create a random connection ID.
57    pub(crate) fn random() -> Self {
58        Id(rand::random())
59    }
60}
61
62impl fmt::Debug for Id {
63    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64        write!(f, "{:08x}", self.0)
65    }
66}
67
68impl fmt::Display for Id {
69    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70        write!(f, "{:08x}", self.0)
71    }
72}
73
74/// A Yamux connection object.
75///
76/// Wraps the underlying I/O resource and makes progress via its
77/// [`Connection::poll_next_inbound`] method which must be called repeatedly
78/// until `Ok(None)` signals EOF or an error is encountered.
79#[derive(Debug)]
80pub struct Connection<T> {
81    inner: ConnectionState<T>,
82}
83
84impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
85    pub fn new(socket: T, cfg: Config, mode: Mode) -> Self {
86        Self {
87            inner: ConnectionState::Active(Active::new(socket, cfg, mode)),
88        }
89    }
90
91    /// Poll for a new outbound stream.
92    ///
93    /// This function will fail if the current state does not allow opening new outbound streams.
94    pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
95        loop {
96            match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
97                ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) {
98                    Poll::Ready(Ok(stream)) => {
99                        self.inner = ConnectionState::Active(active);
100                        return Poll::Ready(Ok(stream));
101                    }
102                    Poll::Pending => {
103                        self.inner = ConnectionState::Active(active);
104                        return Poll::Pending;
105                    }
106                    Poll::Ready(Err(e)) => {
107                        self.inner = ConnectionState::Cleanup(active.cleanup(e));
108                        continue;
109                    }
110                },
111                ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
112                    Poll::Ready(Ok(())) => {
113                        self.inner = ConnectionState::Closed;
114                        return Poll::Ready(Err(ConnectionError::Closed));
115                    }
116                    Poll::Ready(Err(e)) => {
117                        self.inner = ConnectionState::Closed;
118                        return Poll::Ready(Err(e));
119                    }
120                    Poll::Pending => {
121                        self.inner = ConnectionState::Closing(inner);
122                        return Poll::Pending;
123                    }
124                },
125                ConnectionState::Cleanup(mut inner) => match inner.poll_unpin(cx) {
126                    Poll::Ready(e) => {
127                        self.inner = ConnectionState::Closed;
128                        return Poll::Ready(Err(e));
129                    }
130                    Poll::Pending => {
131                        self.inner = ConnectionState::Cleanup(inner);
132                        return Poll::Pending;
133                    }
134                },
135                ConnectionState::Closed => {
136                    self.inner = ConnectionState::Closed;
137                    return Poll::Ready(Err(ConnectionError::Closed));
138                }
139                ConnectionState::Poisoned => unreachable!(),
140            }
141        }
142    }
143
144    /// Poll for the next inbound stream.
145    ///
146    /// If this function returns `None`, the underlying connection is closed.
147    pub fn poll_next_inbound(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
148        loop {
149            match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
150                ConnectionState::Active(mut active) => match active.poll(cx) {
151                    Poll::Ready(Ok(stream)) => {
152                        self.inner = ConnectionState::Active(active);
153                        return Poll::Ready(Some(Ok(stream)));
154                    }
155                    Poll::Ready(Err(e)) => {
156                        self.inner = ConnectionState::Cleanup(active.cleanup(e));
157                        continue;
158                    }
159                    Poll::Pending => {
160                        self.inner = ConnectionState::Active(active);
161                        return Poll::Pending;
162                    }
163                },
164                ConnectionState::Closing(mut closing) => match closing.poll_unpin(cx) {
165                    Poll::Ready(Ok(())) => {
166                        self.inner = ConnectionState::Closed;
167                        return Poll::Ready(None);
168                    }
169                    Poll::Ready(Err(e)) => {
170                        self.inner = ConnectionState::Closed;
171                        return Poll::Ready(Some(Err(e)));
172                    }
173                    Poll::Pending => {
174                        self.inner = ConnectionState::Closing(closing);
175                        return Poll::Pending;
176                    }
177                },
178                ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
179                    Poll::Ready(ConnectionError::Closed) => {
180                        self.inner = ConnectionState::Closed;
181                        return Poll::Ready(None);
182                    }
183                    Poll::Ready(other) => {
184                        self.inner = ConnectionState::Closed;
185                        return Poll::Ready(Some(Err(other)));
186                    }
187                    Poll::Pending => {
188                        self.inner = ConnectionState::Cleanup(cleanup);
189                        return Poll::Pending;
190                    }
191                },
192                ConnectionState::Closed => {
193                    self.inner = ConnectionState::Closed;
194                    return Poll::Ready(None);
195                }
196                ConnectionState::Poisoned => unreachable!(),
197            }
198        }
199    }
200
201    /// Close the connection.
202    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
203        loop {
204            match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
205                ConnectionState::Active(active) => {
206                    self.inner = ConnectionState::Closing(active.close());
207                }
208                ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
209                    Poll::Ready(Ok(())) => {
210                        self.inner = ConnectionState::Closed;
211                    }
212                    Poll::Ready(Err(e)) => {
213                        log::warn!("Failure while closing connection: {e}");
214                        self.inner = ConnectionState::Closed;
215                        return Poll::Ready(Err(e));
216                    }
217                    Poll::Pending => {
218                        self.inner = ConnectionState::Closing(inner);
219                        return Poll::Pending;
220                    }
221                },
222                ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
223                    Poll::Ready(reason) => {
224                        log::warn!("Failure while closing connection: {reason}");
225                        self.inner = ConnectionState::Closed;
226                        return Poll::Ready(Ok(()));
227                    }
228                    Poll::Pending => {
229                        self.inner = ConnectionState::Cleanup(cleanup);
230                        return Poll::Pending;
231                    }
232                },
233                ConnectionState::Closed => {
234                    self.inner = ConnectionState::Closed;
235                    return Poll::Ready(Ok(()));
236                }
237                ConnectionState::Poisoned => {
238                    unreachable!()
239                }
240            }
241        }
242    }
243}
244
245impl<T> Drop for Connection<T> {
246    fn drop(&mut self) {
247        match &mut self.inner {
248            ConnectionState::Active(active) => active.drop_all_streams(),
249            ConnectionState::Closing(_) => {}
250            ConnectionState::Cleanup(_) => {}
251            ConnectionState::Closed => {}
252            ConnectionState::Poisoned => {}
253        }
254    }
255}
256
257enum ConnectionState<T> {
258    /// The connection is alive and healthy.
259    Active(Active<T>),
260    /// Our user requested to shutdown the connection, we are working on it.
261    Closing(Closing<T>),
262    /// An error occurred and we are cleaning up our resources.
263    Cleanup(Cleanup),
264    /// The connection is closed.
265    Closed,
266    /// Something went wrong during our state transitions. Should never happen unless there is a bug.
267    Poisoned,
268}
269
270impl<T> fmt::Debug for ConnectionState<T> {
271    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272        match self {
273            ConnectionState::Active(_) => write!(f, "Active"),
274            ConnectionState::Closing(_) => write!(f, "Closing"),
275            ConnectionState::Cleanup(_) => write!(f, "Cleanup"),
276            ConnectionState::Closed => write!(f, "Closed"),
277            ConnectionState::Poisoned => write!(f, "Poisoned"),
278        }
279    }
280}
281
282/// The active state of [`Connection`].
283struct Active<T> {
284    id: Id,
285    mode: Mode,
286    config: Arc<Config>,
287    socket: Fuse<frame::Io<T>>,
288    next_id: u32,
289
290    streams: IntMap<StreamId, Arc<Mutex<stream::Shared>>>,
291    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
292    no_streams_waker: Option<Waker>,
293
294    pending_read_frame: Option<Frame<()>>,
295    pending_write_frame: Option<Frame<()>>,
296    new_outbound_stream_waker: Option<Waker>,
297
298    rtt: rtt::Rtt,
299
300    /// A stream's `max_stream_receive_window` can grow beyond [`DEFAULT_CREDIT`], see
301    /// [`Stream::next_window_update`]. This field is the sum of the bytes by which all streams'
302    /// `max_stream_receive_window` have each exceeded [`DEFAULT_CREDIT`]. Used to enforce
303    /// [`Config::max_connection_receive_window`].
304    accumulated_max_stream_windows: Arc<Mutex<usize>>,
305}
306/// `Stream` to `Connection` commands.
307#[derive(Debug)]
308pub(crate) enum StreamCommand {
309    /// A new frame should be sent to the remote.
310    SendFrame(Frame<Either<Data, WindowUpdate>>),
311    /// Close a stream.
312    CloseStream { ack: bool },
313}
314
315/// Possible actions as a result of incoming frame handling.
316#[derive(Debug)]
317pub(crate) enum Action {
318    /// Nothing to be done.
319    None,
320    /// A new stream has been opened by the remote.
321    New(Stream),
322    /// A ping should be answered.
323    Ping(Frame<Ping>),
324    /// The connection should be terminated.
325    Terminate(Frame<GoAway>),
326}
327
328impl<T> fmt::Debug for Active<T> {
329    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
330        f.debug_struct("Connection")
331            .field("id", &self.id)
332            .field("mode", &self.mode)
333            .field("streams", &self.streams.len())
334            .field("next_id", &self.next_id)
335            .finish()
336    }
337}
338
339impl<T> fmt::Display for Active<T> {
340    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
341        write!(
342            f,
343            "(Connection {} {:?} (streams {}))",
344            self.id,
345            self.mode,
346            self.streams.len()
347        )
348    }
349}
350
351impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
352    /// Create a new `Connection` from the given I/O resource.
353    fn new(socket: T, cfg: Config, mode: Mode) -> Self {
354        let id = Id::random();
355        log::debug!("new connection: {id} ({mode:?})");
356        let socket = frame::Io::new(id, socket).fuse();
357        Active {
358            id,
359            mode,
360            config: Arc::new(cfg),
361            socket,
362            streams: IntMap::default(),
363            stream_receivers: SelectAll::default(),
364            no_streams_waker: None,
365            next_id: match mode {
366                Mode::Client => 1,
367                Mode::Server => 2,
368            },
369            pending_read_frame: None,
370            pending_write_frame: None,
371            new_outbound_stream_waker: None,
372            rtt: rtt::Rtt::new(),
373            accumulated_max_stream_windows: Default::default(),
374        }
375    }
376
377    /// Gracefully close the connection to the remote.
378    fn close(self) -> Closing<T> {
379        let pending_frames = self
380            .pending_read_frame
381            .into_iter()
382            .chain(self.pending_write_frame)
383            .collect::<VecDeque<Frame<()>>>();
384        Closing::new(self.stream_receivers, pending_frames, self.socket)
385    }
386
387    /// Cleanup all our resources.
388    ///
389    /// This should be called in the context of an unrecoverable error on the connection.
390    fn cleanup(mut self, error: ConnectionError) -> Cleanup {
391        self.drop_all_streams();
392
393        Cleanup::new(self.stream_receivers, error)
394    }
395
396    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
397        loop {
398            if self.socket.poll_ready_unpin(cx).is_ready() {
399                // Note `next_ping` does not register a waker and thus if not called regularly (idle
400                // connection) no ping is sent. This is deliberate as an idle connection does not
401                // need RTT measurements to increase its stream receive window.
402                if let Some(frame) = self.rtt.next_ping() {
403                    self.socket.start_send_unpin(frame.into())?;
404                    continue;
405                }
406
407                // Privilege pending `Pong` and `GoAway` `Frame`s
408                // over `Frame`s from the receivers.
409                if let Some(frame) = self
410                    .pending_read_frame
411                    .take()
412                    .or_else(|| self.pending_write_frame.take())
413                {
414                    self.socket.start_send_unpin(frame)?;
415                    continue;
416                }
417            }
418
419            match self.socket.poll_flush_unpin(cx)? {
420                Poll::Ready(()) => {}
421                Poll::Pending => {}
422            }
423
424            if self.pending_write_frame.is_none() {
425                match self.stream_receivers.poll_next_unpin(cx) {
426                    Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
427                        log::trace!(
428                            "{}/{}: sending: {}",
429                            self.id,
430                            frame.header().stream_id(),
431                            frame.header()
432                        );
433                        self.pending_write_frame.replace(frame.into());
434                        continue;
435                    }
436                    Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
437                        log::trace!("{}/{}: sending close", self.id, id);
438                        self.pending_write_frame
439                            .replace(Frame::close_stream(id, ack).into());
440                        continue;
441                    }
442                    Poll::Ready(Some((id, None))) => {
443                        if let Some(frame) = self.on_drop_stream(id) {
444                            log::trace!("{}/{}: sending: {}", self.id, id, frame.header());
445                            self.pending_write_frame.replace(frame);
446                        };
447                        continue;
448                    }
449                    Poll::Ready(None) => {
450                        self.no_streams_waker = Some(cx.waker().clone());
451                    }
452                    Poll::Pending => {}
453                }
454            }
455
456            if self.pending_read_frame.is_none() {
457                match self.socket.poll_next_unpin(cx) {
458                    Poll::Ready(Some(frame)) => {
459                        match self.on_frame(frame?)? {
460                            Action::None => {}
461                            Action::New(stream) => {
462                                log::trace!("{}: new inbound {} of {}", self.id, stream, self);
463                                return Poll::Ready(Ok(stream));
464                            }
465                            Action::Ping(f) => {
466                                log::trace!("{}/{}: pong", self.id, f.header().stream_id());
467                                self.pending_read_frame.replace(f.into());
468                            }
469                            Action::Terminate(f) => {
470                                log::trace!("{}: sending term", self.id);
471                                self.pending_read_frame.replace(f.into());
472                            }
473                        }
474                        continue;
475                    }
476                    Poll::Ready(None) => {
477                        return Poll::Ready(Err(ConnectionError::Closed));
478                    }
479                    Poll::Pending => {}
480                }
481            }
482
483            // If we make it this far, at least one of the above must have registered a waker.
484            return Poll::Pending;
485        }
486    }
487
488    fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
489        if self.streams.len() >= self.config.max_num_streams {
490            log::error!("{}: maximum number of streams reached", self.id);
491            return Poll::Ready(Err(ConnectionError::TooManyStreams));
492        }
493
494        if self.ack_backlog() >= MAX_ACK_BACKLOG {
495            log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, registering task for wake-up until remote acknowledges at least one stream");
496            self.new_outbound_stream_waker = Some(cx.waker().clone());
497            return Poll::Pending;
498        }
499
500        log::trace!("{}: creating new outbound stream", self.id);
501
502        let id = self.next_stream_id()?;
503        let stream = self.make_new_outbound_stream(id);
504
505        log::debug!("{}: new outbound {} of {}", self.id, stream, self);
506        self.streams.insert(id, stream.clone_shared());
507
508        Poll::Ready(Ok(stream))
509    }
510
511    fn on_drop_stream(&mut self, stream_id: StreamId) -> Option<Frame<()>> {
512        let s = self.streams.remove(&stream_id).expect("stream not found");
513
514        log::trace!("{}: removing dropped stream {}", self.id, stream_id);
515        let frame = {
516            let mut shared = s.lock();
517            let frame = match shared.update_state(self.id, stream_id, State::Closed) {
518                // The stream was dropped without calling `poll_close`.
519                // We reset the stream to inform the remote of the closure.
520                State::Open { .. } => {
521                    let mut header = Header::data(stream_id, 0);
522                    header.rst();
523                    Some(Frame::new(header))
524                }
525                // The stream was dropped without calling `poll_close`.
526                // We have already received a FIN from remote and send one
527                // back which closes the stream for good.
528                State::RecvClosed => {
529                    let mut header = Header::data(stream_id, 0);
530                    header.fin();
531                    Some(Frame::new(header))
532                }
533                // The stream was properly closed. We already sent our FIN frame.
534                // The remote may be out of credit though and blocked on
535                // writing more data. We may need to reset the stream.
536                State::SendClosed => {
537                    // The remote has either still credit or will be given more
538                    // due to an enqueued window update or we already have
539                    // inbound frames in the socket buffer which will be
540                    // processed later. In any case we will reply with an RST in
541                    // `Connection::on_data` because the stream will no longer
542                    // be known.
543                    None
544                }
545                // The stream was properly closed. We already have sent our FIN frame. The
546                // remote end has already done so in the past.
547                State::Closed => None,
548            };
549            if let Some(w) = shared.reader.take() {
550                w.wake()
551            }
552            if let Some(w) = shared.writer.take() {
553                w.wake()
554            }
555            frame
556        };
557        frame.map(Into::into)
558    }
559
560    /// Process the result of reading from the socket.
561    ///
562    /// Unless `frame` is `Ok(Some(_))` we will assume the connection got closed
563    /// and return a corresponding error, which terminates the connection.
564    /// Otherwise we process the frame and potentially return a new `Stream`
565    /// if one was opened by the remote.
566    fn on_frame(&mut self, frame: Frame<()>) -> Result<Action> {
567        log::trace!("{}: received: {}", self.id, frame.header());
568
569        if frame.header().flags().contains(header::ACK)
570            && matches!(frame.header().tag(), Tag::Data | Tag::WindowUpdate)
571        {
572            let id = frame.header().stream_id();
573            if let Some(stream) = self.streams.get(&id) {
574                stream
575                    .lock()
576                    .update_state(self.id, id, State::Open { acknowledged: true });
577            }
578            if let Some(waker) = self.new_outbound_stream_waker.take() {
579                waker.wake();
580            }
581        }
582
583        let action = match frame.header().tag() {
584            Tag::Data => self.on_data(frame.into_data()),
585            Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
586            Tag::Ping => self.on_ping(&frame.into_ping()),
587            Tag::GoAway => return Err(ConnectionError::Closed),
588        };
589        Ok(action)
590    }
591
592    fn on_data(&mut self, frame: Frame<Data>) -> Action {
593        let stream_id = frame.header().stream_id();
594
595        if frame.header().flags().contains(header::RST) {
596            // stream reset
597            if let Some(s) = self.streams.get_mut(&stream_id) {
598                let mut shared = s.lock();
599                shared.update_state(self.id, stream_id, State::Closed);
600                if let Some(w) = shared.reader.take() {
601                    w.wake()
602                }
603                if let Some(w) = shared.writer.take() {
604                    w.wake()
605                }
606            }
607            return Action::None;
608        }
609
610        let is_finish = frame.header().flags().contains(header::FIN); // half-close
611
612        if frame.header().flags().contains(header::SYN) {
613            // new stream
614            if !self.is_valid_remote_id(stream_id, Tag::Data) {
615                log::error!("{}: invalid stream id {}", self.id, stream_id);
616                return Action::Terminate(Frame::protocol_error());
617            }
618            if frame.body().len() > DEFAULT_CREDIT as usize {
619                log::error!(
620                    "{}/{}: 1st body of stream exceeds default credit",
621                    self.id,
622                    stream_id
623                );
624                return Action::Terminate(Frame::protocol_error());
625            }
626            if self.streams.contains_key(&stream_id) {
627                log::error!("{}/{}: stream already exists", self.id, stream_id);
628                return Action::Terminate(Frame::protocol_error());
629            }
630            if self.streams.len() == self.config.max_num_streams {
631                log::error!("{}: maximum number of streams reached", self.id);
632                return Action::Terminate(Frame::internal_error());
633            }
634            let stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
635            {
636                let mut shared = stream.shared();
637                if is_finish {
638                    shared.update_state(self.id, stream_id, State::RecvClosed);
639                }
640                shared.consume_receive_window(frame.body_len());
641                shared.buffer.push(frame.into_body());
642            }
643            self.streams.insert(stream_id, stream.clone_shared());
644            return Action::New(stream);
645        }
646
647        if let Some(s) = self.streams.get_mut(&stream_id) {
648            let mut shared = s.lock();
649            if frame.body_len() > shared.receive_window() {
650                log::error!(
651                    "{}/{}: frame body larger than window of stream",
652                    self.id,
653                    stream_id
654                );
655                return Action::Terminate(Frame::protocol_error());
656            }
657            if is_finish {
658                shared.update_state(self.id, stream_id, State::RecvClosed);
659            }
660            shared.consume_receive_window(frame.body_len());
661            shared.buffer.push(frame.into_body());
662            if let Some(w) = shared.reader.take() {
663                w.wake()
664            }
665        } else {
666            log::trace!(
667                "{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
668                self.id,
669                stream_id,
670                frame
671            );
672            // We do not consider this a protocol violation and thus do not send a stream reset
673            // because we may still be processing pending `StreamCommand`s of this stream that were
674            // sent before it has been dropped and "garbage collected". Such a stream reset would
675            // interfere with the frames that still need to be sent, causing premature stream
676            // termination for the remote.
677            //
678            // See https://github.com/paritytech/yamux/issues/110 for details.
679        }
680
681        Action::None
682    }
683
684    fn on_window_update(&mut self, frame: &Frame<WindowUpdate>) -> Action {
685        let stream_id = frame.header().stream_id();
686
687        if frame.header().flags().contains(header::RST) {
688            // stream reset
689            if let Some(s) = self.streams.get_mut(&stream_id) {
690                let mut shared = s.lock();
691                shared.update_state(self.id, stream_id, State::Closed);
692                if let Some(w) = shared.reader.take() {
693                    w.wake()
694                }
695                if let Some(w) = shared.writer.take() {
696                    w.wake()
697                }
698            }
699            return Action::None;
700        }
701
702        let is_finish = frame.header().flags().contains(header::FIN); // half-close
703
704        if frame.header().flags().contains(header::SYN) {
705            // new stream
706            if !self.is_valid_remote_id(stream_id, Tag::WindowUpdate) {
707                log::error!("{}: invalid stream id {}", self.id, stream_id);
708                return Action::Terminate(Frame::protocol_error());
709            }
710            if self.streams.contains_key(&stream_id) {
711                log::error!("{}/{}: stream already exists", self.id, stream_id);
712                return Action::Terminate(Frame::protocol_error());
713            }
714            if self.streams.len() == self.config.max_num_streams {
715                log::error!("{}: maximum number of streams reached", self.id);
716                return Action::Terminate(Frame::protocol_error());
717            }
718
719            let credit = frame.header().credit() + DEFAULT_CREDIT;
720            let stream = self.make_new_inbound_stream(stream_id, credit);
721
722            if is_finish {
723                stream
724                    .shared()
725                    .update_state(self.id, stream_id, State::RecvClosed);
726            }
727            self.streams.insert(stream_id, stream.clone_shared());
728            return Action::New(stream);
729        }
730
731        if let Some(s) = self.streams.get_mut(&stream_id) {
732            let mut shared = s.lock();
733            shared.increase_send_window_by(frame.header().credit());
734            if is_finish {
735                shared.update_state(self.id, stream_id, State::RecvClosed);
736
737                if let Some(w) = shared.reader.take() {
738                    w.wake()
739                }
740            }
741            if let Some(w) = shared.writer.take() {
742                w.wake()
743            }
744        } else {
745            log::trace!(
746                "{}/{}: window update for unknown stream, possibly dropped earlier: {:?}",
747                self.id,
748                stream_id,
749                frame
750            );
751            // We do not consider this a protocol violation and thus do not send a stream reset
752            // because we may still be processing pending `StreamCommand`s of this stream that were
753            // sent before it has been dropped and "garbage collected". Such a stream reset would
754            // interfere with the frames that still need to be sent, causing premature stream
755            // termination for the remote.
756            //
757            // See https://github.com/paritytech/yamux/issues/110 for details.
758        }
759
760        Action::None
761    }
762
763    fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
764        let stream_id = frame.header().stream_id();
765        if frame.header().flags().contains(header::ACK) {
766            return self.rtt.handle_pong(frame.nonce());
767        }
768        if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
769            let mut hdr = Header::ping(frame.header().nonce());
770            hdr.ack();
771            return Action::Ping(Frame::new(hdr));
772        }
773        log::debug!(
774            "{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
775            self.id,
776            stream_id,
777            frame
778        );
779        // We do not consider this a protocol violation and thus do not send a stream reset because
780        // we may still be processing pending `StreamCommand`s of this stream that were sent before
781        // it has been dropped and "garbage collected". Such a stream reset would interfere with the
782        // frames that still need to be sent, causing premature stream termination for the remote.
783        //
784        // See https://github.com/paritytech/yamux/issues/110 for details.
785
786        Action::None
787    }
788
789    fn make_new_inbound_stream(&mut self, id: StreamId, credit: u32) -> Stream {
790        let config = self.config.clone();
791
792        let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number.
793        self.stream_receivers.push(TaggedStream::new(id, receiver));
794        if let Some(waker) = self.no_streams_waker.take() {
795            waker.wake();
796        }
797
798        Stream::new_inbound(
799            id,
800            self.id,
801            config,
802            credit,
803            sender,
804            self.rtt.clone(),
805            self.accumulated_max_stream_windows.clone(),
806        )
807    }
808
809    fn make_new_outbound_stream(&mut self, id: StreamId) -> Stream {
810        let config = self.config.clone();
811
812        let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number.
813        self.stream_receivers.push(TaggedStream::new(id, receiver));
814        if let Some(waker) = self.no_streams_waker.take() {
815            waker.wake();
816        }
817
818        Stream::new_outbound(
819            id,
820            self.id,
821            config,
822            sender,
823            self.rtt.clone(),
824            self.accumulated_max_stream_windows.clone(),
825        )
826    }
827
828    fn next_stream_id(&mut self) -> Result<StreamId> {
829        let proposed = StreamId::new(self.next_id);
830        self.next_id = self
831            .next_id
832            .checked_add(2)
833            .ok_or(ConnectionError::NoMoreStreamIds)?;
834        match self.mode {
835            Mode::Client => assert!(proposed.is_client()),
836            Mode::Server => assert!(proposed.is_server()),
837        }
838        Ok(proposed)
839    }
840
841    /// The ACK backlog is defined as the number of outbound streams that have not yet been acknowledged.
842    fn ack_backlog(&mut self) -> usize {
843        self.streams
844            .iter()
845            // Whether this is an outbound stream.
846            //
847            // Clients use odd IDs and servers use even IDs.
848            // A stream is outbound if:
849            //
850            // - Its ID is odd and we are the client.
851            // - Its ID is even and we are the server.
852            .filter(|(id, _)| match self.mode {
853                Mode::Client => id.is_client(),
854                Mode::Server => id.is_server(),
855            })
856            .filter(|(_, s)| s.lock().is_pending_ack())
857            .count()
858    }
859
860    // Check if the given stream ID is valid w.r.t. the provided tag and our connection mode.
861    fn is_valid_remote_id(&self, id: StreamId, tag: Tag) -> bool {
862        if tag == Tag::Ping || tag == Tag::GoAway {
863            return id.is_session();
864        }
865        match self.mode {
866            Mode::Client => id.is_server(),
867            Mode::Server => id.is_client(),
868        }
869    }
870}
871
872impl<T> Active<T> {
873    /// Close and drop all `Stream`s and wake any pending `Waker`s.
874    fn drop_all_streams(&mut self) {
875        for (id, s) in self.streams.drain() {
876            let mut shared = s.lock();
877            shared.update_state(self.id, id, State::Closed);
878            if let Some(w) = shared.reader.take() {
879                w.wake()
880            }
881            if let Some(w) = shared.writer.take() {
882                w.wake()
883            }
884        }
885    }
886}