1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
13#[derive(Debug)]
14pub(super) struct Recv {
15 init_window_sz: WindowSize,
17
18 flow: FlowControl,
20
21 in_flight_data: WindowSize,
23
24 next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27 last_processed_id: StreamId,
29
30 max_stream_id: StreamId,
38
39 pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42 pending_accept: store::Queue<stream::NextAccept>,
44
45 pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48 reset_duration: Duration,
50
51 buffer: Buffer<Event>,
53
54 refused: Option<StreamId>,
56
57 is_push_enabled: bool,
59
60 is_extended_connect_protocol_enabled: bool,
62}
63
64#[derive(Debug)]
65pub(super) enum Event {
66 Headers(peer::PollMessage),
67 Data(Bytes),
68 Trailers(HeaderMap),
69}
70
71#[derive(Debug)]
72pub(super) enum RecvHeaderBlockError<T> {
73 Oversize(T),
74 State(Error),
75}
76
/// Identifies which kind of frame is opening a stream.
#[derive(Debug)]
pub(crate) enum Open {
    /// The stream is being opened by a PUSH_PROMISE frame.
    PushPromise,
    /// The stream is being opened by a HEADERS frame.
    Headers,
}
82
83impl Recv {
84 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
85 let next_stream_id = if peer.is_server() { 1 } else { 2 };
86
87 let mut flow = FlowControl::new();
88
89 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
92 .expect("invalid initial remote window size");
93 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
94
95 Recv {
96 init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
97 flow,
98 in_flight_data: 0 as WindowSize,
99 next_stream_id: Ok(next_stream_id.into()),
100 pending_window_updates: store::Queue::new(),
101 last_processed_id: StreamId::ZERO,
102 max_stream_id: StreamId::MAX,
103 pending_accept: store::Queue::new(),
104 pending_reset_expired: store::Queue::new(),
105 reset_duration: config.local_reset_duration,
106 buffer: Buffer::new(),
107 refused: None,
108 is_push_enabled: config.local_push_enabled,
109 is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
110 }
111 }
112
113 pub fn init_window_sz(&self) -> WindowSize {
115 self.init_window_sz
116 }
117
118 pub fn last_processed_id(&self) -> StreamId {
120 self.last_processed_id
121 }
122
123 pub fn open(
127 &mut self,
128 id: StreamId,
129 mode: Open,
130 counts: &mut Counts,
131 ) -> Result<Option<StreamId>, Error> {
132 assert!(self.refused.is_none());
133
134 counts.peer().ensure_can_open(id, mode)?;
135
136 let next_id = self.next_stream_id()?;
137 if id < next_id {
138 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
139 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140 }
141
142 self.next_stream_id = id.next_id();
143
144 if !counts.can_inc_num_recv_streams() {
145 self.refused = Some(id);
146 return Ok(None);
147 }
148
149 Ok(Some(id))
150 }
151
152 pub fn recv_headers(
156 &mut self,
157 frame: frame::Headers,
158 stream: &mut store::Ptr,
159 counts: &mut Counts,
160 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
161 tracing::trace!("opening stream; init_window={}", self.init_window_sz);
162 let is_initial = stream.state.recv_open(&frame)?;
163
164 if is_initial {
165 if frame.stream_id() > self.last_processed_id {
167 self.last_processed_id = frame.stream_id();
168 }
169
170 counts.inc_num_recv_streams(stream);
172 }
173
174 if !stream.content_length.is_head() {
175 use super::stream::ContentLength;
176 use http::header;
177
178 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
179 let content_length = match frame::parse_u64(content_length.as_bytes()) {
180 Ok(v) => v,
181 Err(_) => {
182 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
183 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
184 }
185 };
186
187 stream.content_length = ContentLength::Remaining(content_length);
188 }
189 }
190
191 if frame.is_over_size() {
192 tracing::debug!(
204 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
205 recv_headers: frame is over size; stream={:?}",
206 stream.id
207 );
208 return if counts.peer().is_server() && is_initial {
209 let mut res = frame::Headers::new(
210 stream.id,
211 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
212 HeaderMap::new(),
213 );
214 res.set_end_stream();
215 Err(RecvHeaderBlockError::Oversize(Some(res)))
216 } else {
217 Err(RecvHeaderBlockError::Oversize(None))
218 };
219 }
220
221 let stream_id = frame.stream_id();
222 let (pseudo, fields) = frame.into_parts();
223
224 if pseudo.protocol.is_some()
225 && counts.peer().is_server()
226 && !self.is_extended_connect_protocol_enabled
227 {
228 proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
229 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
230 }
231
232 if pseudo.status.is_some() && counts.peer().is_server() {
233 proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
234 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
235 }
236
237 if !pseudo.is_informational() {
238 let message = counts
239 .peer()
240 .convert_poll_message(pseudo, fields, stream_id)?;
241
242 stream
244 .pending_recv
245 .push_back(&mut self.buffer, Event::Headers(message));
246 stream.notify_recv();
247
248 if counts.peer().is_server() {
251 self.pending_accept.push(stream);
254 }
255 }
256
257 Ok(())
258 }
259
260 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
267 use super::peer::PollMessage::*;
268
269 match stream.pending_recv.pop_front(&mut self.buffer) {
270 Some(Event::Headers(Server(request))) => request,
271 _ => unreachable!("server stream queue must start with Headers"),
272 }
273 }
274
275 pub fn poll_pushed(
277 &mut self,
278 cx: &Context,
279 stream: &mut store::Ptr,
280 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
281 use super::peer::PollMessage::*;
282
283 let mut ppp = stream.pending_push_promises.take();
284 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
285 match pushed.pending_recv.pop_front(&mut self.buffer) {
286 Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
287 _ => panic!("Headers not set on pushed stream"),
290 }
291 });
292 stream.pending_push_promises = ppp;
293 if let Some(p) = pushed {
294 Poll::Ready(Some(Ok(p)))
295 } else {
296 let is_open = stream.state.ensure_recv_open()?;
297
298 if is_open {
299 stream.push_task = Some(cx.waker().clone());
300 Poll::Pending
301 } else {
302 Poll::Ready(None)
303 }
304 }
305 }
306
307 pub fn poll_response(
309 &mut self,
310 cx: &Context,
311 stream: &mut store::Ptr,
312 ) -> Poll<Result<Response<()>, proto::Error>> {
313 use super::peer::PollMessage::*;
314
315 match stream.pending_recv.pop_front(&mut self.buffer) {
318 Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
319 Some(_) => panic!("poll_response called after response returned"),
320 None => {
321 if !stream.state.ensure_recv_open()? {
322 proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
323 return Poll::Ready(Err(Error::library_reset(
324 stream.id,
325 Reason::PROTOCOL_ERROR,
326 )));
327 }
328
329 stream.recv_task = Some(cx.waker().clone());
330 Poll::Pending
331 }
332 }
333 }
334
335 pub fn recv_trailers(
337 &mut self,
338 frame: frame::Headers,
339 stream: &mut store::Ptr,
340 ) -> Result<(), Error> {
341 stream.state.recv_close()?;
343
344 if stream.ensure_content_length_zero().is_err() {
345 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
346 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
347 }
348
349 let trailers = frame.into_fields();
350
351 stream
353 .pending_recv
354 .push_back(&mut self.buffer, Event::Trailers(trailers));
355 stream.notify_recv();
356
357 Ok(())
358 }
359
360 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
362 tracing::trace!(
363 "release_connection_capacity; size={}, connection in_flight_data={}",
364 capacity,
365 self.in_flight_data,
366 );
367
368 self.in_flight_data -= capacity;
370
371 let _res = self.flow.assign_capacity(capacity);
374 debug_assert!(_res.is_ok());
375
376 if self.flow.unclaimed_capacity().is_some() {
377 if let Some(task) = task.take() {
378 task.wake();
379 }
380 }
381 }
382
383 pub fn release_capacity(
385 &mut self,
386 capacity: WindowSize,
387 stream: &mut store::Ptr,
388 task: &mut Option<Waker>,
389 ) -> Result<(), UserError> {
390 tracing::trace!("release_capacity; size={}", capacity);
391
392 if capacity > stream.in_flight_recv_data {
393 return Err(UserError::ReleaseCapacityTooBig);
394 }
395
396 self.release_connection_capacity(capacity, task);
397
398 stream.in_flight_recv_data -= capacity;
400
401 let _res = stream.recv_flow.assign_capacity(capacity);
404 debug_assert!(_res.is_ok());
405
406 if stream.recv_flow.unclaimed_capacity().is_some() {
407 self.pending_window_updates.push(stream);
409
410 if let Some(task) = task.take() {
411 task.wake();
412 }
413 }
414
415 Ok(())
416 }
417
418 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
420 debug_assert_eq!(stream.ref_count, 0);
421
422 if stream.in_flight_recv_data == 0 {
423 return;
424 }
425
426 tracing::trace!(
427 "auto-release closed stream ({:?}) capacity: {:?}",
428 stream.id,
429 stream.in_flight_recv_data,
430 );
431
432 self.release_connection_capacity(stream.in_flight_recv_data, task);
433 stream.in_flight_recv_data = 0;
434
435 self.clear_recv_buffer(stream);
436 }
437
438 pub fn set_target_connection_window(
451 &mut self,
452 target: WindowSize,
453 task: &mut Option<Waker>,
454 ) -> Result<(), Reason> {
455 tracing::trace!(
456 "set_target_connection_window; target={}; available={}, reserved={}",
457 target,
458 self.flow.available(),
459 self.in_flight_data,
460 );
461
462 let current = self
468 .flow
469 .available()
470 .add(self.in_flight_data)?
471 .checked_size();
472 if target > current {
473 self.flow.assign_capacity(target - current)?;
474 } else {
475 self.flow.claim_capacity(current - target)?;
476 }
477
478 if self.flow.unclaimed_capacity().is_some() {
482 if let Some(task) = task.take() {
483 task.wake();
484 }
485 }
486 Ok(())
487 }
488
489 pub(crate) fn apply_local_settings(
490 &mut self,
491 settings: &frame::Settings,
492 store: &mut Store,
493 ) -> Result<(), proto::Error> {
494 if let Some(val) = settings.is_extended_connect_protocol_enabled() {
495 self.is_extended_connect_protocol_enabled = val;
496 }
497
498 if let Some(target) = settings.initial_window_size() {
499 let old_sz = self.init_window_sz;
500 self.init_window_sz = target;
501
502 tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
503
504 match target.cmp(&old_sz) {
521 Ordering::Less => {
522 let dec = old_sz - target;
524 tracing::trace!("decrementing all windows; dec={}", dec);
525
526 store.try_for_each(|mut stream| {
527 stream
528 .recv_flow
529 .dec_recv_window(dec)
530 .map_err(proto::Error::library_go_away)?;
531 Ok::<_, proto::Error>(())
532 })?;
533 }
534 Ordering::Greater => {
535 let inc = target - old_sz;
537 tracing::trace!("incrementing all windows; inc={}", inc);
538 store.try_for_each(|mut stream| {
539 stream
542 .recv_flow
543 .inc_window(inc)
544 .map_err(proto::Error::library_go_away)?;
545 stream
546 .recv_flow
547 .assign_capacity(inc)
548 .map_err(proto::Error::library_go_away)?;
549 Ok::<_, proto::Error>(())
550 })?;
551 }
552 Ordering::Equal => (),
553 }
554 }
555
556 Ok(())
557 }
558
559 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
560 if !stream.state.is_recv_closed() {
561 return false;
562 }
563
564 stream.pending_recv.is_empty()
565 }
566
567 pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
568 let sz = frame.payload().len();
569
570 assert!(sz <= MAX_WINDOW_SIZE as usize);
573
574 let sz = sz as WindowSize;
575
576 let is_ignoring_frame = stream.state.is_local_error();
577
578 if !is_ignoring_frame && !stream.state.is_recv_streaming() {
579 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
585 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
586 }
587
588 tracing::trace!(
589 "recv_data; size={}; connection={}; stream={}",
590 sz,
591 self.flow.window_size(),
592 stream.recv_flow.window_size()
593 );
594
595 if is_ignoring_frame {
596 tracing::trace!(
597 "recv_data; frame ignored on locally reset {:?} for some time",
598 stream.id,
599 );
600 return self.ignore_data(sz);
601 }
602
603 self.consume_connection_window(sz)?;
606
607 if stream.recv_flow.window_size() < sz {
608 return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
617 }
618
619 if stream.dec_content_length(frame.payload().len()).is_err() {
620 proto_err!(stream:
621 "recv_data: content-length overflow; stream={:?}; len={:?}",
622 stream.id,
623 frame.payload().len(),
624 );
625 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
626 }
627
628 if frame.is_end_stream() {
629 if stream.ensure_content_length_zero().is_err() {
630 proto_err!(stream:
631 "recv_data: content-length underflow; stream={:?}; len={:?}",
632 stream.id,
633 frame.payload().len(),
634 );
635 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
636 }
637
638 if stream.state.recv_close().is_err() {
639 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
640 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
641 }
642 }
643
644 if !stream.is_recv {
646 tracing::trace!(
647 "recv_data; frame ignored on stream release {:?} for some time",
648 stream.id,
649 );
650 self.release_connection_capacity(sz, &mut None);
651 return Ok(());
652 }
653
654 stream
656 .recv_flow
657 .send_data(sz)
658 .map_err(proto::Error::library_go_away)?;
659
660 stream.in_flight_recv_data += sz;
662
663 let event = Event::Data(frame.into_payload());
664
665 stream.pending_recv.push_back(&mut self.buffer, event);
667 stream.notify_recv();
668
669 Ok(())
670 }
671
672 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
673 self.consume_connection_window(sz)?;
675
676 self.release_connection_capacity(sz, &mut None);
685 Ok(())
686 }
687
688 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
689 if self.flow.window_size() < sz {
690 tracing::debug!(
691 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
692 self.flow.window_size(),
693 sz,
694 );
695 return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
696 }
697
698 self.flow.send_data(sz).map_err(Error::library_go_away)?;
700
701 self.in_flight_data += sz;
703 Ok(())
704 }
705
706 pub fn recv_push_promise(
707 &mut self,
708 frame: frame::PushPromise,
709 stream: &mut store::Ptr,
710 ) -> Result<(), Error> {
711 stream.state.reserve_remote()?;
712 if frame.is_over_size() {
713 tracing::debug!(
725 "stream error PROTOCOL_ERROR -- recv_push_promise: \
726 headers frame is over size; promised_id={:?};",
727 frame.promised_id(),
728 );
729 return Err(Error::library_reset(
730 frame.promised_id(),
731 Reason::PROTOCOL_ERROR,
732 ));
733 }
734
735 let promised_id = frame.promised_id();
736 let (pseudo, fields) = frame.into_parts();
737 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
738
739 if let Err(e) = frame::PushPromise::validate_request(&req) {
740 use PushPromiseHeaderError::*;
741 match e {
742 NotSafeAndCacheable => proto_err!(
743 stream:
744 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
745 req.method(),
746 promised_id,
747 ),
748 InvalidContentLength(e) => proto_err!(
749 stream:
750 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
751 e,
752 promised_id,
753 ),
754 }
755 return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
756 }
757
758 use super::peer::PollMessage::*;
759 stream
760 .pending_recv
761 .push_back(&mut self.buffer, Event::Headers(Server(req)));
762 stream.notify_recv();
763 stream.notify_push();
764 Ok(())
765 }
766
767 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
769 if let Ok(next) = self.next_stream_id {
770 if id >= next {
771 tracing::debug!(
772 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
773 id
774 );
775 return Err(Reason::PROTOCOL_ERROR);
776 }
777 }
778 Ok(())
781 }
782
783 pub fn recv_reset(
785 &mut self,
786 frame: frame::Reset,
787 stream: &mut Stream,
788 counts: &mut Counts,
789 ) -> Result<(), Error> {
790 if stream.is_pending_accept {
799 if counts.can_inc_num_remote_reset_streams() {
800 counts.inc_num_remote_reset_streams();
801 } else {
802 tracing::warn!(
803 "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
804 counts.max_remote_reset_streams(),
805 );
806 return Err(Error::library_go_away_data(
807 Reason::ENHANCE_YOUR_CALM,
808 "too_many_resets",
809 ));
810 }
811 }
812
813 stream.state.recv_reset(frame, stream.is_pending_send);
815
816 stream.notify_send();
817 stream.notify_recv();
818 stream.notify_push();
819
820 Ok(())
821 }
822
823 pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
825 stream.state.handle_error(err);
827
828 stream.notify_send();
830 stream.notify_recv();
831 stream.notify_push();
832 }
833
834 pub fn go_away(&mut self, last_processed_id: StreamId) {
835 assert!(self.max_stream_id >= last_processed_id);
836 self.max_stream_id = last_processed_id;
837 }
838
839 pub fn recv_eof(&mut self, stream: &mut Stream) {
840 stream.state.recv_eof();
841 stream.notify_send();
842 stream.notify_recv();
843 stream.notify_push();
844 }
845
846 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
847 while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
848 }
850 }
851
852 pub fn max_stream_id(&self) -> StreamId {
856 self.max_stream_id
857 }
858
859 pub fn next_stream_id(&self) -> Result<StreamId, Error> {
860 if let Ok(id) = self.next_stream_id {
861 Ok(id)
862 } else {
863 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
864 }
865 }
866
867 pub fn may_have_created_stream(&self, id: StreamId) -> bool {
868 if let Ok(next_id) = self.next_stream_id {
869 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
871 id < next_id
872 } else {
873 true
874 }
875 }
876
877 pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
878 if let Ok(next_id) = self.next_stream_id {
879 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
881 if id >= next_id {
882 self.next_stream_id = id.next_id();
883 }
884 }
885 }
886
887 pub fn ensure_can_reserve(&self) -> Result<(), Error> {
889 if !self.is_push_enabled {
890 proto_err!(conn: "recv_push_promise: push is disabled");
891 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
892 }
893
894 Ok(())
895 }
896
897 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
899 if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
900 return;
901 }
902
903 tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
904
905 if counts.can_inc_num_reset_streams() {
906 counts.inc_num_reset_streams();
907 self.pending_reset_expired.push(stream);
908 }
909 }
910
911 pub fn send_pending_refusal<T, B>(
913 &mut self,
914 cx: &mut Context,
915 dst: &mut Codec<T, Prioritized<B>>,
916 ) -> Poll<io::Result<()>>
917 where
918 T: AsyncWrite + Unpin,
919 B: Buf,
920 {
921 if let Some(stream_id) = self.refused {
922 ready!(dst.poll_ready(cx))?;
923
924 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
926
927 dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
929 }
930
931 self.refused = None;
932
933 Poll::Ready(Ok(()))
934 }
935
936 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
937 if !self.pending_reset_expired.is_empty() {
938 let now = Instant::now();
939 let reset_duration = self.reset_duration;
940 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
941 let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
942 now.saturating_duration_since(reset_at) > reset_duration
946 }) {
947 counts.transition_after(stream, true);
948 }
949 }
950 }
951
952 pub fn clear_queues(
953 &mut self,
954 clear_pending_accept: bool,
955 store: &mut Store,
956 counts: &mut Counts,
957 ) {
958 self.clear_stream_window_update_queue(store, counts);
959 self.clear_all_reset_streams(store, counts);
960
961 if clear_pending_accept {
962 self.clear_all_pending_accept(store, counts);
963 }
964 }
965
966 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
967 while let Some(stream) = self.pending_window_updates.pop(store) {
968 counts.transition(stream, |_, stream| {
969 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
970 })
971 }
972 }
973
974 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
976 while let Some(stream) = self.pending_reset_expired.pop(store) {
977 counts.transition_after(stream, true);
978 }
979 }
980
981 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
982 while let Some(stream) = self.pending_accept.pop(store) {
983 counts.transition_after(stream, false);
984 }
985 }
986
987 pub fn poll_complete<T, B>(
988 &mut self,
989 cx: &mut Context,
990 store: &mut Store,
991 counts: &mut Counts,
992 dst: &mut Codec<T, Prioritized<B>>,
993 ) -> Poll<io::Result<()>>
994 where
995 T: AsyncWrite + Unpin,
996 B: Buf,
997 {
998 ready!(self.send_connection_window_update(cx, dst))?;
1000
1001 ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1003
1004 Poll::Ready(Ok(()))
1005 }
1006
1007 fn send_connection_window_update<T, B>(
1009 &mut self,
1010 cx: &mut Context,
1011 dst: &mut Codec<T, Prioritized<B>>,
1012 ) -> Poll<io::Result<()>>
1013 where
1014 T: AsyncWrite + Unpin,
1015 B: Buf,
1016 {
1017 if let Some(incr) = self.flow.unclaimed_capacity() {
1018 let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1019
1020 ready!(dst.poll_ready(cx))?;
1022
1023 dst.buffer(frame.into())
1025 .expect("invalid WINDOW_UPDATE frame");
1026
1027 self.flow
1029 .inc_window(incr)
1030 .expect("unexpected flow control state");
1031 }
1032
1033 Poll::Ready(Ok(()))
1034 }
1035
1036 pub fn send_stream_window_updates<T, B>(
1038 &mut self,
1039 cx: &mut Context,
1040 store: &mut Store,
1041 counts: &mut Counts,
1042 dst: &mut Codec<T, Prioritized<B>>,
1043 ) -> Poll<io::Result<()>>
1044 where
1045 T: AsyncWrite + Unpin,
1046 B: Buf,
1047 {
1048 loop {
1049 ready!(dst.poll_ready(cx))?;
1051
1052 let stream = match self.pending_window_updates.pop(store) {
1054 Some(stream) => stream,
1055 None => return Poll::Ready(Ok(())),
1056 };
1057
1058 counts.transition(stream, |_, stream| {
1059 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1060 debug_assert!(!stream.is_pending_window_update);
1061
1062 if !stream.state.is_recv_streaming() {
1063 return;
1070 }
1071
1072 if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1074 let frame = frame::WindowUpdate::new(stream.id, incr);
1076
1077 dst.buffer(frame.into())
1079 .expect("invalid WINDOW_UPDATE frame");
1080
1081 stream
1083 .recv_flow
1084 .inc_window(incr)
1085 .expect("unexpected flow control state");
1086 }
1087 })
1088 }
1089 }
1090
1091 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1092 self.pending_accept.pop(store).map(|ptr| ptr.key())
1093 }
1094
1095 pub fn poll_data(
1096 &mut self,
1097 cx: &Context,
1098 stream: &mut Stream,
1099 ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1100 match stream.pending_recv.pop_front(&mut self.buffer) {
1101 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1102 Some(event) => {
1103 stream.pending_recv.push_front(&mut self.buffer, event);
1105
1106 stream.notify_recv();
1115
1116 Poll::Ready(None)
1118 }
1119 None => self.schedule_recv(cx, stream),
1120 }
1121 }
1122
1123 pub fn poll_trailers(
1124 &mut self,
1125 cx: &Context,
1126 stream: &mut Stream,
1127 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1128 match stream.pending_recv.pop_front(&mut self.buffer) {
1129 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1130 Some(event) => {
1131 stream.pending_recv.push_front(&mut self.buffer, event);
1133
1134 Poll::Pending
1135 }
1136 None => self.schedule_recv(cx, stream),
1137 }
1138 }
1139
1140 fn schedule_recv<T>(
1141 &mut self,
1142 cx: &Context,
1143 stream: &mut Stream,
1144 ) -> Poll<Option<Result<T, proto::Error>>> {
1145 if stream.state.ensure_recv_open()? {
1146 stream.recv_task = Some(cx.waker().clone());
1148 Poll::Pending
1149 } else {
1150 Poll::Ready(None)
1152 }
1153 }
1154}
1155
1156impl Open {
1159 pub fn is_push_promise(&self) -> bool {
1160 matches!(*self, Self::PushPromise)
1161 }
1162}
1163
1164impl<T> From<Error> for RecvHeaderBlockError<T> {
1167 fn from(err: Error) -> Self {
1168 RecvHeaderBlockError::State(err)
1169 }
1170}