tungstenite/protocol/mod.rs
1//! Generic WebSocket message stream.
2
3pub mod frame;
4
5mod message;
6
7pub use self::{frame::CloseFrame, message::Message};
8
9use self::{
10 frame::{
11 coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode},
12 Frame, FrameCodec,
13 },
14 message::{IncompleteMessage, IncompleteMessageType},
15};
16use crate::{
17 error::{Error, ProtocolError, Result},
18 util::NonBlockingResult,
19};
20use log::*;
21use std::{
22 io::{ErrorKind as IoErrorKind, Read, Write},
23 mem::replace,
24};
25
26/// Indicates a Client or Server role of the websocket
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum Role {
29 /// This socket is a server
30 Server,
31 /// This socket is a client
32 Client,
33}
34
35/// The configuration for WebSocket connection.
36#[derive(Debug, Clone, Copy)]
37pub struct WebSocketConfig {
38 /// Does nothing, instead use `max_write_buffer_size`.
39 #[deprecated]
40 pub max_send_queue: Option<usize>,
41 /// The target minimum size of the write buffer to reach before writing the data
42 /// to the underlying stream.
43 /// The default value is 128 KiB.
44 ///
45 /// If set to `0` each message will be eagerly written to the underlying stream.
46 /// It is often more optimal to allow them to buffer a little, hence the default value.
47 ///
48 /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
49 pub write_buffer_size: usize,
50 /// The max size of the write buffer in bytes. Setting this can provide backpressure
51 /// in the case the write buffer is filling up due to write errors.
52 /// The default value is unlimited.
53 ///
54 /// Note: The write buffer only builds up past [`write_buffer_size`](Self::write_buffer_size)
55 /// when writes to the underlying stream are failing. So the **write buffer can not
56 /// fill up if you are not observing write errors even if not flushing**.
57 ///
58 /// Note: Should always be at least [`write_buffer_size + 1 message`](Self::write_buffer_size)
59 /// and probably a little more depending on error handling strategy.
60 pub max_write_buffer_size: usize,
61 /// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
62 /// which should be reasonably big for all normal use-cases but small enough to prevent
63 /// memory eating by a malicious user.
64 pub max_message_size: Option<usize>,
65 /// The maximum size of a single message frame. `None` means no size limit. The limit is for
66 /// frame payload NOT including the frame header. The default value is 16 MiB which should
67 /// be reasonably big for all normal use-cases but small enough to prevent memory eating
68 /// by a malicious user.
69 pub max_frame_size: Option<usize>,
70 /// When set to `true`, the server will accept and handle unmasked frames
71 /// from the client. According to the RFC 6455, the server must close the
72 /// connection to the client in such cases, however it seems like there are
73 /// some popular libraries that are sending unmasked frames, ignoring the RFC.
74 /// By default this option is set to `false`, i.e. according to RFC 6455.
75 pub accept_unmasked_frames: bool,
76}
77
78impl Default for WebSocketConfig {
79 fn default() -> Self {
80 #[allow(deprecated)]
81 WebSocketConfig {
82 max_send_queue: None,
83 write_buffer_size: 128 * 1024,
84 max_write_buffer_size: usize::MAX,
85 max_message_size: Some(64 << 20),
86 max_frame_size: Some(16 << 20),
87 accept_unmasked_frames: false,
88 }
89 }
90}
91
92impl WebSocketConfig {
93 /// Panic if values are invalid.
94 pub(crate) fn assert_valid(&self) {
95 assert!(
96 self.max_write_buffer_size > self.write_buffer_size,
97 "WebSocketConfig::max_write_buffer_size must be greater than write_buffer_size, \
98 see WebSocketConfig docs`"
99 );
100 }
101}
102
103/// WebSocket input-output stream.
104///
105/// This is THE structure you want to create to be able to speak the WebSocket protocol.
106/// It may be created by calling `connect`, `accept` or `client` functions.
107///
108/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
109#[derive(Debug)]
110pub struct WebSocket<Stream> {
111 /// The underlying socket.
112 socket: Stream,
113 /// The context for managing a WebSocket.
114 context: WebSocketContext,
115}
116
117impl<Stream> WebSocket<Stream> {
118 /// Convert a raw socket into a WebSocket without performing a handshake.
119 ///
120 /// Call this function if you're using Tungstenite as a part of a web framework
121 /// or together with an existing one. If you need an initial handshake, use
122 /// `connect()` or `accept()` functions of the crate to construct a websocket.
123 ///
124 /// # Panics
125 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
126 pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self {
127 WebSocket { socket: stream, context: WebSocketContext::new(role, config) }
128 }
129
130 /// Convert a raw socket into a WebSocket without performing a handshake.
131 ///
132 /// Call this function if you're using Tungstenite as a part of a web framework
133 /// or together with an existing one. If you need an initial handshake, use
134 /// `connect()` or `accept()` functions of the crate to construct a websocket.
135 ///
136 /// # Panics
137 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
138 pub fn from_partially_read(
139 stream: Stream,
140 part: Vec<u8>,
141 role: Role,
142 config: Option<WebSocketConfig>,
143 ) -> Self {
144 WebSocket {
145 socket: stream,
146 context: WebSocketContext::from_partially_read(part, role, config),
147 }
148 }
149
150 /// Returns a shared reference to the inner stream.
151 pub fn get_ref(&self) -> &Stream {
152 &self.socket
153 }
154 /// Returns a mutable reference to the inner stream.
155 pub fn get_mut(&mut self) -> &mut Stream {
156 &mut self.socket
157 }
158
159 /// Change the configuration.
160 ///
161 /// # Panics
162 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
163 pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
164 self.context.set_config(set_func)
165 }
166
167 /// Read the configuration.
168 pub fn get_config(&self) -> &WebSocketConfig {
169 self.context.get_config()
170 }
171
172 /// Check if it is possible to read messages.
173 ///
174 /// Reading is impossible after receiving `Message::Close`. It is still possible after
175 /// sending close frame since the peer still may send some data before confirming close.
176 pub fn can_read(&self) -> bool {
177 self.context.can_read()
178 }
179
180 /// Check if it is possible to write messages.
181 ///
182 /// Writing gets impossible immediately after sending or receiving `Message::Close`.
183 pub fn can_write(&self) -> bool {
184 self.context.can_write()
185 }
186}
187
188impl<Stream: Read + Write> WebSocket<Stream> {
189 /// Read a message from stream, if possible.
190 ///
191 /// This will also queue responses to ping and close messages. These responses
192 /// will be written and flushed on the next call to [`read`](Self::read),
193 /// [`write`](Self::write) or [`flush`](Self::flush).
194 ///
195 /// # Closing the connection
196 /// When the remote endpoint decides to close the connection this will return
197 /// the close message with an optional close frame.
198 ///
199 /// You should continue calling [`read`](Self::read), [`write`](Self::write) or
200 /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
201 /// is returned. Once that happens it is safe to drop the underlying connection.
202 pub fn read(&mut self) -> Result<Message> {
203 self.context.read(&mut self.socket)
204 }
205
206 /// Writes and immediately flushes a message.
207 /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
208 pub fn send(&mut self, message: Message) -> Result<()> {
209 self.write(message)?;
210 self.flush()
211 }
212
213 /// Write a message to the provided stream, if possible.
214 ///
215 /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
216 ///
217 /// In the event of stream write failure the message frame will be stored
218 /// in the write buffer and will try again on the next call to [`write`](Self::write)
219 /// or [`flush`](Self::flush).
220 ///
221 /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
222 /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
223 ///
224 /// This call will generally not flush. However, if there are queued automatic messages
225 /// they will be written and eagerly flushed.
226 ///
227 /// For example, upon receiving ping messages tungstenite queues pong replies automatically.
228 /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
229 /// will write & flush the pong reply. This means you should not respond to ping frames manually.
230 ///
231 /// You can however send pong frames manually in order to indicate a unidirectional heartbeat
232 /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
233 /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
234 /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
235 /// ping will not be sent as it will be replaced by your custom pong message.
236 ///
237 /// # Errors
238 /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
239 /// along with the equivalent passed message frame.
240 /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
241 /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
242 /// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
243 /// error on your part.
244 /// - [`Error::Io`] is returned if the underlying connection returns an error
245 /// (consider these fatal except for WouldBlock).
246 /// - [`Error::Capacity`] if your message size is bigger than the configured max message size.
247 pub fn write(&mut self, message: Message) -> Result<()> {
248 self.context.write(&mut self.socket, message)
249 }
250
251 /// Flush writes.
252 ///
253 /// Ensures all messages previously passed to [`write`](Self::write) and automatic
254 /// queued pong responses are written & flushed into the underlying stream.
255 pub fn flush(&mut self) -> Result<()> {
256 self.context.flush(&mut self.socket)
257 }
258
259 /// Close the connection.
260 ///
261 /// This function guarantees that the close frame will be queued.
262 /// There is no need to call it again. Calling this function is
263 /// the same as calling `write(Message::Close(..))`.
264 ///
265 /// After queuing the close frame you should continue calling [`read`](Self::read) or
266 /// [`flush`](Self::flush) to drive the close handshake to completion.
267 ///
268 /// The websocket RFC defines that the underlying connection should be closed
269 /// by the server. Tungstenite takes care of this asymmetry for you.
270 ///
271 /// When the close handshake is finished (we have both sent and received
272 /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
273 /// [Error::ConnectionClosed] if this endpoint is the server.
274 ///
275 /// If this endpoint is a client, [Error::ConnectionClosed] will only be
276 /// returned after the server has closed the underlying connection.
277 ///
278 /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
279 /// is returned from [`read`](Self::read) or [`flush`](Self::flush).
280 pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
281 self.context.close(&mut self.socket, code)
282 }
283
284 /// Old name for [`read`](Self::read).
285 #[deprecated(note = "Use `read`")]
286 pub fn read_message(&mut self) -> Result<Message> {
287 self.read()
288 }
289
290 /// Old name for [`send`](Self::send).
291 #[deprecated(note = "Use `send`")]
292 pub fn write_message(&mut self, message: Message) -> Result<()> {
293 self.send(message)
294 }
295
296 /// Old name for [`flush`](Self::flush).
297 #[deprecated(note = "Use `flush`")]
298 pub fn write_pending(&mut self) -> Result<()> {
299 self.flush()
300 }
301}
302
303/// A context for managing WebSocket stream.
304#[derive(Debug)]
305pub struct WebSocketContext {
306 /// Server or client?
307 role: Role,
308 /// encoder/decoder of frame.
309 frame: FrameCodec,
310 /// The state of processing, either "active" or "closing".
311 state: WebSocketState,
312 /// Receive: an incomplete message being processed.
313 incomplete: Option<IncompleteMessage>,
314 /// Send in addition to regular messages E.g. "pong" or "close".
315 additional_send: Option<Frame>,
316 /// The configuration for the websocket session.
317 config: WebSocketConfig,
318}
319
320impl WebSocketContext {
321 /// Create a WebSocket context that manages a post-handshake stream.
322 ///
323 /// # Panics
324 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
325 pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
326 Self::_new(role, FrameCodec::new(), config.unwrap_or_default())
327 }
328
329 /// Create a WebSocket context that manages an post-handshake stream.
330 ///
331 /// # Panics
332 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
333 pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
334 Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default())
335 }
336
337 fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self {
338 config.assert_valid();
339 frame.set_max_out_buffer_len(config.max_write_buffer_size);
340 frame.set_out_buffer_write_len(config.write_buffer_size);
341 Self {
342 role,
343 frame,
344 state: WebSocketState::Active,
345 incomplete: None,
346 additional_send: None,
347 config,
348 }
349 }
350
351 /// Change the configuration.
352 ///
353 /// # Panics
354 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
355 pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
356 set_func(&mut self.config);
357 self.config.assert_valid();
358 self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size);
359 self.frame.set_out_buffer_write_len(self.config.write_buffer_size);
360 }
361
362 /// Read the configuration.
363 pub fn get_config(&self) -> &WebSocketConfig {
364 &self.config
365 }
366
367 /// Check if it is possible to read messages.
368 ///
369 /// Reading is impossible after receiving `Message::Close`. It is still possible after
370 /// sending close frame since the peer still may send some data before confirming close.
371 pub fn can_read(&self) -> bool {
372 self.state.can_read()
373 }
374
375 /// Check if it is possible to write messages.
376 ///
377 /// Writing gets impossible immediately after sending or receiving `Message::Close`.
378 pub fn can_write(&self) -> bool {
379 self.state.is_active()
380 }
381
382 /// Read a message from the provided stream, if possible.
383 ///
384 /// This function sends pong and close responses automatically.
385 /// However, it never blocks on write.
386 pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
387 where
388 Stream: Read + Write,
389 {
390 // Do not read from already closed connections.
391 self.state.check_not_terminated()?;
392
393 loop {
394 if self.additional_send.is_some() {
395 // Since we may get ping or close, we need to reply to the messages even during read.
396 // Thus we flush but ignore its blocking.
397 self.flush(stream).no_block()?;
398 } else if self.role == Role::Server && !self.state.can_read() {
399 self.state = WebSocketState::Terminated;
400 return Err(Error::ConnectionClosed);
401 }
402
403 // If we get here, either write blocks or we have nothing to write.
404 // Thus if read blocks, just let it return WouldBlock.
405 if let Some(message) = self.read_message_frame(stream)? {
406 trace!("Received message {}", message);
407 return Ok(message);
408 }
409 }
410 }
411
412 /// Write a message to the provided stream.
413 ///
414 /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
415 ///
416 /// In the event of stream write failure the message frame will be stored
417 /// in the write buffer and will try again on the next call to [`write`](Self::write)
418 /// or [`flush`](Self::flush).
419 ///
420 /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
421 /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
422 pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
423 where
424 Stream: Read + Write,
425 {
426 // When terminated, return AlreadyClosed.
427 self.state.check_not_terminated()?;
428
429 // Do not write after sending a close frame.
430 if !self.state.is_active() {
431 return Err(Error::Protocol(ProtocolError::SendAfterClosing));
432 }
433
434 let frame = match message {
435 Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true),
436 Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true),
437 Message::Ping(data) => Frame::ping(data),
438 Message::Pong(data) => {
439 self.set_additional(Frame::pong(data));
440 // Note: user pongs can be user flushed so no need to flush here
441 return self._write(stream, None).map(|_| ());
442 }
443 Message::Close(code) => return self.close(stream, code),
444 Message::Frame(f) => f,
445 };
446
447 let should_flush = self._write(stream, Some(frame))?;
448 if should_flush {
449 self.flush(stream)?;
450 }
451 Ok(())
452 }
453
454 /// Flush writes.
455 ///
456 /// Ensures all messages previously passed to [`write`](Self::write) and automatically
457 /// queued pong responses are written & flushed into the `stream`.
458 #[inline]
459 pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()>
460 where
461 Stream: Read + Write,
462 {
463 self._write(stream, None)?;
464 self.frame.write_out_buffer(stream)?;
465 Ok(stream.flush()?)
466 }
467
468 /// Writes any data in the out_buffer, `additional_send` and given `data`.
469 ///
470 /// Does **not** flush.
471 ///
472 /// Returns true if the write contents indicate we should flush immediately.
473 fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool>
474 where
475 Stream: Read + Write,
476 {
477 if let Some(data) = data {
478 self.buffer_frame(stream, data)?;
479 }
480
481 // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
482 // response, unless it already received a Close frame. It SHOULD
483 // respond with Pong frame as soon as is practical. (RFC 6455)
484 let should_flush = if let Some(msg) = self.additional_send.take() {
485 trace!("Sending pong/close");
486 match self.buffer_frame(stream, msg) {
487 Err(Error::WriteBufferFull(Message::Frame(msg))) => {
488 // if an system message would exceed the buffer put it back in
489 // `additional_send` for retry. Otherwise returning this error
490 // may not make sense to the user, e.g. calling `flush`.
491 self.set_additional(msg);
492 false
493 }
494 Err(err) => return Err(err),
495 Ok(_) => true,
496 }
497 } else {
498 false
499 };
500
501 // If we're closing and there is nothing to send anymore, we should close the connection.
502 if self.role == Role::Server && !self.state.can_read() {
503 // The underlying TCP connection, in most normal cases, SHOULD be closed
504 // first by the server, so that it holds the TIME_WAIT state and not the
505 // client (as this would prevent it from re-opening the connection for 2
506 // maximum segment lifetimes (2MSL), while there is no corresponding
507 // server impact as a TIME_WAIT connection is immediately reopened upon
508 // a new SYN with a higher seq number). (RFC 6455)
509 self.frame.write_out_buffer(stream)?;
510 self.state = WebSocketState::Terminated;
511 Err(Error::ConnectionClosed)
512 } else {
513 Ok(should_flush)
514 }
515 }
516
517 /// Close the connection.
518 ///
519 /// This function guarantees that the close frame will be queued.
520 /// There is no need to call it again. Calling this function is
521 /// the same as calling `send(Message::Close(..))`.
522 pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()>
523 where
524 Stream: Read + Write,
525 {
526 if let WebSocketState::Active = self.state {
527 self.state = WebSocketState::ClosedByUs;
528 let frame = Frame::close(code);
529 self._write(stream, Some(frame))?;
530 }
531 self.flush(stream)
532 }
533
534 /// Try to decode one message frame. May return None.
535 fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>>
536 where
537 Stream: Read + Write,
538 {
539 if let Some(mut frame) = self
540 .frame
541 .read_frame(stream, self.config.max_frame_size)
542 .check_connection_reset(self.state)?
543 {
544 if !self.state.can_read() {
545 return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing));
546 }
547 // MUST be 0 unless an extension is negotiated that defines meanings
548 // for non-zero values. If a nonzero value is received and none of
549 // the negotiated extensions defines the meaning of such a nonzero
550 // value, the receiving endpoint MUST _Fail the WebSocket
551 // Connection_.
552 {
553 let hdr = frame.header();
554 if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
555 return Err(Error::Protocol(ProtocolError::NonZeroReservedBits));
556 }
557 }
558
559 match self.role {
560 Role::Server => {
561 if frame.is_masked() {
562 // A server MUST remove masking for data frames received from a client
563 // as described in Section 5.3. (RFC 6455)
564 frame.apply_mask()
565 } else if !self.config.accept_unmasked_frames {
566 // The server MUST close the connection upon receiving a
567 // frame that is not masked. (RFC 6455)
568 // The only exception here is if the user explicitly accepts given
569 // stream by setting WebSocketConfig.accept_unmasked_frames to true
570 return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient));
571 }
572 }
573 Role::Client => {
574 if frame.is_masked() {
575 // A client MUST close a connection if it detects a masked frame. (RFC 6455)
576 return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer));
577 }
578 }
579 }
580
581 match frame.header().opcode {
582 OpCode::Control(ctl) => {
583 match ctl {
584 // All control frames MUST have a payload length of 125 bytes or less
585 // and MUST NOT be fragmented. (RFC 6455)
586 _ if !frame.header().is_final => {
587 Err(Error::Protocol(ProtocolError::FragmentedControlFrame))
588 }
589 _ if frame.payload().len() > 125 => {
590 Err(Error::Protocol(ProtocolError::ControlFrameTooBig))
591 }
592 OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
593 OpCtl::Reserved(i) => {
594 Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i)))
595 }
596 OpCtl::Ping => {
597 let data = frame.into_data();
598 // No ping processing after we sent a close frame.
599 if self.state.is_active() {
600 self.set_additional(Frame::pong(data.clone()));
601 }
602 Ok(Some(Message::Ping(data)))
603 }
604 OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))),
605 }
606 }
607
608 OpCode::Data(data) => {
609 let fin = frame.header().is_final;
610 match data {
611 OpData::Continue => {
612 if let Some(ref mut msg) = self.incomplete {
613 msg.extend(frame.into_data(), self.config.max_message_size)?;
614 } else {
615 return Err(Error::Protocol(
616 ProtocolError::UnexpectedContinueFrame,
617 ));
618 }
619 if fin {
620 Ok(Some(self.incomplete.take().unwrap().complete()?))
621 } else {
622 Ok(None)
623 }
624 }
625 c if self.incomplete.is_some() => {
626 Err(Error::Protocol(ProtocolError::ExpectedFragment(c)))
627 }
628 OpData::Text | OpData::Binary => {
629 let msg = {
630 let message_type = match data {
631 OpData::Text => IncompleteMessageType::Text,
632 OpData::Binary => IncompleteMessageType::Binary,
633 _ => panic!("Bug: message is not text nor binary"),
634 };
635 let mut m = IncompleteMessage::new(message_type);
636 m.extend(frame.into_data(), self.config.max_message_size)?;
637 m
638 };
639 if fin {
640 Ok(Some(msg.complete()?))
641 } else {
642 self.incomplete = Some(msg);
643 Ok(None)
644 }
645 }
646 OpData::Reserved(i) => {
647 Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i)))
648 }
649 }
650 }
651 } // match opcode
652 } else {
653 // Connection closed by peer
654 match replace(&mut self.state, WebSocketState::Terminated) {
655 WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
656 Err(Error::ConnectionClosed)
657 }
658 _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)),
659 }
660 }
661 }
662
663 /// Received a close frame. Tells if we need to return a close frame to the user.
664 #[allow(clippy::option_option)]
665 fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> {
666 debug!("Received close frame: {:?}", close);
667 match self.state {
668 WebSocketState::Active => {
669 self.state = WebSocketState::ClosedByPeer;
670
671 let close = close.map(|frame| {
672 if !frame.code.is_allowed() {
673 CloseFrame {
674 code: CloseCode::Protocol,
675 reason: "Protocol violation".into(),
676 }
677 } else {
678 frame
679 }
680 });
681
682 let reply = Frame::close(close.clone());
683 debug!("Replying to close with {:?}", reply);
684 self.set_additional(reply);
685
686 Some(close)
687 }
688 WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
689 // It is already closed, just ignore.
690 None
691 }
692 WebSocketState::ClosedByUs => {
693 // We received a reply.
694 self.state = WebSocketState::CloseAcknowledged;
695 Some(close)
696 }
697 WebSocketState::Terminated => unreachable!(),
698 }
699 }
700
701 /// Write a single frame into the write-buffer.
702 fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
703 where
704 Stream: Read + Write,
705 {
706 match self.role {
707 Role::Server => {}
708 Role::Client => {
709 // 5. If the data is being sent by the client, the frame(s) MUST be
710 // masked as defined in Section 5.3. (RFC 6455)
711 frame.set_random_mask();
712 }
713 }
714
715 trace!("Sending frame: {:?}", frame);
716 self.frame.buffer_frame(stream, frame).check_connection_reset(self.state)
717 }
718
719 /// Replace `additional_send` if it is currently a `Pong` message.
720 fn set_additional(&mut self, add: Frame) {
721 let empty_or_pong = self
722 .additional_send
723 .as_ref()
724 .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong));
725 if empty_or_pong {
726 self.additional_send.replace(add);
727 }
728 }
729}
730
731/// The current connection state.
732#[derive(Debug, PartialEq, Eq, Clone, Copy)]
733enum WebSocketState {
734 /// The connection is active.
735 Active,
736 /// We initiated a close handshake.
737 ClosedByUs,
738 /// The peer initiated a close handshake.
739 ClosedByPeer,
740 /// The peer replied to our close handshake.
741 CloseAcknowledged,
742 /// The connection does not exist anymore.
743 Terminated,
744}
745
746impl WebSocketState {
747 /// Tell if we're allowed to process normal messages.
748 fn is_active(self) -> bool {
749 matches!(self, WebSocketState::Active)
750 }
751
752 /// Tell if we should process incoming data. Note that if we send a close frame
753 /// but the remote hasn't confirmed, they might have sent data before they receive our
754 /// close frame, so we should still pass those to client code, hence ClosedByUs is valid.
755 fn can_read(self) -> bool {
756 matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs)
757 }
758
759 /// Check if the state is active, return error if not.
760 fn check_not_terminated(self) -> Result<()> {
761 match self {
762 WebSocketState::Terminated => Err(Error::AlreadyClosed),
763 _ => Ok(()),
764 }
765 }
766}
767
768/// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate.
769trait CheckConnectionReset {
770 fn check_connection_reset(self, state: WebSocketState) -> Self;
771}
772
773impl<T> CheckConnectionReset for Result<T> {
774 fn check_connection_reset(self, state: WebSocketState) -> Self {
775 match self {
776 Err(Error::Io(io_error)) => Err({
777 if !state.can_read() && io_error.kind() == IoErrorKind::ConnectionReset {
778 Error::ConnectionClosed
779 } else {
780 Error::Io(io_error)
781 }
782 }),
783 x => x,
784 }
785 }
786}
787
788#[cfg(test)]
789mod tests {
790 use super::{Message, Role, WebSocket, WebSocketConfig};
791 use crate::error::{CapacityError, Error};
792
793 use std::{io, io::Cursor};
794
795 struct WriteMoc<Stream>(Stream);
796
797 impl<Stream> io::Write for WriteMoc<Stream> {
798 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
799 Ok(buf.len())
800 }
801 fn flush(&mut self) -> io::Result<()> {
802 Ok(())
803 }
804 }
805
806 impl<Stream: io::Read> io::Read for WriteMoc<Stream> {
807 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
808 self.0.read(buf)
809 }
810 }
811
812 #[test]
813 fn receive_messages() {
814 let incoming = Cursor::new(vec![
815 0x89, 0x02, 0x01, 0x02, 0x8a, 0x01, 0x03, 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f,
816 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x82, 0x03, 0x01, 0x02,
817 0x03,
818 ]);
819 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None);
820 assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2]));
821 assert_eq!(socket.read().unwrap(), Message::Pong(vec![3]));
822 assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into()));
823 assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
824 }
825
826 #[test]
827 fn size_limiting_text_fragmented() {
828 let incoming = Cursor::new(vec![
829 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72,
830 0x6c, 0x64, 0x21,
831 ]);
832 let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() };
833 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
834
835 assert!(matches!(
836 socket.read(),
837 Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 }))
838 ));
839 }
840
841 #[test]
842 fn size_limiting_binary() {
843 let incoming = Cursor::new(vec![0x82, 0x03, 0x01, 0x02, 0x03]);
844 let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() };
845 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
846
847 assert!(matches!(
848 socket.read(),
849 Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 }))
850 ));
851 }
852}