1mod cleanup;
16mod closing;
17mod rtt;
18mod stream;
19
20use crate::tagged_stream::TaggedStream;
21use crate::{
22 error::ConnectionError,
23 frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
24 frame::{self, Frame},
25 Config, DEFAULT_CREDIT,
26};
27use crate::{Result, MAX_ACK_BACKLOG};
28use cleanup::Cleanup;
29use closing::Closing;
30use futures::stream::SelectAll;
31use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse};
32use nohash_hasher::IntMap;
33use parking_lot::Mutex;
34use std::collections::VecDeque;
35use std::task::{Context, Waker};
36use std::{fmt, sync::Arc, task::Poll};
37
38pub use stream::{Packet, State, Stream};
39
/// The role this endpoint plays on a connection.
///
/// The role decides which stream IDs are handed out locally: `Active::new`
/// starts clients at ID 1 and servers at ID 2.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Mode {
    /// The endpoint acts as the client.
    Client,
    /// The endpoint acts as the server.
    Server,
}
48
/// Identifier of a connection, wrapping a `u32`.
///
/// It is rendered as eight lowercase hex digits and appears in this file's
/// log messages to tell connections apart.
#[derive(Clone, Copy)]
pub(crate) struct Id(u32);
54
55impl Id {
56 pub(crate) fn random() -> Self {
58 Id(rand::random())
59 }
60}
61
62impl fmt::Debug for Id {
63 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64 write!(f, "{:08x}", self.0)
65 }
66}
67
68impl fmt::Display for Id {
69 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70 write!(f, "{:08x}", self.0)
71 }
72}
73
/// A connection that carries multiple `Stream`s over a single socket of type `T`.
///
/// All state lives in `inner`; the `poll_*` methods move it between the
/// variants of `ConnectionState`.
#[derive(Debug)]
pub struct Connection<T> {
    /// Current lifecycle state of the connection.
    inner: ConnectionState<T>,
}
83
84impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
85 pub fn new(socket: T, cfg: Config, mode: Mode) -> Self {
86 Self {
87 inner: ConnectionState::Active(Active::new(socket, cfg, mode)),
88 }
89 }
90
91 pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
95 loop {
96 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
97 ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) {
98 Poll::Ready(Ok(stream)) => {
99 self.inner = ConnectionState::Active(active);
100 return Poll::Ready(Ok(stream));
101 }
102 Poll::Pending => {
103 self.inner = ConnectionState::Active(active);
104 return Poll::Pending;
105 }
106 Poll::Ready(Err(e)) => {
107 self.inner = ConnectionState::Cleanup(active.cleanup(e));
108 continue;
109 }
110 },
111 ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
112 Poll::Ready(Ok(())) => {
113 self.inner = ConnectionState::Closed;
114 return Poll::Ready(Err(ConnectionError::Closed));
115 }
116 Poll::Ready(Err(e)) => {
117 self.inner = ConnectionState::Closed;
118 return Poll::Ready(Err(e));
119 }
120 Poll::Pending => {
121 self.inner = ConnectionState::Closing(inner);
122 return Poll::Pending;
123 }
124 },
125 ConnectionState::Cleanup(mut inner) => match inner.poll_unpin(cx) {
126 Poll::Ready(e) => {
127 self.inner = ConnectionState::Closed;
128 return Poll::Ready(Err(e));
129 }
130 Poll::Pending => {
131 self.inner = ConnectionState::Cleanup(inner);
132 return Poll::Pending;
133 }
134 },
135 ConnectionState::Closed => {
136 self.inner = ConnectionState::Closed;
137 return Poll::Ready(Err(ConnectionError::Closed));
138 }
139 ConnectionState::Poisoned => unreachable!(),
140 }
141 }
142 }
143
144 pub fn poll_next_inbound(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
148 loop {
149 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
150 ConnectionState::Active(mut active) => match active.poll(cx) {
151 Poll::Ready(Ok(stream)) => {
152 self.inner = ConnectionState::Active(active);
153 return Poll::Ready(Some(Ok(stream)));
154 }
155 Poll::Ready(Err(e)) => {
156 self.inner = ConnectionState::Cleanup(active.cleanup(e));
157 continue;
158 }
159 Poll::Pending => {
160 self.inner = ConnectionState::Active(active);
161 return Poll::Pending;
162 }
163 },
164 ConnectionState::Closing(mut closing) => match closing.poll_unpin(cx) {
165 Poll::Ready(Ok(())) => {
166 self.inner = ConnectionState::Closed;
167 return Poll::Ready(None);
168 }
169 Poll::Ready(Err(e)) => {
170 self.inner = ConnectionState::Closed;
171 return Poll::Ready(Some(Err(e)));
172 }
173 Poll::Pending => {
174 self.inner = ConnectionState::Closing(closing);
175 return Poll::Pending;
176 }
177 },
178 ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
179 Poll::Ready(ConnectionError::Closed) => {
180 self.inner = ConnectionState::Closed;
181 return Poll::Ready(None);
182 }
183 Poll::Ready(other) => {
184 self.inner = ConnectionState::Closed;
185 return Poll::Ready(Some(Err(other)));
186 }
187 Poll::Pending => {
188 self.inner = ConnectionState::Cleanup(cleanup);
189 return Poll::Pending;
190 }
191 },
192 ConnectionState::Closed => {
193 self.inner = ConnectionState::Closed;
194 return Poll::Ready(None);
195 }
196 ConnectionState::Poisoned => unreachable!(),
197 }
198 }
199 }
200
201 pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
203 loop {
204 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
205 ConnectionState::Active(active) => {
206 self.inner = ConnectionState::Closing(active.close());
207 }
208 ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
209 Poll::Ready(Ok(())) => {
210 self.inner = ConnectionState::Closed;
211 }
212 Poll::Ready(Err(e)) => {
213 log::warn!("Failure while closing connection: {e}");
214 self.inner = ConnectionState::Closed;
215 return Poll::Ready(Err(e));
216 }
217 Poll::Pending => {
218 self.inner = ConnectionState::Closing(inner);
219 return Poll::Pending;
220 }
221 },
222 ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
223 Poll::Ready(reason) => {
224 log::warn!("Failure while closing connection: {reason}");
225 self.inner = ConnectionState::Closed;
226 return Poll::Ready(Ok(()));
227 }
228 Poll::Pending => {
229 self.inner = ConnectionState::Cleanup(cleanup);
230 return Poll::Pending;
231 }
232 },
233 ConnectionState::Closed => {
234 self.inner = ConnectionState::Closed;
235 return Poll::Ready(Ok(()));
236 }
237 ConnectionState::Poisoned => {
238 unreachable!()
239 }
240 }
241 }
242 }
243}
244
245impl<T> Drop for Connection<T> {
246 fn drop(&mut self) {
247 match &mut self.inner {
248 ConnectionState::Active(active) => active.drop_all_streams(),
249 ConnectionState::Closing(_) => {}
250 ConnectionState::Cleanup(_) => {}
251 ConnectionState::Closed => {}
252 ConnectionState::Poisoned => {}
253 }
254 }
255}
256
/// Lifecycle states of a `Connection`.
enum ConnectionState<T> {
    /// The connection is in use: streams can be opened and frames are processed.
    Active(Active<T>),
    /// `poll_close` was called on an active connection; the `Closing` future is being driven.
    Closing(Closing<T>),
    /// An error occurred while active; the `Cleanup` future is being driven and
    /// resolves to the `ConnectionError` to report.
    Cleanup(Cleanup),
    /// Terminal state: closing or cleanup has finished.
    Closed,
    /// Placeholder left behind by `std::mem::replace` while a `poll_*` method
    /// processes the real state by value.
    Poisoned,
}
269
270impl<T> fmt::Debug for ConnectionState<T> {
271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272 match self {
273 ConnectionState::Active(_) => write!(f, "Active"),
274 ConnectionState::Closing(_) => write!(f, "Closing"),
275 ConnectionState::Cleanup(_) => write!(f, "Cleanup"),
276 ConnectionState::Closed => write!(f, "Closed"),
277 ConnectionState::Poisoned => write!(f, "Poisoned"),
278 }
279 }
280}
281
/// State of a connection that is up and exchanging frames.
struct Active<T> {
    /// Identifier of this connection; used as a prefix in log messages.
    id: Id,
    /// Whether this endpoint acts as client or server.
    mode: Mode,
    /// Configuration, handed (as a clone of the `Arc`) to every stream created here.
    config: Arc<Config>,
    /// Fused frame-level I/O wrapped around the underlying socket.
    socket: Fuse<frame::Io<T>>,
    /// Raw value of the next locally initiated stream ID; advanced by 2 per stream.
    next_id: u32,

    /// Shared state of all streams currently tracked, keyed by stream ID.
    streams: IntMap<StreamId, Arc<Mutex<stream::Shared>>>,
    /// Command receivers of all streams, each tagged with its stream ID.
    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
    /// Waker stored by `poll` when `stream_receivers` is exhausted; woken when
    /// a new stream registers its receiver.
    no_streams_waker: Option<Waker>,

    /// Reply produced while handling an inbound frame (pong or go-away) that
    /// still has to be written to the socket.
    pending_read_frame: Option<Frame<()>>,
    /// Frame originating from a stream (command or drop) that still has to be
    /// written to the socket.
    pending_write_frame: Option<Frame<()>>,
    /// Waker stored by `poll_new_outbound` while too many streams await an
    /// ACK; woken when an ACK arrives.
    new_outbound_stream_waker: Option<Waker>,

    /// Round-trip-time state: supplies pings to send and consumes pongs.
    rtt: rtt::Rtt,

    /// Shared counter handed to every stream on creation.
    // NOTE(review): presumably the sum of the streams' maximum receive windows — confirm in `stream`.
    accumulated_max_stream_windows: Arc<Mutex<usize>>,
}
/// Commands received by the connection over the per-stream channels it
/// creates for each `Stream`.
#[derive(Debug)]
pub(crate) enum StreamCommand {
    /// Send the contained data or window-update frame to the remote.
    SendFrame(Frame<Either<Data, WindowUpdate>>),
    /// Send a close frame for the stream; `ack` is forwarded to `Frame::close_stream`.
    CloseStream { ack: bool },
}
314
/// Outcome of handling a single inbound frame.
#[derive(Debug)]
pub(crate) enum Action {
    /// Nothing further has to be done.
    None,
    /// The remote opened a new inbound stream.
    New(Stream),
    /// A ping was received; the contained frame is the reply to send.
    Ping(Frame<Ping>),
    /// The contained go-away frame has to be sent to the remote.
    Terminate(Frame<GoAway>),
}
327
328impl<T> fmt::Debug for Active<T> {
329 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
330 f.debug_struct("Connection")
331 .field("id", &self.id)
332 .field("mode", &self.mode)
333 .field("streams", &self.streams.len())
334 .field("next_id", &self.next_id)
335 .finish()
336 }
337}
338
339impl<T> fmt::Display for Active<T> {
340 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
341 write!(
342 f,
343 "(Connection {} {:?} (streams {}))",
344 self.id,
345 self.mode,
346 self.streams.len()
347 )
348 }
349}
350
351impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
352 fn new(socket: T, cfg: Config, mode: Mode) -> Self {
354 let id = Id::random();
355 log::debug!("new connection: {id} ({mode:?})");
356 let socket = frame::Io::new(id, socket).fuse();
357 Active {
358 id,
359 mode,
360 config: Arc::new(cfg),
361 socket,
362 streams: IntMap::default(),
363 stream_receivers: SelectAll::default(),
364 no_streams_waker: None,
365 next_id: match mode {
366 Mode::Client => 1,
367 Mode::Server => 2,
368 },
369 pending_read_frame: None,
370 pending_write_frame: None,
371 new_outbound_stream_waker: None,
372 rtt: rtt::Rtt::new(),
373 accumulated_max_stream_windows: Default::default(),
374 }
375 }
376
377 fn close(self) -> Closing<T> {
379 let pending_frames = self
380 .pending_read_frame
381 .into_iter()
382 .chain(self.pending_write_frame)
383 .collect::<VecDeque<Frame<()>>>();
384 Closing::new(self.stream_receivers, pending_frames, self.socket)
385 }
386
387 fn cleanup(mut self, error: ConnectionError) -> Cleanup {
391 self.drop_all_streams();
392
393 Cleanup::new(self.stream_receivers, error)
394 }
395
    /// Drives the active connection and returns the next inbound stream.
    ///
    /// Each loop iteration, in order:
    /// 1. if the socket can accept a frame, sends a due ping or a pending frame;
    /// 2. flushes the socket;
    /// 3. if no write frame is pending, takes the next command from the streams;
    /// 4. if no read frame is pending, reads and handles the next inbound frame.
    ///
    /// Returns `Poll::Pending` when none of these steps made progress, and an
    /// error when the socket fails, a frame cannot be handled, or the socket
    /// reaches its end (`ConnectionError::Closed`).
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
        loop {
            if self.socket.poll_ready_unpin(cx).is_ready() {
                // Pings take precedence over every other outgoing frame.
                if let Some(frame) = self.rtt.next_ping() {
                    self.socket.start_send_unpin(frame.into())?;
                    continue;
                }

                // Replies to inbound frames (pong, go-away) go out before
                // frames that originate from streams.
                if let Some(frame) = self
                    .pending_read_frame
                    .take()
                    .or_else(|| self.pending_write_frame.take())
                {
                    self.socket.start_send_unpin(frame)?;
                    continue;
                }
            }

            // Only a flush error matters here; readiness is deliberately ignored.
            match self.socket.poll_flush_unpin(cx)? {
                Poll::Ready(()) => {}
                Poll::Pending => {}
            }

            // Accept a new stream command only when the write slot is free.
            if self.pending_write_frame.is_none() {
                match self.stream_receivers.poll_next_unpin(cx) {
                    Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
                        log::trace!(
                            "{}/{}: sending: {}",
                            self.id,
                            frame.header().stream_id(),
                            frame.header()
                        );
                        self.pending_write_frame.replace(frame.into());
                        continue;
                    }
                    Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
                        log::trace!("{}/{}: sending close", self.id, id);
                        self.pending_write_frame
                            .replace(Frame::close_stream(id, ack).into());
                        continue;
                    }
                    // The receiver of stream `id` ended: treat the stream as dropped.
                    Poll::Ready(Some((id, None))) => {
                        if let Some(frame) = self.on_drop_stream(id) {
                            log::trace!("{}/{}: sending: {}", self.id, id, frame.header());
                            self.pending_write_frame.replace(frame);
                        };
                        continue;
                    }
                    // No receivers are registered; remember the waker so that
                    // registering a new stream wakes this task.
                    Poll::Ready(None) => {
                        self.no_streams_waker = Some(cx.waker().clone());
                    }
                    Poll::Pending => {}
                }
            }

            // Read a new inbound frame only when the reply slot is free.
            if self.pending_read_frame.is_none() {
                match self.socket.poll_next_unpin(cx) {
                    Poll::Ready(Some(frame)) => {
                        match self.on_frame(frame?)? {
                            Action::None => {}
                            Action::New(stream) => {
                                log::trace!("{}: new inbound {} of {}", self.id, stream, self);
                                return Poll::Ready(Ok(stream));
                            }
                            Action::Ping(f) => {
                                log::trace!("{}/{}: pong", self.id, f.header().stream_id());
                                self.pending_read_frame.replace(f.into());
                            }
                            Action::Terminate(f) => {
                                log::trace!("{}: sending term", self.id);
                                self.pending_read_frame.replace(f.into());
                            }
                        }
                        continue;
                    }
                    // The socket reached its end.
                    Poll::Ready(None) => {
                        return Poll::Ready(Err(ConnectionError::Closed));
                    }
                    Poll::Pending => {}
                }
            }

            // Nothing above made progress in this iteration.
            return Poll::Pending;
        }
    }
487
488 fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
489 if self.streams.len() >= self.config.max_num_streams {
490 log::error!("{}: maximum number of streams reached", self.id);
491 return Poll::Ready(Err(ConnectionError::TooManyStreams));
492 }
493
494 if self.ack_backlog() >= MAX_ACK_BACKLOG {
495 log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, registering task for wake-up until remote acknowledges at least one stream");
496 self.new_outbound_stream_waker = Some(cx.waker().clone());
497 return Poll::Pending;
498 }
499
500 log::trace!("{}: creating new outbound stream", self.id);
501
502 let id = self.next_stream_id()?;
503 let stream = self.make_new_outbound_stream(id);
504
505 log::debug!("{}: new outbound {} of {}", self.id, stream, self);
506 self.streams.insert(id, stream.clone_shared());
507
508 Poll::Ready(Ok(stream))
509 }
510
511 fn on_drop_stream(&mut self, stream_id: StreamId) -> Option<Frame<()>> {
512 let s = self.streams.remove(&stream_id).expect("stream not found");
513
514 log::trace!("{}: removing dropped stream {}", self.id, stream_id);
515 let frame = {
516 let mut shared = s.lock();
517 let frame = match shared.update_state(self.id, stream_id, State::Closed) {
518 State::Open { .. } => {
521 let mut header = Header::data(stream_id, 0);
522 header.rst();
523 Some(Frame::new(header))
524 }
525 State::RecvClosed => {
529 let mut header = Header::data(stream_id, 0);
530 header.fin();
531 Some(Frame::new(header))
532 }
533 State::SendClosed => {
537 None
544 }
545 State::Closed => None,
548 };
549 if let Some(w) = shared.reader.take() {
550 w.wake()
551 }
552 if let Some(w) = shared.writer.take() {
553 w.wake()
554 }
555 frame
556 };
557 frame.map(Into::into)
558 }
559
560 fn on_frame(&mut self, frame: Frame<()>) -> Result<Action> {
567 log::trace!("{}: received: {}", self.id, frame.header());
568
569 if frame.header().flags().contains(header::ACK)
570 && matches!(frame.header().tag(), Tag::Data | Tag::WindowUpdate)
571 {
572 let id = frame.header().stream_id();
573 if let Some(stream) = self.streams.get(&id) {
574 stream
575 .lock()
576 .update_state(self.id, id, State::Open { acknowledged: true });
577 }
578 if let Some(waker) = self.new_outbound_stream_waker.take() {
579 waker.wake();
580 }
581 }
582
583 let action = match frame.header().tag() {
584 Tag::Data => self.on_data(frame.into_data()),
585 Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
586 Tag::Ping => self.on_ping(&frame.into_ping()),
587 Tag::GoAway => return Err(ConnectionError::Closed),
588 };
589 Ok(action)
590 }
591
    /// Handles an inbound data frame.
    ///
    /// - With RST set: closes the stream if it is known, wakes its reader and
    ///   writer, and returns `Action::None`.
    /// - With SYN set: validates the frame and opens a new inbound stream,
    ///   returning `Action::New`, or `Action::Terminate` if validation fails.
    /// - Otherwise: appends the body to the buffer of the known stream and
    ///   wakes its reader; frames for unknown streams are only logged.
    fn on_data(&mut self, frame: Frame<Data>) -> Action {
        let stream_id = frame.header().stream_id();

        if frame.header().flags().contains(header::RST) {
            // Stream reset by the remote.
            if let Some(s) = self.streams.get_mut(&stream_id) {
                let mut shared = s.lock();
                shared.update_state(self.id, stream_id, State::Closed);
                if let Some(w) = shared.reader.take() {
                    w.wake()
                }
                if let Some(w) = shared.writer.take() {
                    w.wake()
                }
            }
            return Action::None;
        }

        let is_finish = frame.header().flags().contains(header::FIN);

        if frame.header().flags().contains(header::SYN) {
            // The remote opens a new stream. The checks run in this order, so
            // the first failing one decides which error is logged and sent.
            if !self.is_valid_remote_id(stream_id, Tag::Data) {
                log::error!("{}: invalid stream id {}", self.id, stream_id);
                return Action::Terminate(Frame::protocol_error());
            }
            if self.streams.contains_key(&stream_id) {
                log::error!("{}/{}: stream already exists", self.id, stream_id);
                return Action::Terminate(Frame::protocol_error());
            }
            if self.streams.len() == self.config.max_num_streams {
                log::error!("{}: maximum number of streams reached", self.id);
                return Action::Terminate(Frame::internal_error());
            }
            if frame.body().len() > DEFAULT_CREDIT as usize {
                log::error!(
                    "{}/{}: 1st body of stream exceeds default credit",
                    self.id,
                    stream_id
                );
                return Action::Terminate(Frame::protocol_error());
            }
            let stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
            {
                let mut shared = stream.shared();
                if is_finish {
                    shared.update_state(self.id, stream_id, State::RecvClosed);
                }
                // NOTE(review): at this point the stream's receiver is already
                // registered but the stream is not yet in `self.streams`;
                // returning here drops the stream — confirm this path cannot
                // later reach `on_drop_stream`, which expects a map entry.
                if let Err(_err) = shared.consume_receive_window(frame.body_len()) {
                    log::error!(
                        "{}/{}: 1st body of stream exceeds default credit",
                        self.id,
                        stream_id
                    );

                    return Action::Terminate(Frame::protocol_error());
                }
                shared.buffer.push(frame.into_body());
            }
            self.streams.insert(stream_id, stream.clone_shared());
            return Action::New(stream);
        }

        if let Some(s) = self.streams.get_mut(&stream_id) {
            let mut shared = s.lock();

            // The body must fit into the stream's remaining receive window.
            if let Err(_err) = shared.consume_receive_window(frame.body_len()) {
                log::error!(
                    "{}/{}: frame body larger than window of stream",
                    self.id,
                    stream_id
                );

                return Action::Terminate(Frame::protocol_error());
            }

            if is_finish {
                shared.update_state(self.id, stream_id, State::RecvClosed);
            }

            shared.buffer.push(frame.into_body());
            // New data is available: wake a parked reader.
            if let Some(w) = shared.reader.take() {
                w.wake()
            }
        } else {
            log::trace!(
                "{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
                self.id,
                stream_id,
                frame
            );
        }

        Action::None
    }
694
695 fn on_window_update(&mut self, frame: &Frame<WindowUpdate>) -> Action {
696 let stream_id = frame.header().stream_id();
697
698 if frame.header().flags().contains(header::RST) {
699 if let Some(s) = self.streams.get_mut(&stream_id) {
701 let mut shared = s.lock();
702 shared.update_state(self.id, stream_id, State::Closed);
703 if let Some(w) = shared.reader.take() {
704 w.wake()
705 }
706 if let Some(w) = shared.writer.take() {
707 w.wake()
708 }
709 }
710 return Action::None;
711 }
712
713 let is_finish = frame.header().flags().contains(header::FIN); if frame.header().flags().contains(header::SYN) {
716 if !self.is_valid_remote_id(stream_id, Tag::WindowUpdate) {
718 log::error!("{}: invalid stream id {}", self.id, stream_id);
719 return Action::Terminate(Frame::protocol_error());
720 }
721 if self.streams.contains_key(&stream_id) {
722 log::error!("{}/{}: stream already exists", self.id, stream_id);
723 return Action::Terminate(Frame::protocol_error());
724 }
725 if self.streams.len() == self.config.max_num_streams {
726 log::error!("{}: maximum number of streams reached", self.id);
727 return Action::Terminate(Frame::protocol_error());
728 }
729
730 let credit = frame.header().credit() + DEFAULT_CREDIT;
731 let stream = self.make_new_inbound_stream(stream_id, credit);
732
733 if is_finish {
734 stream
735 .shared()
736 .update_state(self.id, stream_id, State::RecvClosed);
737 }
738 self.streams.insert(stream_id, stream.clone_shared());
739 return Action::New(stream);
740 }
741
742 if let Some(s) = self.streams.get_mut(&stream_id) {
743 let mut shared = s.lock();
744 if let Err(err) = shared.increase_send_window_by(frame.header().credit()) {
745 log::error!(
746 "{}/{}: could not increase the send window, {err}",
747 self.id,
748 stream_id
749 );
750 return Action::Terminate(Frame::protocol_error());
751 }
752 if is_finish {
753 shared.update_state(self.id, stream_id, State::RecvClosed);
754
755 if let Some(w) = shared.reader.take() {
756 w.wake()
757 }
758 }
759 if let Some(w) = shared.writer.take() {
760 w.wake()
761 }
762 } else {
763 log::trace!(
764 "{}/{}: window update for unknown stream, possibly dropped earlier: {:?}",
765 self.id,
766 stream_id,
767 frame
768 );
769 }
777
778 Action::None
779 }
780
781 fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
782 let stream_id = frame.header().stream_id();
783 if frame.header().flags().contains(header::ACK) {
784 return self.rtt.handle_pong(frame.nonce());
785 }
786 if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
787 let mut hdr = Header::ping(frame.header().nonce());
788 hdr.ack();
789 return Action::Ping(Frame::new(hdr));
790 }
791 log::debug!(
792 "{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
793 self.id,
794 stream_id,
795 frame
796 );
797 Action::None
805 }
806
807 fn make_new_inbound_stream(&mut self, id: StreamId, credit: u32) -> Stream {
808 let config = self.config.clone();
809
810 let (sender, receiver) = mpsc::channel(10); self.stream_receivers.push(TaggedStream::new(id, receiver));
812 if let Some(waker) = self.no_streams_waker.take() {
813 waker.wake();
814 }
815
816 Stream::new_inbound(
817 id,
818 self.id,
819 config,
820 credit,
821 sender,
822 self.rtt.clone(),
823 self.accumulated_max_stream_windows.clone(),
824 )
825 }
826
827 fn make_new_outbound_stream(&mut self, id: StreamId) -> Stream {
828 let config = self.config.clone();
829
830 let (sender, receiver) = mpsc::channel(10); self.stream_receivers.push(TaggedStream::new(id, receiver));
832 if let Some(waker) = self.no_streams_waker.take() {
833 waker.wake();
834 }
835
836 Stream::new_outbound(
837 id,
838 self.id,
839 config,
840 sender,
841 self.rtt.clone(),
842 self.accumulated_max_stream_windows.clone(),
843 )
844 }
845
846 fn next_stream_id(&mut self) -> Result<StreamId> {
847 let proposed = StreamId::new(self.next_id);
848 self.next_id = self
849 .next_id
850 .checked_add(2)
851 .ok_or(ConnectionError::NoMoreStreamIds)?;
852 match self.mode {
853 Mode::Client => assert!(proposed.is_client()),
854 Mode::Server => assert!(proposed.is_server()),
855 }
856 Ok(proposed)
857 }
858
859 fn ack_backlog(&mut self) -> usize {
861 self.streams
862 .iter()
863 .filter(|(id, _)| match self.mode {
871 Mode::Client => id.is_client(),
872 Mode::Server => id.is_server(),
873 })
874 .filter(|(_, s)| s.lock().is_pending_ack())
875 .count()
876 }
877
878 fn is_valid_remote_id(&self, id: StreamId, tag: Tag) -> bool {
880 if tag == Tag::Ping || tag == Tag::GoAway {
881 return id.is_session();
882 }
883 match self.mode {
884 Mode::Client => id.is_server(),
885 Mode::Server => id.is_client(),
886 }
887 }
888}
889
890impl<T> Active<T> {
891 fn drop_all_streams(&mut self) {
893 for (id, s) in self.streams.drain() {
894 let mut shared = s.lock();
895 shared.update_state(self.id, id, State::Closed);
896 if let Some(w) = shared.reader.take() {
897 w.wake()
898 }
899 if let Some(w) = shared.writer.take() {
900 w.wake()
901 }
902 }
903 }
904}