1mod cleanup;
84mod closing;
85mod stream;
86
87use crate::yamux::{
88 error::ConnectionError,
89 frame::{
90 self,
91 header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
92 Frame,
93 },
94 tagged_stream::TaggedStream,
95 Config, Result, WindowUpdateMode, DEFAULT_CREDIT, MAX_ACK_BACKLOG,
96};
97use cleanup::Cleanup;
98use closing::Closing;
99use futures::{
100 channel::mpsc,
101 future::Either,
102 prelude::*,
103 sink::SinkExt,
104 stream::{Fuse, SelectAll},
105};
106use nohash_hasher::IntMap;
107use parking_lot::Mutex;
108use std::{
109 collections::VecDeque,
110 fmt,
111 sync::Arc,
112 task::{Context, Poll, Waker},
113};
114
115pub use stream::{Packet, State, Stream};
116
/// Target string attached to every `tracing` event emitted from this file.
const LOG_TARGET: &str = "litep2p::yamux";
119
/// Role this endpoint plays on a connection.
///
/// The role decides which stream IDs are allocated locally: a `Client` starts at 1 and a
/// `Server` at 2, both advancing in steps of two.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Mode {
    /// Endpoint that opens streams with IDs 1, 3, 5, ...
    Client,
    /// Endpoint that opens streams with IDs 2, 4, 6, ...
    Server,
}
128
129#[derive(Clone, Copy)]
133pub(crate) struct Id(u32);
134
135impl Id {
136 pub(crate) fn random() -> Self {
138 Id(rand::random())
139 }
140}
141
142impl fmt::Debug for Id {
143 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144 write!(f, "{:08x}", self.0)
145 }
146}
147
148impl fmt::Display for Id {
149 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150 write!(f, "{:08x}", self.0)
151 }
152}
153
/// A Yamux connection over an I/O resource of type `T`.
///
/// The struct only wraps the connection's lifecycle state; the `poll_*` methods swap that
/// state as the connection moves from active to closing/cleanup and finally closed.
#[derive(Debug)]
pub struct Connection<T> {
    /// Current lifecycle state of the connection.
    inner: ConnectionState<T>,
}
158
159impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
160 pub fn new(socket: T, cfg: Config, mode: Mode) -> Self {
161 Self {
162 inner: ConnectionState::Active(Active::new(socket, cfg, mode)),
163 }
164 }
165
166 pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
170 loop {
171 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
172 ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) {
173 Poll::Ready(Ok(stream)) => {
174 self.inner = ConnectionState::Active(active);
175 return Poll::Ready(Ok(stream));
176 }
177 Poll::Pending => {
178 self.inner = ConnectionState::Active(active);
179 return Poll::Pending;
180 }
181 Poll::Ready(Err(e)) => {
182 self.inner = ConnectionState::Cleanup(active.cleanup(e));
183 continue;
184 }
185 },
186 ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) {
187 Poll::Ready(Ok(())) => {
188 self.inner = ConnectionState::Closed;
189 return Poll::Ready(Err(ConnectionError::Closed));
190 }
191 Poll::Ready(Err(e)) => {
192 self.inner = ConnectionState::Closed;
193 return Poll::Ready(Err(e));
194 }
195 Poll::Pending => {
196 self.inner = ConnectionState::Closing(inner);
197 return Poll::Pending;
198 }
199 },
200 ConnectionState::Cleanup(mut inner) => match inner.poll_unpin(cx) {
201 Poll::Ready(e) => {
202 self.inner = ConnectionState::Closed;
203 return Poll::Ready(Err(e));
204 }
205 Poll::Pending => {
206 self.inner = ConnectionState::Cleanup(inner);
207 return Poll::Pending;
208 }
209 },
210 ConnectionState::Closed => {
211 self.inner = ConnectionState::Closed;
212 return Poll::Ready(Err(ConnectionError::Closed));
213 }
214 ConnectionState::Poisoned => unreachable!(),
215 }
216 }
217 }
218
219 pub fn poll_next_inbound(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Stream>>> {
223 loop {
224 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
225 ConnectionState::Active(mut active) => match active.poll(cx) {
226 Poll::Ready(Ok(stream)) => {
227 self.inner = ConnectionState::Active(active);
228 return Poll::Ready(Some(Ok(stream)));
229 }
230 Poll::Ready(Err(e)) => {
231 self.inner = ConnectionState::Cleanup(active.cleanup(e));
232 continue;
233 }
234 Poll::Pending => {
235 self.inner = ConnectionState::Active(active);
236 return Poll::Pending;
237 }
238 },
239 ConnectionState::Closing(mut closing) => match closing.poll_unpin(cx) {
240 Poll::Ready(Ok(())) => {
241 self.inner = ConnectionState::Closed;
242 return Poll::Ready(None);
243 }
244 Poll::Ready(Err(e)) => {
245 self.inner = ConnectionState::Closed;
246 return Poll::Ready(Some(Err(e)));
247 }
248 Poll::Pending => {
249 self.inner = ConnectionState::Closing(closing);
250 return Poll::Pending;
251 }
252 },
253 ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
254 Poll::Ready(ConnectionError::Closed) => {
255 self.inner = ConnectionState::Closed;
256 return Poll::Ready(None);
257 }
258 Poll::Ready(other) => {
259 self.inner = ConnectionState::Closed;
260 return Poll::Ready(Some(Err(other)));
261 }
262 Poll::Pending => {
263 self.inner = ConnectionState::Cleanup(cleanup);
264 return Poll::Pending;
265 }
266 },
267 ConnectionState::Closed => {
268 self.inner = ConnectionState::Closed;
269 return Poll::Ready(None);
270 }
271 ConnectionState::Poisoned => unreachable!(),
272 }
273 }
274 }
275
276 pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
278 loop {
279 match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
280 ConnectionState::Active(active) => {
281 self.inner = ConnectionState::Closing(active.close());
282 }
283 ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx)? {
284 Poll::Ready(()) => {
285 self.inner = ConnectionState::Closed;
286 }
287 Poll::Pending => {
288 self.inner = ConnectionState::Closing(inner);
289 return Poll::Pending;
290 }
291 },
292 ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) {
293 Poll::Ready(reason) => {
294 tracing::warn!(target: LOG_TARGET, "Failure while closing connection: {}", reason);
295 self.inner = ConnectionState::Closed;
296 return Poll::Ready(Ok(()));
297 }
298 Poll::Pending => {
299 self.inner = ConnectionState::Cleanup(cleanup);
300 return Poll::Pending;
301 }
302 },
303 ConnectionState::Closed => {
304 self.inner = ConnectionState::Closed;
305 return Poll::Ready(Ok(()));
306 }
307 ConnectionState::Poisoned => {
308 unreachable!()
309 }
310 }
311 }
312 }
313}
314
315impl<T> Drop for Connection<T> {
316 fn drop(&mut self) {
317 match &mut self.inner {
318 ConnectionState::Active(active) => active.drop_all_streams(),
319 ConnectionState::Closing(_) => {}
320 ConnectionState::Cleanup(_) => {}
321 ConnectionState::Closed => {}
322 ConnectionState::Poisoned => {}
323 }
324 }
325}
326
/// Lifecycle states of a [`Connection`].
enum ConnectionState<T> {
    /// The connection is operational: streams can be opened and frames are exchanged.
    Active(Active<T>),
    /// `poll_close` was called on an active connection; the closing future is being driven.
    Closing(Closing<T>),
    /// The active connection failed; the cleanup future is being driven and eventually
    /// yields the error that caused the failure.
    Cleanup(Cleanup),
    /// The connection is closed; polling reports this without doing any further work.
    Closed,
    /// Placeholder left behind by `std::mem::replace` while a `poll_*` method owns the real
    /// state. Observing it at the start of a poll is treated as unreachable.
    Poisoned,
}
340
341impl<T> fmt::Debug for ConnectionState<T> {
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 match self {
344 ConnectionState::Active(_) => write!(f, "Active"),
345 ConnectionState::Closing(_) => write!(f, "Closing"),
346 ConnectionState::Cleanup(_) => write!(f, "Cleanup"),
347 ConnectionState::Closed => write!(f, "Closed"),
348 ConnectionState::Poisoned => write!(f, "Poisoned"),
349 }
350 }
351}
352
/// State of a connection while it is operational.
struct Active<T> {
    /// Identifier of this connection (randomly generated in `Active::new`).
    id: Id,
    /// Whether this endpoint acts as client or server.
    mode: Mode,
    /// Connection configuration; a clone of the `Arc` is handed to every stream.
    config: Arc<Config>,
    /// Fused frame-level I/O wrapper around the underlying socket.
    socket: Fuse<frame::Io<T>>,
    /// ID for the next locally opened stream; starts at 1 (client) or 2 (server) and grows
    /// by two per stream.
    next_id: u32,

    /// Shared state of all streams currently tracked by the connection, keyed by stream ID.
    streams: IntMap<StreamId, Arc<Mutex<stream::Shared>>>,
    /// Command channels of all streams, each tagged with the ID of its stream.
    stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
    /// Waker stored by `poll` when `stream_receivers` reports that it holds no streams;
    /// woken as soon as a new stream is created.
    no_streams_waker: Option<Waker>,

    /// Frames waiting to be written to the socket.
    pending_frames: VecDeque<Frame<()>>,
    /// Waker stored by `poll_new_outbound` when too many streams await acknowledgement;
    /// woken when a frame carrying the ACK flag is received.
    new_outbound_stream_waker: Option<Waker>,
}
372
/// Commands received by the connection over the per-stream command channels.
// NOTE(review): presumably sent by `Stream` through the sender created alongside each
// receiver; the sending side is not part of this file.
#[derive(Debug)]
pub(crate) enum StreamCommand {
    /// Queue the given data or window update frame for sending.
    SendFrame(Frame<Either<Data, WindowUpdate>>),
    /// Queue a close frame for the stream; `ack` is passed on to `Frame::close_stream`.
    CloseStream { ack: bool },
}
381
/// Outcome of handling one incoming frame, telling `on_frame` what to do next.
#[derive(Debug)]
enum Action {
    /// Nothing further to do.
    None,
    /// A new inbound stream was created; the optional window update frame is queued for
    /// sending before the stream is returned to the caller.
    New(Stream, Option<Frame<WindowUpdate>>),
    /// A window update frame must be queued for sending.
    Update(Frame<WindowUpdate>),
    /// A ping reply must be queued for sending.
    Ping(Frame<Ping>),
    /// A stream reset frame must be queued for sending.
    Reset(Frame<Data>),
    /// A go-away frame must be queued for sending.
    Terminate(Frame<GoAway>),
}
398
399impl<T> fmt::Debug for Active<T> {
400 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
401 f.debug_struct("Connection")
402 .field("id", &self.id)
403 .field("mode", &self.mode)
404 .field("streams", &self.streams.len())
405 .field("next_id", &self.next_id)
406 .finish()
407 }
408}
409
410impl<T> fmt::Display for Active<T> {
411 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
412 write!(
413 f,
414 "(Connection {} {:?} (streams {}))",
415 self.id,
416 self.mode,
417 self.streams.len()
418 )
419 }
420}
421
422impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
423 fn new(socket: T, cfg: Config, mode: Mode) -> Self {
425 let id = Id::random();
426 tracing::debug!(target: LOG_TARGET, "new connection: {} ({:?})", id, mode);
427 let socket = frame::Io::new(id, socket, cfg.max_buffer_size).fuse();
428 Active {
429 id,
430 mode,
431 config: Arc::new(cfg),
432 socket,
433 streams: IntMap::default(),
434 stream_receivers: SelectAll::default(),
435 no_streams_waker: None,
436 next_id: match mode {
437 Mode::Client => 1,
438 Mode::Server => 2,
439 },
440 pending_frames: VecDeque::default(),
441 new_outbound_stream_waker: None,
442 }
443 }
444
445 fn close(self) -> Closing<T> {
447 Closing::new(self.stream_receivers, self.pending_frames, self.socket)
448 }
449
450 fn cleanup(mut self, error: ConnectionError) -> Cleanup {
454 self.drop_all_streams();
455
456 Cleanup::new(self.stream_receivers, error)
457 }
458
459 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
460 loop {
461 if self.socket.poll_ready_unpin(cx).is_ready() {
462 if let Some(frame) = self.pending_frames.pop_front() {
463 self.socket.start_send_unpin(frame)?;
464 continue;
465 }
466 }
467
468 match self.socket.poll_flush_unpin(cx)? {
469 Poll::Ready(()) => {}
470 Poll::Pending => {}
471 }
472
473 match self.stream_receivers.poll_next_unpin(cx) {
474 Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
475 self.on_send_frame(frame);
476 continue;
477 }
478 Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
479 self.on_close_stream(id, ack);
480 continue;
481 }
482 Poll::Ready(Some((id, None))) => {
483 self.on_drop_stream(id);
484 continue;
485 }
486 Poll::Ready(None) => {
487 self.no_streams_waker = Some(cx.waker().clone());
488 }
489 Poll::Pending => {}
490 }
491
492 match self.socket.poll_next_unpin(cx) {
493 Poll::Ready(Some(frame)) => {
494 if let Some(stream) = self.on_frame(frame?)? {
495 return Poll::Ready(Ok(stream));
496 }
497 continue;
498 }
499 Poll::Ready(None) => {
500 return Poll::Ready(Err(ConnectionError::Closed));
501 }
502 Poll::Pending => {}
503 }
504
505 return Poll::Pending;
507 }
508 }
509
510 fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
511 if self.streams.len() >= self.config.max_num_streams {
512 tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id);
513 return Poll::Ready(Err(ConnectionError::TooManyStreams));
514 }
515
516 if self.ack_backlog() >= MAX_ACK_BACKLOG {
517 tracing::debug!(target: LOG_TARGET, "{MAX_ACK_BACKLOG} streams waiting for ACK, registering task for wake-up until remote acknowledges at least one stream");
518 self.new_outbound_stream_waker = Some(cx.waker().clone());
519 return Poll::Pending;
520 }
521
522 tracing::trace!(target: LOG_TARGET, "{}: creating new outbound stream", self.id);
523
524 let id = self.next_stream_id()?;
525 let extra_credit = self.config.receive_window - DEFAULT_CREDIT;
526
527 if extra_credit > 0 {
528 let mut frame = Frame::window_update(id, extra_credit);
529 frame.header_mut().syn();
530 tracing::trace!(target: LOG_TARGET, "{}/{}: sending initial {}", self.id, id, frame.header());
531 self.pending_frames.push_back(frame.into());
532 }
533
534 let mut stream = self.make_new_outbound_stream(id, self.config.receive_window);
535
536 if extra_credit == 0 {
537 stream.set_flag(stream::Flag::Syn)
538 }
539
540 tracing::debug!(target: LOG_TARGET, "{}: new outbound {} of {}", self.id, stream, self);
541 self.streams.insert(id, stream.clone_shared());
542
543 Poll::Ready(Ok(stream))
544 }
545
546 fn on_send_frame(&mut self, frame: Frame<Either<Data, WindowUpdate>>) {
547 tracing::trace!(target: LOG_TARGET,
548 "{}/{}: sending: {}",
549 self.id,
550 frame.header().stream_id(),
551 frame.header()
552 );
553 self.pending_frames.push_back(frame.into());
554 }
555
556 fn on_close_stream(&mut self, id: StreamId, ack: bool) {
557 tracing::trace!(target: LOG_TARGET, "{}/{}: sending close", self.id, id);
558 self.pending_frames.push_back(Frame::close_stream(id, ack).into());
559 }
560
561 fn on_drop_stream(&mut self, stream_id: StreamId) {
562 let s = self.streams.remove(&stream_id).expect("stream not found");
563
564 tracing::trace!(target: LOG_TARGET, "{}: removing dropped stream {}", self.id, stream_id);
565 let frame = {
566 let mut shared = s.lock();
567 let frame = match shared.update_state(self.id, stream_id, State::Closed) {
568 State::Open { .. } => {
571 let mut header = Header::data(stream_id, 0);
572 header.rst();
573 Some(Frame::new(header))
574 }
575 State::RecvClosed => {
579 let mut header = Header::data(stream_id, 0);
580 header.fin();
581 Some(Frame::new(header))
582 }
583 State::SendClosed => {
587 if self.config.window_update_mode == WindowUpdateMode::OnRead
588 && shared.window == 0
589 {
590 let mut header = Header::data(stream_id, 0);
593 header.rst();
594 Some(Frame::new(header))
595 } else {
596 None
603 }
604 }
605 State::Closed => None,
608 };
609 if let Some(w) = shared.reader.take() {
610 w.wake()
611 }
612 if let Some(w) = shared.writer.take() {
613 w.wake()
614 }
615 frame
616 };
617 if let Some(f) = frame {
618 tracing::trace!(target: LOG_TARGET, "{}/{}: sending: {}", self.id, stream_id, f.header());
619 self.pending_frames.push_back(f.into());
620 }
621 }
622
623 fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
630 tracing::trace!(target: LOG_TARGET, "{}: received: {}", self.id, frame.header());
631
632 if frame.header().flags().contains(header::ACK) {
633 let id = frame.header().stream_id();
634 if let Some(stream) = self.streams.get(&id) {
635 stream.lock().update_state(self.id, id, State::Open { acknowledged: true });
636 }
637 if let Some(waker) = self.new_outbound_stream_waker.take() {
638 waker.wake();
639 }
640 }
641
642 let action = match frame.header().tag() {
643 Tag::Data => self.on_data(frame.into_data()),
644 Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
645 Tag::Ping => self.on_ping(&frame.into_ping()),
646 Tag::GoAway => return Err(ConnectionError::Closed),
647 };
648 match action {
649 Action::None => {}
650 Action::New(stream, update) => {
651 tracing::trace!(target: LOG_TARGET, "{}: new inbound {} of {}", self.id, stream, self);
652 if let Some(f) = update {
653 tracing::trace!(target: LOG_TARGET, "{}/{}: sending update", self.id, f.header().stream_id());
654 self.pending_frames.push_back(f.into());
655 }
656 return Ok(Some(stream));
657 }
658 Action::Update(f) => {
659 tracing::trace!(target: LOG_TARGET, "{}: sending update: {:?}", self.id, f.header());
660 self.pending_frames.push_back(f.into());
661 }
662 Action::Ping(f) => {
663 tracing::trace!(target: LOG_TARGET, "{}/{}: pong", self.id, f.header().stream_id());
664 self.pending_frames.push_back(f.into());
665 }
666 Action::Reset(f) => {
667 tracing::trace!(target: LOG_TARGET, "{}/{}: sending reset", self.id, f.header().stream_id());
668 self.pending_frames.push_back(f.into());
669 }
670 Action::Terminate(f) => {
671 tracing::trace!(target: LOG_TARGET, "{}: sending term", self.id);
672 self.pending_frames.push_back(f.into());
673 }
674 }
675
676 Ok(None)
677 }
678
679 fn on_data(&mut self, frame: Frame<Data>) -> Action {
680 let stream_id = frame.header().stream_id();
681
682 if frame.header().flags().contains(header::RST) {
683 if let Some(s) = self.streams.get_mut(&stream_id) {
685 let mut shared = s.lock();
686 shared.update_state(self.id, stream_id, State::Closed);
687 if let Some(w) = shared.reader.take() {
688 w.wake()
689 }
690 if let Some(w) = shared.writer.take() {
691 w.wake()
692 }
693 }
694 return Action::None;
695 }
696
697 let is_finish = frame.header().flags().contains(header::FIN); if frame.header().flags().contains(header::SYN) {
700 if !self.is_valid_remote_id(stream_id, Tag::Data) {
702 tracing::error!(target: LOG_TARGET, "{}: invalid stream id {}", self.id, stream_id);
703 return Action::Terminate(Frame::protocol_error());
704 }
705 if frame.body().len() > DEFAULT_CREDIT as usize {
706 tracing::error!(target: LOG_TARGET,
707 "{}/{}: 1st body of stream exceeds default credit",
708 self.id,
709 stream_id
710 );
711 return Action::Terminate(Frame::protocol_error());
712 }
713 if self.streams.contains_key(&stream_id) {
714 tracing::error!(target: LOG_TARGET, "{}/{}: stream already exists", self.id, stream_id);
715 return Action::Terminate(Frame::protocol_error());
716 }
717 if self.streams.len() == self.config.max_num_streams {
718 tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id);
719 return Action::Terminate(Frame::internal_error());
720 }
721 let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
722 let mut window_update = None;
723 {
724 let mut shared = stream.shared();
725 if is_finish {
726 shared.update_state(self.id, stream_id, State::RecvClosed);
727 }
728 shared.window = shared.window.saturating_sub(frame.body_len());
729 shared.buffer.push(frame.into_body());
730
731 if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
732 if let Some(credit) = shared.next_window_update() {
733 shared.window += credit;
734 let mut frame = Frame::window_update(stream_id, credit);
735 frame.header_mut().ack();
736 window_update = Some(frame)
737 }
738 }
739 }
740 if window_update.is_none() {
741 stream.set_flag(stream::Flag::Ack)
742 }
743 self.streams.insert(stream_id, stream.clone_shared());
744 return Action::New(stream, window_update);
745 }
746
747 if let Some(s) = self.streams.get_mut(&stream_id) {
748 let mut shared = s.lock();
749 if frame.body().len() > shared.window as usize {
750 tracing::error!(target: LOG_TARGET,
751 "{}/{}: frame body larger than window of stream",
752 self.id,
753 stream_id
754 );
755 return Action::Terminate(Frame::protocol_error());
756 }
757 if is_finish {
758 shared.update_state(self.id, stream_id, State::RecvClosed);
759 }
760 let max_buffer_size = self.config.max_buffer_size;
761 if shared.buffer.len() >= max_buffer_size {
762 tracing::error!(target: LOG_TARGET,
763 "{}/{}: buffer of stream grows beyond limit",
764 self.id,
765 stream_id
766 );
767 let mut header = Header::data(stream_id, 0);
768 header.rst();
769 return Action::Reset(Frame::new(header));
770 }
771 shared.window = shared.window.saturating_sub(frame.body_len());
772 shared.buffer.push(frame.into_body());
773 if let Some(w) = shared.reader.take() {
774 w.wake()
775 }
776 if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
777 if let Some(credit) = shared.next_window_update() {
778 shared.window += credit;
779 let frame = Frame::window_update(stream_id, credit);
780 return Action::Update(frame);
781 }
782 }
783 } else {
784 tracing::trace!(target: LOG_TARGET,
785 "{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
786 self.id,
787 stream_id,
788 frame
789 );
790 }
798
799 Action::None
800 }
801
802 fn on_window_update(&mut self, frame: &Frame<WindowUpdate>) -> Action {
803 let stream_id = frame.header().stream_id();
804
805 if frame.header().flags().contains(header::RST) {
806 if let Some(s) = self.streams.get_mut(&stream_id) {
808 let mut shared = s.lock();
809 shared.update_state(self.id, stream_id, State::Closed);
810 if let Some(w) = shared.reader.take() {
811 w.wake()
812 }
813 if let Some(w) = shared.writer.take() {
814 w.wake()
815 }
816 }
817 return Action::None;
818 }
819
820 let is_finish = frame.header().flags().contains(header::FIN); if frame.header().flags().contains(header::SYN) {
823 if !self.is_valid_remote_id(stream_id, Tag::WindowUpdate) {
825 tracing::error!(target: LOG_TARGET, "{}: invalid stream id {}", self.id, stream_id);
826 return Action::Terminate(Frame::protocol_error());
827 }
828 if self.streams.contains_key(&stream_id) {
829 tracing::error!(target: LOG_TARGET, "{}/{}: stream already exists", self.id, stream_id);
830 return Action::Terminate(Frame::protocol_error());
831 }
832 if self.streams.len() == self.config.max_num_streams {
833 tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id);
834 return Action::Terminate(Frame::protocol_error());
835 }
836
837 let credit = frame.header().credit() + DEFAULT_CREDIT;
838 let mut stream = self.make_new_inbound_stream(stream_id, credit);
839 stream.set_flag(stream::Flag::Ack);
840
841 if is_finish {
842 stream.shared().update_state(self.id, stream_id, State::RecvClosed);
843 }
844 self.streams.insert(stream_id, stream.clone_shared());
845 return Action::New(stream, None);
846 }
847
848 if let Some(s) = self.streams.get_mut(&stream_id) {
849 let mut shared = s.lock();
850 shared.credit += frame.header().credit();
851 if is_finish {
852 shared.update_state(self.id, stream_id, State::RecvClosed);
853 }
854 if let Some(w) = shared.writer.take() {
855 w.wake()
856 }
857 } else {
858 tracing::trace!(target: LOG_TARGET,
859 "{}/{}: window update for unknown stream, possibly dropped earlier: {:?}",
860 self.id,
861 stream_id,
862 frame
863 );
864 }
872
873 Action::None
874 }
875
876 fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
877 let stream_id = frame.header().stream_id();
878 if frame.header().flags().contains(header::ACK) {
879 return Action::None;
881 }
882 if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
883 let mut hdr = Header::ping(frame.header().nonce());
884 hdr.ack();
885 return Action::Ping(Frame::new(hdr));
886 }
887 tracing::trace!(target: LOG_TARGET,
888 "{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
889 self.id,
890 stream_id,
891 frame
892 );
893 Action::None
901 }
902
903 fn make_new_inbound_stream(&mut self, id: StreamId, credit: u32) -> Stream {
904 let config = self.config.clone();
905
906 let (sender, receiver) = mpsc::channel(10); self.stream_receivers.push(TaggedStream::new(id, receiver));
908 if let Some(waker) = self.no_streams_waker.take() {
909 waker.wake();
910 }
911
912 Stream::new_inbound(id, self.id, config, credit, sender)
913 }
914
915 fn make_new_outbound_stream(&mut self, id: StreamId, window: u32) -> Stream {
916 let config = self.config.clone();
917
918 let (sender, receiver) = mpsc::channel(10); self.stream_receivers.push(TaggedStream::new(id, receiver));
920 if let Some(waker) = self.no_streams_waker.take() {
921 waker.wake();
922 }
923
924 Stream::new_outbound(id, self.id, config, window, sender)
925 }
926
927 fn next_stream_id(&mut self) -> Result<StreamId> {
928 let proposed = StreamId::new(self.next_id);
929 self.next_id = self.next_id.checked_add(2).ok_or(ConnectionError::NoMoreStreamIds)?;
930 match self.mode {
931 Mode::Client => assert!(proposed.is_client()),
932 Mode::Server => assert!(proposed.is_server()),
933 }
934 Ok(proposed)
935 }
936
937 fn ack_backlog(&mut self) -> usize {
940 self.streams
941 .iter()
942 .filter(|(id, _)| match self.mode {
950 Mode::Client => id.is_client(),
951 Mode::Server => id.is_server(),
952 })
953 .filter(|(_, s)| s.lock().is_pending_ack())
954 .count()
955 }
956
957 fn is_valid_remote_id(&self, id: StreamId, tag: Tag) -> bool {
959 if tag == Tag::Ping || tag == Tag::GoAway {
960 return id.is_session();
961 }
962 match self.mode {
963 Mode::Client => id.is_server(),
964 Mode::Server => id.is_client(),
965 }
966 }
967}
968
969impl<T> Active<T> {
970 fn drop_all_streams(&mut self) {
972 for (id, s) in self.streams.drain() {
973 let mut shared = s.lock();
974 shared.update_state(self.id, id, State::Closed);
975 if let Some(w) = shared.reader.take() {
976 w.wake()
977 }
978 if let Some(w) = shared.writer.take() {
979 w.wake()
980 }
981 }
982 }
983}