1mod cleanup;
16mod closing;
17mod rtt;
18mod stream;
19
20use crate::tagged_stream::TaggedStream;
21use crate::{
22 error::ConnectionError,
23 frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
24 frame::{self, Frame},
25 Config, DEFAULT_CREDIT,
26};
27use crate::{Result, MAX_ACK_BACKLOG};
28use cleanup::Cleanup;
29use closing::Closing;
30use futures::stream::SelectAll;
31use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse};
32use nohash_hasher::IntMap;
33use parking_lot::Mutex;
34use std::collections::VecDeque;
35use std::task::{Context, Waker};
36use std::{fmt, sync::Arc, task::Poll};
37
38pub use stream::{Packet, State, Stream};
39
40#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
42pub enum Mode {
43 Client,
45 Server,
47}
48
49#[derive(Clone, Copy)]
53pub(crate) struct Id(u32);
54
55impl Id {
56 pub(crate) fn random() -> Self {
58 Id(rand::random())
59 }
60}
61
62impl fmt::Debug for Id {
63 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64 write!(f, "{:08x}", self.0)
65 }
66}
67
68impl fmt::Display for Id {
69 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70 write!(f, "{:08x}", self.0)
71 }
72}
73
74#[derive(Debug)]
80pub struct Connection<T> {
81 inner: ConnectionState<T>,
82}
83
84impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
85 pub fn new(socket: T, cfg: Config, mode: Mode) -> Self {
86 Self {
87 inner: ConnectionState::Active(Active::new(socket, cfg, mode)),
88 }
89 }
90
91 pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
95 loop {
96 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
97 ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) {
98 Poll::Ready(Ok(stream)) => {
99 self.inner = ConnectionState::Active(active);
100 return Poll::Ready(Ok(stream));
101 }
102 Poll::Pending => {
103 self.inner = ConnectionState::Active(active);
104 return Poll::Pending;
105 }
106 Poll::Ready(Err(e)) => {
107 self.inner = ConnectionState::Cleanup(active.cleanup(e));
108 continue;
109 }
110 },
111 ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
112 Poll::Ready(Ok(())) => {
113 self.inner = ConnectionState::Closed;
114 return Poll::Ready(Err(ConnectionError::Closed));
115 }
116 Poll::Ready(Err(e)) => {
117 self.inner = ConnectionState::Closed;
118 return Poll::Ready(Err(e));
119 }
120 Poll::Pending => {
121 self.inner = ConnectionState::Closing(inner);
122 return Poll::Pending;
123 }
124 },
125 ConnectionState::Cleanup(mut inner) => match inner.poll_unpin(cx) {
126 Poll::Ready(e) => {
127 self.inner = ConnectionState::Closed;
128 return Poll::Ready(Err(e));
129 }
130 Poll::Pending => {
131 self.inner = ConnectionState::Cleanup(inner);
132 return Poll::Pending;
133 }
134 },
135 ConnectionState::Closed => {
136 self.inner = ConnectionState::Closed;
137 return Poll::Ready(Err(ConnectionError::Closed));
138 }
139 ConnectionState::Poisoned => unreachable!(),
140 }
141 }
142 }
143
144 pub fn poll_next_inbound(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
148 loop {
149 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
150 ConnectionState::Active(mut active) => match active.poll(cx) {
151 Poll::Ready(Ok(stream)) => {
152 self.inner = ConnectionState::Active(active);
153 return Poll::Ready(Some(Ok(stream)));
154 }
155 Poll::Ready(Err(e)) => {
156 self.inner = ConnectionState::Cleanup(active.cleanup(e));
157 continue;
158 }
159 Poll::Pending => {
160 self.inner = ConnectionState::Active(active);
161 return Poll::Pending;
162 }
163 },
164 ConnectionState::Closing(mut closing) => match closing.poll_unpin(cx) {
165 Poll::Ready(Ok(())) => {
166 self.inner = ConnectionState::Closed;
167 return Poll::Ready(None);
168 }
169 Poll::Ready(Err(e)) => {
170 self.inner = ConnectionState::Closed;
171 return Poll::Ready(Some(Err(e)));
172 }
173 Poll::Pending => {
174 self.inner = ConnectionState::Closing(closing);
175 return Poll::Pending;
176 }
177 },
178 ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
179 Poll::Ready(ConnectionError::Closed) => {
180 self.inner = ConnectionState::Closed;
181 return Poll::Ready(None);
182 }
183 Poll::Ready(other) => {
184 self.inner = ConnectionState::Closed;
185 return Poll::Ready(Some(Err(other)));
186 }
187 Poll::Pending => {
188 self.inner = ConnectionState::Cleanup(cleanup);
189 return Poll::Pending;
190 }
191 },
192 ConnectionState::Closed => {
193 self.inner = ConnectionState::Closed;
194 return Poll::Ready(None);
195 }
196 ConnectionState::Poisoned => unreachable!(),
197 }
198 }
199 }
200
201 pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
203 loop {
204 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
205 ConnectionState::Active(active) => {
206 self.inner = ConnectionState::Closing(active.close());
207 }
208 ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
209 Poll::Ready(Ok(())) => {
210 self.inner = ConnectionState::Closed;
211 }
212 Poll::Ready(Err(e)) => {
213 log::warn!("Failure while closing connection: {e}");
214 self.inner = ConnectionState::Closed;
215 return Poll::Ready(Err(e));
216 }
217 Poll::Pending => {
218 self.inner = ConnectionState::Closing(inner);
219 return Poll::Pending;
220 }
221 },
222 ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
223 Poll::Ready(reason) => {
224 log::warn!("Failure while closing connection: {reason}");
225 self.inner = ConnectionState::Closed;
226 return Poll::Ready(Ok(()));
227 }
228 Poll::Pending => {
229 self.inner = ConnectionState::Cleanup(cleanup);
230 return Poll::Pending;
231 }
232 },
233 ConnectionState::Closed => {
234 self.inner = ConnectionState::Closed;
235 return Poll::Ready(Ok(()));
236 }
237 ConnectionState::Poisoned => {
238 unreachable!()
239 }
240 }
241 }
242 }
243}
244
245impl<T> Drop for Connection<T> {
246 fn drop(&mut self) {
247 match &mut self.inner {
248 ConnectionState::Active(active) => active.drop_all_streams(),
249 ConnectionState::Closing(_) => {}
250 ConnectionState::Cleanup(_) => {}
251 ConnectionState::Closed => {}
252 ConnectionState::Poisoned => {}
253 }
254 }
255}
256
257enum ConnectionState<T> {
258 Active(Active<T>),
260 Closing(Closing<T>),
262 Cleanup(Cleanup),
264 Closed,
266 Poisoned,
268}
269
270impl<T> fmt::Debug for ConnectionState<T> {
271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272 match self {
273 ConnectionState::Active(_) => write!(f, "Active"),
274 ConnectionState::Closing(_) => write!(f, "Closing"),
275 ConnectionState::Cleanup(_) => write!(f, "Cleanup"),
276 ConnectionState::Closed => write!(f, "Closed"),
277 ConnectionState::Poisoned => write!(f, "Poisoned"),
278 }
279 }
280}
281
282struct Active<T> {
284 id: Id,
285 mode: Mode,
286 config: Arc<Config>,
287 socket: Fuse<frame::Io<T>>,
288 next_id: u32,
289
290 streams: IntMap<StreamId, Arc<Mutex<stream::Shared>>>,
291 stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
292 no_streams_waker: Option<Waker>,
293
294 pending_read_frame: Option<Frame<()>>,
295 pending_write_frame: Option<Frame<()>>,
296 new_outbound_stream_waker: Option<Waker>,
297
298 rtt: rtt::Rtt,
299
300 accumulated_max_stream_windows: Arc<Mutex<usize>>,
305}
306#[derive(Debug)]
308pub(crate) enum StreamCommand {
309 SendFrame(Frame<Either<Data, WindowUpdate>>),
311 CloseStream { ack: bool },
313}
314
315#[derive(Debug)]
317pub(crate) enum Action {
318 None,
320 New(Stream),
322 Ping(Frame<Ping>),
324 Terminate(Frame<GoAway>),
326}
327
328impl<T> fmt::Debug for Active<T> {
329 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
330 f.debug_struct("Connection")
331 .field("id", &self.id)
332 .field("mode", &self.mode)
333 .field("streams", &self.streams.len())
334 .field("next_id", &self.next_id)
335 .finish()
336 }
337}
338
339impl<T> fmt::Display for Active<T> {
340 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
341 write!(
342 f,
343 "(Connection {} {:?} (streams {}))",
344 self.id,
345 self.mode,
346 self.streams.len()
347 )
348 }
349}
350
351impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
352 fn new(socket: T, cfg: Config, mode: Mode) -> Self {
354 let id = Id::random();
355 log::debug!("new connection: {id} ({mode:?})");
356 let socket = frame::Io::new(id, socket).fuse();
357 Active {
358 id,
359 mode,
360 config: Arc::new(cfg),
361 socket,
362 streams: IntMap::default(),
363 stream_receivers: SelectAll::default(),
364 no_streams_waker: None,
365 next_id: match mode {
366 Mode::Client => 1,
367 Mode::Server => 2,
368 },
369 pending_read_frame: None,
370 pending_write_frame: None,
371 new_outbound_stream_waker: None,
372 rtt: rtt::Rtt::new(),
373 accumulated_max_stream_windows: Default::default(),
374 }
375 }
376
377 fn close(self) -> Closing<T> {
379 let pending_frames = self
380 .pending_read_frame
381 .into_iter()
382 .chain(self.pending_write_frame)
383 .collect::<VecDeque<Frame<()>>>();
384 Closing::new(self.stream_receivers, pending_frames, self.socket)
385 }
386
387 fn cleanup(mut self, error: ConnectionError) -> Cleanup {
391 self.drop_all_streams();
392
393 Cleanup::new(self.stream_receivers, error)
394 }
395
396 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
397 loop {
398 if self.socket.poll_ready_unpin(cx).is_ready() {
399 if let Some(frame) = self.rtt.next_ping() {
403 self.socket.start_send_unpin(frame.into())?;
404 continue;
405 }
406
407 if let Some(frame) = self
410 .pending_read_frame
411 .take()
412 .or_else(|| self.pending_write_frame.take())
413 {
414 self.socket.start_send_unpin(frame)?;
415 continue;
416 }
417 }
418
419 match self.socket.poll_flush_unpin(cx)? {
420 Poll::Ready(()) => {}
421 Poll::Pending => {}
422 }
423
424 if self.pending_write_frame.is_none() {
425 match self.stream_receivers.poll_next_unpin(cx) {
426 Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
427 log::trace!(
428 "{}/{}: sending: {}",
429 self.id,
430 frame.header().stream_id(),
431 frame.header()
432 );
433 self.pending_write_frame.replace(frame.into());
434 continue;
435 }
436 Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
437 log::trace!("{}/{}: sending close", self.id, id);
438 self.pending_write_frame
439 .replace(Frame::close_stream(id, ack).into());
440 continue;
441 }
442 Poll::Ready(Some((id, None))) => {
443 if let Some(frame) = self.on_drop_stream(id) {
444 log::trace!("{}/{}: sending: {}", self.id, id, frame.header());
445 self.pending_write_frame.replace(frame);
446 };
447 continue;
448 }
449 Poll::Ready(None) => {
450 self.no_streams_waker = Some(cx.waker().clone());
451 }
452 Poll::Pending => {}
453 }
454 }
455
456 if self.pending_read_frame.is_none() {
457 match self.socket.poll_next_unpin(cx) {
458 Poll::Ready(Some(frame)) => {
459 match self.on_frame(frame?)? {
460 Action::None => {}
461 Action::New(stream) => {
462 log::trace!("{}: new inbound {} of {}", self.id, stream, self);
463 return Poll::Ready(Ok(stream));
464 }
465 Action::Ping(f) => {
466 log::trace!("{}/{}: pong", self.id, f.header().stream_id());
467 self.pending_read_frame.replace(f.into());
468 }
469 Action::Terminate(f) => {
470 log::trace!("{}: sending term", self.id);
471 self.pending_read_frame.replace(f.into());
472 }
473 }
474 continue;
475 }
476 Poll::Ready(None) => {
477 return Poll::Ready(Err(ConnectionError::Closed));
478 }
479 Poll::Pending => {}
480 }
481 }
482
483 return Poll::Pending;
485 }
486 }
487
488 fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
489 if self.streams.len() >= self.config.max_num_streams {
490 log::error!("{}: maximum number of streams reached", self.id);
491 return Poll::Ready(Err(ConnectionError::TooManyStreams));
492 }
493
494 if self.ack_backlog() >= MAX_ACK_BACKLOG {
495 log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, registering task for wake-up until remote acknowledges at least one stream");
496 self.new_outbound_stream_waker = Some(cx.waker().clone());
497 return Poll::Pending;
498 }
499
500 log::trace!("{}: creating new outbound stream", self.id);
501
502 let id = self.next_stream_id()?;
503 let stream = self.make_new_outbound_stream(id);
504
505 log::debug!("{}: new outbound {} of {}", self.id, stream, self);
506 self.streams.insert(id, stream.clone_shared());
507
508 Poll::Ready(Ok(stream))
509 }
510
511 fn on_drop_stream(&mut self, stream_id: StreamId) -> Option<Frame<()>> {
512 let s = self.streams.remove(&stream_id).expect("stream not found");
513
514 log::trace!("{}: removing dropped stream {}", self.id, stream_id);
515 let frame = {
516 let mut shared = s.lock();
517 let frame = match shared.update_state(self.id, stream_id, State::Closed) {
518 State::Open { .. } => {
521 let mut header = Header::data(stream_id, 0);
522 header.rst();
523 Some(Frame::new(header))
524 }
525 State::RecvClosed => {
529 let mut header = Header::data(stream_id, 0);
530 header.fin();
531 Some(Frame::new(header))
532 }
533 State::SendClosed => {
537 None
544 }
545 State::Closed => None,
548 };
549 if let Some(w) = shared.reader.take() {
550 w.wake()
551 }
552 if let Some(w) = shared.writer.take() {
553 w.wake()
554 }
555 frame
556 };
557 frame.map(Into::into)
558 }
559
560 fn on_frame(&mut self, frame: Frame<()>) -> Result<Action> {
567 log::trace!("{}: received: {}", self.id, frame.header());
568
569 if frame.header().flags().contains(header::ACK)
570 && matches!(frame.header().tag(), Tag::Data | Tag::WindowUpdate)
571 {
572 let id = frame.header().stream_id();
573 if let Some(stream) = self.streams.get(&id) {
574 stream
575 .lock()
576 .update_state(self.id, id, State::Open { acknowledged: true });
577 }
578 if let Some(waker) = self.new_outbound_stream_waker.take() {
579 waker.wake();
580 }
581 }
582
583 let action = match frame.header().tag() {
584 Tag::Data => self.on_data(frame.into_data()),
585 Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
586 Tag::Ping => self.on_ping(&frame.into_ping()),
587 Tag::GoAway => return Err(ConnectionError::Closed),
588 };
589 Ok(action)
590 }
591
592 fn on_data(&mut self, frame: Frame<Data>) -> Action {
593 let stream_id = frame.header().stream_id();
594
595 if frame.header().flags().contains(header::RST) {
596 if let Some(s) = self.streams.get_mut(&stream_id) {
598 let mut shared = s.lock();
599 shared.update_state(self.id, stream_id, State::Closed);
600 if let Some(w) = shared.reader.take() {
601 w.wake()
602 }
603 if let Some(w) = shared.writer.take() {
604 w.wake()
605 }
606 }
607 return Action::None;
608 }
609
610 let is_finish = frame.header().flags().contains(header::FIN); if frame.header().flags().contains(header::SYN) {
613 if !self.is_valid_remote_id(stream_id, Tag::Data) {
615 log::error!("{}: invalid stream id {}", self.id, stream_id);
616 return Action::Terminate(Frame::protocol_error());
617 }
618 if frame.body().len() > DEFAULT_CREDIT as usize {
619 log::error!(
620 "{}/{}: 1st body of stream exceeds default credit",
621 self.id,
622 stream_id
623 );
624 return Action::Terminate(Frame::protocol_error());
625 }
626 if self.streams.contains_key(&stream_id) {
627 log::error!("{}/{}: stream already exists", self.id, stream_id);
628 return Action::Terminate(Frame::protocol_error());
629 }
630 if self.streams.len() == self.config.max_num_streams {
631 log::error!("{}: maximum number of streams reached", self.id);
632 return Action::Terminate(Frame::internal_error());
633 }
634 let stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
635 {
636 let mut shared = stream.shared();
637 if is_finish {
638 shared.update_state(self.id, stream_id, State::RecvClosed);
639 }
640 shared.consume_receive_window(frame.body_len());
641 shared.buffer.push(frame.into_body());
642 }
643 self.streams.insert(stream_id, stream.clone_shared());
644 return Action::New(stream);
645 }
646
647 if let Some(s) = self.streams.get_mut(&stream_id) {
648 let mut shared = s.lock();
649 if frame.body_len() > shared.receive_window() {
650 log::error!(
651 "{}/{}: frame body larger than window of stream",
652 self.id,
653 stream_id
654 );
655 return Action::Terminate(Frame::protocol_error());
656 }
657 if is_finish {
658 shared.update_state(self.id, stream_id, State::RecvClosed);
659 }
660 shared.consume_receive_window(frame.body_len());
661 shared.buffer.push(frame.into_body());
662 if let Some(w) = shared.reader.take() {
663 w.wake()
664 }
665 } else {
666 log::trace!(
667 "{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
668 self.id,
669 stream_id,
670 frame
671 );
672 }
680
681 Action::None
682 }
683
684 fn on_window_update(&mut self, frame: &Frame<WindowUpdate>) -> Action {
685 let stream_id = frame.header().stream_id();
686
687 if frame.header().flags().contains(header::RST) {
688 if let Some(s) = self.streams.get_mut(&stream_id) {
690 let mut shared = s.lock();
691 shared.update_state(self.id, stream_id, State::Closed);
692 if let Some(w) = shared.reader.take() {
693 w.wake()
694 }
695 if let Some(w) = shared.writer.take() {
696 w.wake()
697 }
698 }
699 return Action::None;
700 }
701
702 let is_finish = frame.header().flags().contains(header::FIN); if frame.header().flags().contains(header::SYN) {
705 if !self.is_valid_remote_id(stream_id, Tag::WindowUpdate) {
707 log::error!("{}: invalid stream id {}", self.id, stream_id);
708 return Action::Terminate(Frame::protocol_error());
709 }
710 if self.streams.contains_key(&stream_id) {
711 log::error!("{}/{}: stream already exists", self.id, stream_id);
712 return Action::Terminate(Frame::protocol_error());
713 }
714 if self.streams.len() == self.config.max_num_streams {
715 log::error!("{}: maximum number of streams reached", self.id);
716 return Action::Terminate(Frame::protocol_error());
717 }
718
719 let credit = frame.header().credit() + DEFAULT_CREDIT;
720 let stream = self.make_new_inbound_stream(stream_id, credit);
721
722 if is_finish {
723 stream
724 .shared()
725 .update_state(self.id, stream_id, State::RecvClosed);
726 }
727 self.streams.insert(stream_id, stream.clone_shared());
728 return Action::New(stream);
729 }
730
731 if let Some(s) = self.streams.get_mut(&stream_id) {
732 let mut shared = s.lock();
733 shared.increase_send_window_by(frame.header().credit());
734 if is_finish {
735 shared.update_state(self.id, stream_id, State::RecvClosed);
736
737 if let Some(w) = shared.reader.take() {
738 w.wake()
739 }
740 }
741 if let Some(w) = shared.writer.take() {
742 w.wake()
743 }
744 } else {
745 log::trace!(
746 "{}/{}: window update for unknown stream, possibly dropped earlier: {:?}",
747 self.id,
748 stream_id,
749 frame
750 );
751 }
759
760 Action::None
761 }
762
763 fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
764 let stream_id = frame.header().stream_id();
765 if frame.header().flags().contains(header::ACK) {
766 return self.rtt.handle_pong(frame.nonce());
767 }
768 if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
769 let mut hdr = Header::ping(frame.header().nonce());
770 hdr.ack();
771 return Action::Ping(Frame::new(hdr));
772 }
773 log::debug!(
774 "{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
775 self.id,
776 stream_id,
777 frame
778 );
779 Action::None
787 }
788
789 fn make_new_inbound_stream(&mut self, id: StreamId, credit: u32) -> Stream {
790 let config = self.config.clone();
791
792 let (sender, receiver) = mpsc::channel(10); self.stream_receivers.push(TaggedStream::new(id, receiver));
794 if let Some(waker) = self.no_streams_waker.take() {
795 waker.wake();
796 }
797
798 Stream::new_inbound(
799 id,
800 self.id,
801 config,
802 credit,
803 sender,
804 self.rtt.clone(),
805 self.accumulated_max_stream_windows.clone(),
806 )
807 }
808
809 fn make_new_outbound_stream(&mut self, id: StreamId) -> Stream {
810 let config = self.config.clone();
811
812 let (sender, receiver) = mpsc::channel(10); self.stream_receivers.push(TaggedStream::new(id, receiver));
814 if let Some(waker) = self.no_streams_waker.take() {
815 waker.wake();
816 }
817
818 Stream::new_outbound(
819 id,
820 self.id,
821 config,
822 sender,
823 self.rtt.clone(),
824 self.accumulated_max_stream_windows.clone(),
825 )
826 }
827
828 fn next_stream_id(&mut self) -> Result<StreamId> {
829 let proposed = StreamId::new(self.next_id);
830 self.next_id = self
831 .next_id
832 .checked_add(2)
833 .ok_or(ConnectionError::NoMoreStreamIds)?;
834 match self.mode {
835 Mode::Client => assert!(proposed.is_client()),
836 Mode::Server => assert!(proposed.is_server()),
837 }
838 Ok(proposed)
839 }
840
841 fn ack_backlog(&mut self) -> usize {
843 self.streams
844 .iter()
845 .filter(|(id, _)| match self.mode {
853 Mode::Client => id.is_client(),
854 Mode::Server => id.is_server(),
855 })
856 .filter(|(_, s)| s.lock().is_pending_ack())
857 .count()
858 }
859
860 fn is_valid_remote_id(&self, id: StreamId, tag: Tag) -> bool {
862 if tag == Tag::Ping || tag == Tag::GoAway {
863 return id.is_session();
864 }
865 match self.mode {
866 Mode::Client => id.is_server(),
867 Mode::Server => id.is_client(),
868 }
869 }
870}
871
872impl<T> Active<T> {
873 fn drop_all_streams(&mut self) {
875 for (id, s) in self.streams.drain() {
876 let mut shared = s.lock();
877 shared.update_state(self.id, id, State::Closed);
878 if let Some(w) = shared.reader.take() {
879 w.wake()
880 }
881 if let Some(w) = shared.writer.take() {
882 w.wake()
883 }
884 }
885 }
886}