1use std::error::Error as StdError;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use futures_util::ready;
9use h2::server::{Connection, Handshake, SendResponse};
10use h2::{Reason, RecvStream};
11use http::{Method, Request};
12use pin_project_lite::pin_project;
13
14use super::{ping, PipeToSendStream, SendBuf};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::common::date;
17use crate::common::io::Compat;
18use crate::common::time::Time;
19use crate::ext::Protocol;
20use crate::headers;
21use crate::proto::h2::ping::Recorder;
22use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
23use crate::proto::Dispatched;
24use crate::rt::bounds::Http2ServerConnExec;
25use crate::rt::{Read, Write};
26use crate::service::HttpService;
27
28use crate::upgrade::{OnUpgrade, Pending, Upgraded};
29use crate::Response;
30
/// Default connection-level window size (1 MiB); used for `Config::initial_conn_window_size`.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024;
/// Default per-stream window size (1 MiB); used for `Config::initial_stream_window_size`.
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024;
/// Default maximum frame size (16 KiB); used for `Config::max_frame_size`.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default maximum send buffer size (400 KiB); used for `Config::max_send_buffer_size`.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 400 * 1024;
/// Default maximum header list size (16 KiB); used for `Config::max_header_list_size`.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;
/// Default cap used for `Config::max_local_error_reset_streams`.
const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
43
/// Tunable settings for an HTTP/2 server connection.
///
/// The values are read once, when a `Server` is constructed.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When set, the initial stream window size is also handed to the ping
    /// machinery as its initial BDP window.
    pub(crate) adaptive_window: bool,
    /// Initial connection-level flow-control window size.
    pub(crate) initial_conn_window_size: u32,
    /// Initial per-stream flow-control window size.
    pub(crate) initial_stream_window_size: u32,
    /// Maximum frame size configured on the h2 builder.
    pub(crate) max_frame_size: u32,
    /// Whether the h2 builder's connect protocol support is enabled.
    pub(crate) enable_connect_protocol: bool,
    /// Optional cap on concurrent streams; `None` leaves the h2 builder untouched.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Optional cap for the h2 builder's `max_pending_accept_reset_streams` setting.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Optional cap meant for the h2 builder's `max_local_error_reset_streams` setting.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Keep-alive ping interval; copied into the ping configuration.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Keep-alive ping timeout; copied into the ping configuration.
    pub(crate) keep_alive_timeout: Duration,
    /// Maximum send buffer size configured on the h2 builder.
    pub(crate) max_send_buffer_size: usize,
    /// Maximum header list size configured on the h2 builder.
    pub(crate) max_header_list_size: u32,
    /// Whether a `Date` header is added to responses that lack one.
    pub(crate) date_header: bool,
}
60
61impl Default for Config {
62 fn default() -> Config {
63 Config {
64 adaptive_window: false,
65 initial_conn_window_size: DEFAULT_CONN_WINDOW,
66 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
67 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
68 enable_connect_protocol: false,
69 max_concurrent_streams: Some(200),
70 max_pending_accept_reset_streams: None,
71 max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
72 keep_alive_interval: None,
73 keep_alive_timeout: Duration::from_secs(20),
74 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
75 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
76 date_header: true,
77 }
78 }
79}
80
81pin_project! {
82 pub(crate) struct Server<T, S, B, E>
83 where
84 S: HttpService<IncomingBody>,
85 B: Body,
86 {
87 exec: E,
88 timer: Time,
89 service: S,
90 state: State<T, B>,
91 date_header: bool,
92 }
93}
94
95enum State<T, B>
96where
97 B: Body,
98{
99 Handshaking {
100 ping_config: ping::Config,
101 hs: Handshake<Compat<T>, SendBuf<B::Data>>,
102 },
103 Serving(Serving<T, B>),
104 Closed,
105}
106
107struct Serving<T, B>
108where
109 B: Body,
110{
111 ping: Option<(ping::Recorder, ping::Ponger)>,
112 conn: Connection<Compat<T>, SendBuf<B::Data>>,
113 closing: Option<crate::Error>,
114 date_header: bool,
115}
116
117impl<T, S, B, E> Server<T, S, B, E>
118where
119 T: Read + Write + Unpin,
120 S: HttpService<IncomingBody, ResBody = B>,
121 S::Error: Into<Box<dyn StdError + Send + Sync>>,
122 B: Body + 'static,
123 E: Http2ServerConnExec<S::Future, B>,
124{
125 pub(crate) fn new(
126 io: T,
127 service: S,
128 config: &Config,
129 exec: E,
130 timer: Time,
131 ) -> Server<T, S, B, E> {
132 let mut builder = h2::server::Builder::default();
133 builder
134 .initial_window_size(config.initial_stream_window_size)
135 .initial_connection_window_size(config.initial_conn_window_size)
136 .max_frame_size(config.max_frame_size)
137 .max_header_list_size(config.max_header_list_size)
138 .max_local_error_reset_streams(config.max_pending_accept_reset_streams)
139 .max_send_buffer_size(config.max_send_buffer_size);
140 if let Some(max) = config.max_concurrent_streams {
141 builder.max_concurrent_streams(max);
142 }
143 if let Some(max) = config.max_pending_accept_reset_streams {
144 builder.max_pending_accept_reset_streams(max);
145 }
146 if config.enable_connect_protocol {
147 builder.enable_connect_protocol();
148 }
149 let handshake = builder.handshake(Compat::new(io));
150
151 let bdp = if config.adaptive_window {
152 Some(config.initial_stream_window_size)
153 } else {
154 None
155 };
156
157 let ping_config = ping::Config {
158 bdp_initial_window: bdp,
159 keep_alive_interval: config.keep_alive_interval,
160 keep_alive_timeout: config.keep_alive_timeout,
161 keep_alive_while_idle: true,
164 };
165
166 Server {
167 exec,
168 timer,
169 state: State::Handshaking {
170 ping_config,
171 hs: handshake,
172 },
173 service,
174 date_header: config.date_header,
175 }
176 }
177
178 pub(crate) fn graceful_shutdown(&mut self) {
179 trace!("graceful_shutdown");
180 match self.state {
181 State::Handshaking { .. } => {
182 }
184 State::Serving(ref mut srv) => {
185 if srv.closing.is_none() {
186 srv.conn.graceful_shutdown();
187 }
188 return;
189 }
190 State::Closed => {
191 return;
192 }
193 }
194 self.state = State::Closed;
195 }
196}
197
198impl<T, S, B, E> Future for Server<T, S, B, E>
199where
200 T: Read + Write + Unpin,
201 S: HttpService<IncomingBody, ResBody = B>,
202 S::Error: Into<Box<dyn StdError + Send + Sync>>,
203 B: Body + 'static,
204 E: Http2ServerConnExec<S::Future, B>,
205{
206 type Output = crate::Result<Dispatched>;
207
208 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209 let me = &mut *self;
210 loop {
211 let next = match me.state {
212 State::Handshaking {
213 ref mut hs,
214 ref ping_config,
215 } => {
216 let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
217 let ping = if ping_config.is_enabled() {
218 let pp = conn.ping_pong().expect("conn.ping_pong");
219 Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
220 } else {
221 None
222 };
223 State::Serving(Serving {
224 ping,
225 conn,
226 closing: None,
227 date_header: me.date_header,
228 })
229 }
230 State::Serving(ref mut srv) => {
231 ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
232 return Poll::Ready(Ok(Dispatched::Shutdown));
233 }
234 State::Closed => {
235 return Poll::Ready(Ok(Dispatched::Shutdown));
238 }
239 };
240 me.state = next;
241 }
242 }
243}
244
245impl<T, B> Serving<T, B>
246where
247 T: Read + Write + Unpin,
248 B: Body + 'static,
249{
250 fn poll_server<S, E>(
251 &mut self,
252 cx: &mut Context<'_>,
253 service: &mut S,
254 exec: &mut E,
255 ) -> Poll<crate::Result<()>>
256 where
257 S: HttpService<IncomingBody, ResBody = B>,
258 S::Error: Into<Box<dyn StdError + Send + Sync>>,
259 E: Http2ServerConnExec<S::Future, B>,
260 {
261 if self.closing.is_none() {
262 loop {
263 self.poll_ping(cx);
264
265 match ready!(self.conn.poll_accept(cx)) {
266 Some(Ok((req, mut respond))) => {
267 trace!("incoming request");
268 let content_length = headers::content_length_parse_all(req.headers());
269 let ping = self
270 .ping
271 .as_ref()
272 .map(|ping| ping.0.clone())
273 .unwrap_or_else(ping::disabled);
274
275 ping.record_non_data();
277
278 let is_connect = req.method() == Method::CONNECT;
279 let (mut parts, stream) = req.into_parts();
280 let (mut req, connect_parts) = if !is_connect {
281 (
282 Request::from_parts(
283 parts,
284 IncomingBody::h2(stream, content_length.into(), ping),
285 ),
286 None,
287 )
288 } else {
289 if content_length.map_or(false, |len| len != 0) {
290 warn!("h2 connect request with non-zero body not supported");
291 respond.send_reset(h2::Reason::INTERNAL_ERROR);
292 return Poll::Ready(Ok(()));
293 }
294 let (pending, upgrade) = crate::upgrade::pending();
295 debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
296 parts.extensions.insert(upgrade);
297 (
298 Request::from_parts(parts, IncomingBody::empty()),
299 Some(ConnectParts {
300 pending,
301 ping,
302 recv_stream: stream,
303 }),
304 )
305 };
306
307 if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
308 req.extensions_mut().insert(Protocol::from_inner(protocol));
309 }
310
311 let fut = H2Stream::new(
312 service.call(req),
313 connect_parts,
314 respond,
315 self.date_header,
316 );
317
318 exec.execute_h2stream(fut);
319 }
320 Some(Err(e)) => {
321 return Poll::Ready(Err(crate::Error::new_h2(e)));
322 }
323 None => {
324 if let Some((ref ping, _)) = self.ping {
326 ping.ensure_not_timed_out()?;
327 }
328
329 trace!("incoming connection complete");
330 return Poll::Ready(Ok(()));
331 }
332 }
333 }
334 }
335
336 debug_assert!(
337 self.closing.is_some(),
338 "poll_server broke loop without closing"
339 );
340
341 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
342
343 Poll::Ready(Err(self.closing.take().expect("polled after error")))
344 }
345
346 fn poll_ping(&mut self, cx: &mut Context<'_>) {
347 if let Some((_, ref mut estimator)) = self.ping {
348 match estimator.poll(cx) {
349 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
350 self.conn.set_target_window_size(wnd);
351 let _ = self.conn.set_initial_window_size(wnd);
352 }
353 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
354 debug!("keep-alive timed out, closing connection");
355 self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
356 }
357 Poll::Pending => {}
358 }
359 }
360 }
361}
362
363pin_project! {
364 #[allow(missing_debug_implementations)]
365 pub struct H2Stream<F, B>
366 where
367 B: Body,
368 {
369 reply: SendResponse<SendBuf<B::Data>>,
370 #[pin]
371 state: H2StreamState<F, B>,
372 date_header: bool,
373 }
374}
375
376pin_project! {
377 #[project = H2StreamStateProj]
378 enum H2StreamState<F, B>
379 where
380 B: Body,
381 {
382 Service {
383 #[pin]
384 fut: F,
385 connect_parts: Option<ConnectParts>,
386 },
387 Body {
388 #[pin]
389 pipe: PipeToSendStream<B>,
390 },
391 }
392}
393
394struct ConnectParts {
395 pending: Pending,
396 ping: Recorder,
397 recv_stream: RecvStream,
398}
399
400impl<F, B> H2Stream<F, B>
401where
402 B: Body,
403{
404 fn new(
405 fut: F,
406 connect_parts: Option<ConnectParts>,
407 respond: SendResponse<SendBuf<B::Data>>,
408 date_header: bool,
409 ) -> H2Stream<F, B> {
410 H2Stream {
411 reply: respond,
412 state: H2StreamState::Service { fut, connect_parts },
413 date_header,
414 }
415 }
416}
417
// Sends the response head `$res` on `$me.reply`, with `$eos` marking whether
// the stream ends with the head, and evaluates to the resulting send stream.
//
// On failure the stream is reset with `INTERNAL_ERROR` and the *enclosing
// function* returns `Poll::Ready(Err(..))`, so this may only be used inside
// functions returning `Poll<crate::Result<_>>`.
macro_rules! reply {
    ($me:expr, $res:expr, $eos:expr) => {{
        match $me.reply.send_response($res, $eos) {
            Ok(send_stream) => send_stream,
            Err(e) => {
                debug!("send response error: {}", e);
                $me.reply.send_reset(Reason::INTERNAL_ERROR);
                return Poll::Ready(Err(crate::Error::new_h2(e)));
            }
        }
    }};
}
430
431impl<F, B, E> H2Stream<F, B>
432where
433 F: Future<Output = Result<Response<B>, E>>,
434 B: Body,
435 B::Data: 'static,
436 B::Error: Into<Box<dyn StdError + Send + Sync>>,
437 E: Into<Box<dyn StdError + Send + Sync>>,
438{
439 fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
440 let mut me = self.project();
441 loop {
442 let next = match me.state.as_mut().project() {
443 H2StreamStateProj::Service {
444 fut: h,
445 connect_parts,
446 } => {
447 let res = match h.poll(cx) {
448 Poll::Ready(Ok(r)) => r,
449 Poll::Pending => {
450 if let Poll::Ready(reason) =
453 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
454 {
455 debug!("stream received RST_STREAM: {:?}", reason);
456 return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
457 }
458 return Poll::Pending;
459 }
460 Poll::Ready(Err(e)) => {
461 let err = crate::Error::new_user_service(e);
462 warn!("http2 service errored: {}", err);
463 me.reply.send_reset(err.h2_reason());
464 return Poll::Ready(Err(err));
465 }
466 };
467
468 let (head, body) = res.into_parts();
469 let mut res = ::http::Response::from_parts(head, ());
470 super::strip_connection_headers(res.headers_mut(), false);
471
472 if *me.date_header {
474 res.headers_mut()
475 .entry(::http::header::DATE)
476 .or_insert_with(date::update_and_header_value);
477 }
478
479 if let Some(connect_parts) = connect_parts.take() {
480 if res.status().is_success() {
481 if headers::content_length_parse_all(res.headers())
482 .map_or(false, |len| len != 0)
483 {
484 warn!("h2 successful response to CONNECT request with body not supported");
485 me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
486 return Poll::Ready(Err(crate::Error::new_user_header()));
487 }
488 let send_stream = reply!(me, res, false);
489 connect_parts.pending.fulfill(Upgraded::new(
490 H2Upgraded {
491 ping: connect_parts.ping,
492 recv_stream: connect_parts.recv_stream,
493 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
494 buf: Bytes::new(),
495 },
496 Bytes::new(),
497 ));
498 return Poll::Ready(Ok(()));
499 }
500 }
501
502 if !body.is_end_stream() {
503 if let Some(len) = body.size_hint().exact() {
505 headers::set_content_length_if_missing(res.headers_mut(), len);
506 }
507
508 let body_tx = reply!(me, res, false);
509 H2StreamState::Body {
510 pipe: PipeToSendStream::new(body, body_tx),
511 }
512 } else {
513 reply!(me, res, true);
514 return Poll::Ready(Ok(()));
515 }
516 }
517 H2StreamStateProj::Body { pipe } => {
518 return pipe.poll(cx);
519 }
520 };
521 me.state.set(next);
522 }
523 }
524}
525
526impl<F, B, E> Future for H2Stream<F, B>
527where
528 F: Future<Output = Result<Response<B>, E>>,
529 B: Body,
530 B::Data: 'static,
531 B::Error: Into<Box<dyn StdError + Send + Sync>>,
532 E: Into<Box<dyn StdError + Send + Sync>>,
533{
534 type Output = ();
535
536 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
537 self.poll2(cx).map(|res| {
538 if let Err(_e) = res {
539 debug!("stream error: {}", _e);
540 }
541 })
542 }
543}