tungstenite/protocol/
mod.rs

1//! Generic WebSocket message stream.
2
3pub mod frame;
4
5mod message;
6
7pub use self::{frame::CloseFrame, message::Message};
8
9use self::{
10    frame::{
11        coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode},
12        Frame, FrameCodec,
13    },
14    message::{IncompleteMessage, IncompleteMessageType},
15};
16use crate::{
17    error::{Error, ProtocolError, Result},
18    util::NonBlockingResult,
19};
20use log::*;
21use std::{
22    io::{ErrorKind as IoErrorKind, Read, Write},
23    mem::replace,
24};
25
26/// Indicates a Client or Server role of the websocket
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum Role {
29    /// This socket is a server
30    Server,
31    /// This socket is a client
32    Client,
33}
34
35/// The configuration for WebSocket connection.
36#[derive(Debug, Clone, Copy)]
37pub struct WebSocketConfig {
38    /// Does nothing, instead use `max_write_buffer_size`.
39    #[deprecated]
40    pub max_send_queue: Option<usize>,
41    /// The target minimum size of the write buffer to reach before writing the data
42    /// to the underlying stream.
43    /// The default value is 128 KiB.
44    ///
45    /// If set to `0` each message will be eagerly written to the underlying stream.
46    /// It is often more optimal to allow them to buffer a little, hence the default value.
47    ///
48    /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
49    pub write_buffer_size: usize,
50    /// The max size of the write buffer in bytes. Setting this can provide backpressure
51    /// in the case the write buffer is filling up due to write errors.
52    /// The default value is unlimited.
53    ///
54    /// Note: The write buffer only builds up past [`write_buffer_size`](Self::write_buffer_size)
55    /// when writes to the underlying stream are failing. So the **write buffer can not
56    /// fill up if you are not observing write errors even if not flushing**.
57    ///
58    /// Note: Should always be at least [`write_buffer_size + 1 message`](Self::write_buffer_size)
59    /// and probably a little more depending on error handling strategy.
60    pub max_write_buffer_size: usize,
61    /// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
62    /// which should be reasonably big for all normal use-cases but small enough to prevent
63    /// memory eating by a malicious user.
64    pub max_message_size: Option<usize>,
65    /// The maximum size of a single message frame. `None` means no size limit. The limit is for
66    /// frame payload NOT including the frame header. The default value is 16 MiB which should
67    /// be reasonably big for all normal use-cases but small enough to prevent memory eating
68    /// by a malicious user.
69    pub max_frame_size: Option<usize>,
70    /// When set to `true`, the server will accept and handle unmasked frames
71    /// from the client. According to the RFC 6455, the server must close the
72    /// connection to the client in such cases, however it seems like there are
73    /// some popular libraries that are sending unmasked frames, ignoring the RFC.
74    /// By default this option is set to `false`, i.e. according to RFC 6455.
75    pub accept_unmasked_frames: bool,
76}
77
78impl Default for WebSocketConfig {
79    fn default() -> Self {
80        #[allow(deprecated)]
81        WebSocketConfig {
82            max_send_queue: None,
83            write_buffer_size: 128 * 1024,
84            max_write_buffer_size: usize::MAX,
85            max_message_size: Some(64 << 20),
86            max_frame_size: Some(16 << 20),
87            accept_unmasked_frames: false,
88        }
89    }
90}
91
92impl WebSocketConfig {
93    /// Panic if values are invalid.
94    pub(crate) fn assert_valid(&self) {
95        assert!(
96            self.max_write_buffer_size > self.write_buffer_size,
97            "WebSocketConfig::max_write_buffer_size must be greater than write_buffer_size, \
98            see WebSocketConfig docs`"
99        );
100    }
101}
102
103/// WebSocket input-output stream.
104///
105/// This is THE structure you want to create to be able to speak the WebSocket protocol.
106/// It may be created by calling `connect`, `accept` or `client` functions.
107///
108/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
109#[derive(Debug)]
110pub struct WebSocket<Stream> {
111    /// The underlying socket.
112    socket: Stream,
113    /// The context for managing a WebSocket.
114    context: WebSocketContext,
115}
116
117impl<Stream> WebSocket<Stream> {
118    /// Convert a raw socket into a WebSocket without performing a handshake.
119    ///
120    /// Call this function if you're using Tungstenite as a part of a web framework
121    /// or together with an existing one. If you need an initial handshake, use
122    /// `connect()` or `accept()` functions of the crate to construct a websocket.
123    ///
124    /// # Panics
125    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
126    pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self {
127        WebSocket { socket: stream, context: WebSocketContext::new(role, config) }
128    }
129
130    /// Convert a raw socket into a WebSocket without performing a handshake.
131    ///
132    /// Call this function if you're using Tungstenite as a part of a web framework
133    /// or together with an existing one. If you need an initial handshake, use
134    /// `connect()` or `accept()` functions of the crate to construct a websocket.
135    ///
136    /// # Panics
137    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
138    pub fn from_partially_read(
139        stream: Stream,
140        part: Vec<u8>,
141        role: Role,
142        config: Option<WebSocketConfig>,
143    ) -> Self {
144        WebSocket {
145            socket: stream,
146            context: WebSocketContext::from_partially_read(part, role, config),
147        }
148    }
149
150    /// Returns a shared reference to the inner stream.
151    pub fn get_ref(&self) -> &Stream {
152        &self.socket
153    }
154    /// Returns a mutable reference to the inner stream.
155    pub fn get_mut(&mut self) -> &mut Stream {
156        &mut self.socket
157    }
158
159    /// Change the configuration.
160    ///
161    /// # Panics
162    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
163    pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
164        self.context.set_config(set_func)
165    }
166
167    /// Read the configuration.
168    pub fn get_config(&self) -> &WebSocketConfig {
169        self.context.get_config()
170    }
171
172    /// Check if it is possible to read messages.
173    ///
174    /// Reading is impossible after receiving `Message::Close`. It is still possible after
175    /// sending close frame since the peer still may send some data before confirming close.
176    pub fn can_read(&self) -> bool {
177        self.context.can_read()
178    }
179
180    /// Check if it is possible to write messages.
181    ///
182    /// Writing gets impossible immediately after sending or receiving `Message::Close`.
183    pub fn can_write(&self) -> bool {
184        self.context.can_write()
185    }
186}
187
188impl<Stream: Read + Write> WebSocket<Stream> {
189    /// Read a message from stream, if possible.
190    ///
191    /// This will also queue responses to ping and close messages. These responses
192    /// will be written and flushed on the next call to [`read`](Self::read),
193    /// [`write`](Self::write) or [`flush`](Self::flush).
194    ///
195    /// # Closing the connection
196    /// When the remote endpoint decides to close the connection this will return
197    /// the close message with an optional close frame.
198    ///
199    /// You should continue calling [`read`](Self::read), [`write`](Self::write) or
200    /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
201    /// is returned. Once that happens it is safe to drop the underlying connection.
202    pub fn read(&mut self) -> Result<Message> {
203        self.context.read(&mut self.socket)
204    }
205
206    /// Writes and immediately flushes a message.
207    /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
208    pub fn send(&mut self, message: Message) -> Result<()> {
209        self.write(message)?;
210        self.flush()
211    }
212
213    /// Write a message to the provided stream, if possible.
214    ///
215    /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
216    ///
217    /// In the event of stream write failure the message frame will be stored
218    /// in the write buffer and will try again on the next call to [`write`](Self::write)
219    /// or [`flush`](Self::flush).
220    ///
221    /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
222    /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
223    ///
224    /// This call will generally not flush. However, if there are queued automatic messages
225    /// they will be written and eagerly flushed.
226    ///
227    /// For example, upon receiving ping messages tungstenite queues pong replies automatically.
228    /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
229    /// will write & flush the pong reply. This means you should not respond to ping frames manually.
230    ///
231    /// You can however send pong frames manually in order to indicate a unidirectional heartbeat
232    /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
233    /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
234    /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
235    /// ping will not be sent as it will be replaced by your custom pong message.
236    ///
237    /// # Errors
238    /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
239    ///   along with the equivalent passed message frame.
240    /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
241    /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
242    ///   [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
243    ///   error on your part.
244    /// - [`Error::Io`] is returned if the underlying connection returns an error
245    ///   (consider these fatal except for WouldBlock).
246    /// - [`Error::Capacity`] if your message size is bigger than the configured max message size.
247    pub fn write(&mut self, message: Message) -> Result<()> {
248        self.context.write(&mut self.socket, message)
249    }
250
251    /// Flush writes.
252    ///
253    /// Ensures all messages previously passed to [`write`](Self::write) and automatic
254    /// queued pong responses are written & flushed into the underlying stream.
255    pub fn flush(&mut self) -> Result<()> {
256        self.context.flush(&mut self.socket)
257    }
258
259    /// Close the connection.
260    ///
261    /// This function guarantees that the close frame will be queued.
262    /// There is no need to call it again. Calling this function is
263    /// the same as calling `write(Message::Close(..))`.
264    ///
265    /// After queuing the close frame you should continue calling [`read`](Self::read) or
266    /// [`flush`](Self::flush) to drive the close handshake to completion.
267    ///
268    /// The websocket RFC defines that the underlying connection should be closed
269    /// by the server. Tungstenite takes care of this asymmetry for you.
270    ///
271    /// When the close handshake is finished (we have both sent and received
272    /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
273    /// [Error::ConnectionClosed] if this endpoint is the server.
274    ///
275    /// If this endpoint is a client, [Error::ConnectionClosed] will only be
276    /// returned after the server has closed the underlying connection.
277    ///
278    /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
279    /// is returned from [`read`](Self::read) or [`flush`](Self::flush).
280    pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
281        self.context.close(&mut self.socket, code)
282    }
283
284    /// Old name for [`read`](Self::read).
285    #[deprecated(note = "Use `read`")]
286    pub fn read_message(&mut self) -> Result<Message> {
287        self.read()
288    }
289
290    /// Old name for [`send`](Self::send).
291    #[deprecated(note = "Use `send`")]
292    pub fn write_message(&mut self, message: Message) -> Result<()> {
293        self.send(message)
294    }
295
296    /// Old name for [`flush`](Self::flush).
297    #[deprecated(note = "Use `flush`")]
298    pub fn write_pending(&mut self) -> Result<()> {
299        self.flush()
300    }
301}
302
303/// A context for managing WebSocket stream.
304#[derive(Debug)]
305pub struct WebSocketContext {
306    /// Server or client?
307    role: Role,
308    /// encoder/decoder of frame.
309    frame: FrameCodec,
310    /// The state of processing, either "active" or "closing".
311    state: WebSocketState,
312    /// Receive: an incomplete message being processed.
313    incomplete: Option<IncompleteMessage>,
314    /// Send in addition to regular messages E.g. "pong" or "close".
315    additional_send: Option<Frame>,
316    /// The configuration for the websocket session.
317    config: WebSocketConfig,
318}
319
320impl WebSocketContext {
321    /// Create a WebSocket context that manages a post-handshake stream.
322    ///
323    /// # Panics
324    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
325    pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
326        Self::_new(role, FrameCodec::new(), config.unwrap_or_default())
327    }
328
329    /// Create a WebSocket context that manages an post-handshake stream.
330    ///
331    /// # Panics
332    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
333    pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
334        Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default())
335    }
336
337    fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self {
338        config.assert_valid();
339        frame.set_max_out_buffer_len(config.max_write_buffer_size);
340        frame.set_out_buffer_write_len(config.write_buffer_size);
341        Self {
342            role,
343            frame,
344            state: WebSocketState::Active,
345            incomplete: None,
346            additional_send: None,
347            config,
348        }
349    }
350
351    /// Change the configuration.
352    ///
353    /// # Panics
354    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
355    pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
356        set_func(&mut self.config);
357        self.config.assert_valid();
358        self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size);
359        self.frame.set_out_buffer_write_len(self.config.write_buffer_size);
360    }
361
362    /// Read the configuration.
363    pub fn get_config(&self) -> &WebSocketConfig {
364        &self.config
365    }
366
367    /// Check if it is possible to read messages.
368    ///
369    /// Reading is impossible after receiving `Message::Close`. It is still possible after
370    /// sending close frame since the peer still may send some data before confirming close.
371    pub fn can_read(&self) -> bool {
372        self.state.can_read()
373    }
374
375    /// Check if it is possible to write messages.
376    ///
377    /// Writing gets impossible immediately after sending or receiving `Message::Close`.
378    pub fn can_write(&self) -> bool {
379        self.state.is_active()
380    }
381
382    /// Read a message from the provided stream, if possible.
383    ///
384    /// This function sends pong and close responses automatically.
385    /// However, it never blocks on write.
386    pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
387    where
388        Stream: Read + Write,
389    {
390        // Do not read from already closed connections.
391        self.state.check_not_terminated()?;
392
393        loop {
394            if self.additional_send.is_some() {
395                // Since we may get ping or close, we need to reply to the messages even during read.
396                // Thus we flush but ignore its blocking.
397                self.flush(stream).no_block()?;
398            } else if self.role == Role::Server && !self.state.can_read() {
399                self.state = WebSocketState::Terminated;
400                return Err(Error::ConnectionClosed);
401            }
402
403            // If we get here, either write blocks or we have nothing to write.
404            // Thus if read blocks, just let it return WouldBlock.
405            if let Some(message) = self.read_message_frame(stream)? {
406                trace!("Received message {}", message);
407                return Ok(message);
408            }
409        }
410    }
411
412    /// Write a message to the provided stream.
413    ///
414    /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
415    ///
416    /// In the event of stream write failure the message frame will be stored
417    /// in the write buffer and will try again on the next call to [`write`](Self::write)
418    /// or [`flush`](Self::flush).
419    ///
420    /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
421    /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
422    pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
423    where
424        Stream: Read + Write,
425    {
426        // When terminated, return AlreadyClosed.
427        self.state.check_not_terminated()?;
428
429        // Do not write after sending a close frame.
430        if !self.state.is_active() {
431            return Err(Error::Protocol(ProtocolError::SendAfterClosing));
432        }
433
434        let frame = match message {
435            Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true),
436            Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true),
437            Message::Ping(data) => Frame::ping(data),
438            Message::Pong(data) => {
439                self.set_additional(Frame::pong(data));
440                // Note: user pongs can be user flushed so no need to flush here
441                return self._write(stream, None).map(|_| ());
442            }
443            Message::Close(code) => return self.close(stream, code),
444            Message::Frame(f) => f,
445        };
446
447        let should_flush = self._write(stream, Some(frame))?;
448        if should_flush {
449            self.flush(stream)?;
450        }
451        Ok(())
452    }
453
454    /// Flush writes.
455    ///
456    /// Ensures all messages previously passed to [`write`](Self::write) and automatically
457    /// queued pong responses are written & flushed into the `stream`.
458    #[inline]
459    pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()>
460    where
461        Stream: Read + Write,
462    {
463        self._write(stream, None)?;
464        self.frame.write_out_buffer(stream)?;
465        Ok(stream.flush()?)
466    }
467
468    /// Writes any data in the out_buffer, `additional_send` and given `data`.
469    ///
470    /// Does **not** flush.
471    ///
472    /// Returns true if the write contents indicate we should flush immediately.
473    fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool>
474    where
475        Stream: Read + Write,
476    {
477        if let Some(data) = data {
478            self.buffer_frame(stream, data)?;
479        }
480
481        // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
482        // response, unless it already received a Close frame. It SHOULD
483        // respond with Pong frame as soon as is practical. (RFC 6455)
484        let should_flush = if let Some(msg) = self.additional_send.take() {
485            trace!("Sending pong/close");
486            match self.buffer_frame(stream, msg) {
487                Err(Error::WriteBufferFull(Message::Frame(msg))) => {
488                    // if an system message would exceed the buffer put it back in
489                    // `additional_send` for retry. Otherwise returning this error
490                    // may not make sense to the user, e.g. calling `flush`.
491                    self.set_additional(msg);
492                    false
493                }
494                Err(err) => return Err(err),
495                Ok(_) => true,
496            }
497        } else {
498            false
499        };
500
501        // If we're closing and there is nothing to send anymore, we should close the connection.
502        if self.role == Role::Server && !self.state.can_read() {
503            // The underlying TCP connection, in most normal cases, SHOULD be closed
504            // first by the server, so that it holds the TIME_WAIT state and not the
505            // client (as this would prevent it from re-opening the connection for 2
506            // maximum segment lifetimes (2MSL), while there is no corresponding
507            // server impact as a TIME_WAIT connection is immediately reopened upon
508            // a new SYN with a higher seq number). (RFC 6455)
509            self.frame.write_out_buffer(stream)?;
510            self.state = WebSocketState::Terminated;
511            Err(Error::ConnectionClosed)
512        } else {
513            Ok(should_flush)
514        }
515    }
516
517    /// Close the connection.
518    ///
519    /// This function guarantees that the close frame will be queued.
520    /// There is no need to call it again. Calling this function is
521    /// the same as calling `send(Message::Close(..))`.
522    pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()>
523    where
524        Stream: Read + Write,
525    {
526        if let WebSocketState::Active = self.state {
527            self.state = WebSocketState::ClosedByUs;
528            let frame = Frame::close(code);
529            self._write(stream, Some(frame))?;
530        }
531        self.flush(stream)
532    }
533
534    /// Try to decode one message frame. May return None.
535    fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>>
536    where
537        Stream: Read + Write,
538    {
539        if let Some(mut frame) = self
540            .frame
541            .read_frame(stream, self.config.max_frame_size)
542            .check_connection_reset(self.state)?
543        {
544            if !self.state.can_read() {
545                return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing));
546            }
547            // MUST be 0 unless an extension is negotiated that defines meanings
548            // for non-zero values.  If a nonzero value is received and none of
549            // the negotiated extensions defines the meaning of such a nonzero
550            // value, the receiving endpoint MUST _Fail the WebSocket
551            // Connection_.
552            {
553                let hdr = frame.header();
554                if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
555                    return Err(Error::Protocol(ProtocolError::NonZeroReservedBits));
556                }
557            }
558
559            match self.role {
560                Role::Server => {
561                    if frame.is_masked() {
562                        // A server MUST remove masking for data frames received from a client
563                        // as described in Section 5.3. (RFC 6455)
564                        frame.apply_mask()
565                    } else if !self.config.accept_unmasked_frames {
566                        // The server MUST close the connection upon receiving a
567                        // frame that is not masked. (RFC 6455)
568                        // The only exception here is if the user explicitly accepts given
569                        // stream by setting WebSocketConfig.accept_unmasked_frames to true
570                        return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient));
571                    }
572                }
573                Role::Client => {
574                    if frame.is_masked() {
575                        // A client MUST close a connection if it detects a masked frame. (RFC 6455)
576                        return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer));
577                    }
578                }
579            }
580
581            match frame.header().opcode {
582                OpCode::Control(ctl) => {
583                    match ctl {
584                        // All control frames MUST have a payload length of 125 bytes or less
585                        // and MUST NOT be fragmented. (RFC 6455)
586                        _ if !frame.header().is_final => {
587                            Err(Error::Protocol(ProtocolError::FragmentedControlFrame))
588                        }
589                        _ if frame.payload().len() > 125 => {
590                            Err(Error::Protocol(ProtocolError::ControlFrameTooBig))
591                        }
592                        OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
593                        OpCtl::Reserved(i) => {
594                            Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i)))
595                        }
596                        OpCtl::Ping => {
597                            let data = frame.into_data();
598                            // No ping processing after we sent a close frame.
599                            if self.state.is_active() {
600                                self.set_additional(Frame::pong(data.clone()));
601                            }
602                            Ok(Some(Message::Ping(data)))
603                        }
604                        OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))),
605                    }
606                }
607
608                OpCode::Data(data) => {
609                    let fin = frame.header().is_final;
610                    match data {
611                        OpData::Continue => {
612                            if let Some(ref mut msg) = self.incomplete {
613                                msg.extend(frame.into_data(), self.config.max_message_size)?;
614                            } else {
615                                return Err(Error::Protocol(
616                                    ProtocolError::UnexpectedContinueFrame,
617                                ));
618                            }
619                            if fin {
620                                Ok(Some(self.incomplete.take().unwrap().complete()?))
621                            } else {
622                                Ok(None)
623                            }
624                        }
625                        c if self.incomplete.is_some() => {
626                            Err(Error::Protocol(ProtocolError::ExpectedFragment(c)))
627                        }
628                        OpData::Text | OpData::Binary => {
629                            let msg = {
630                                let message_type = match data {
631                                    OpData::Text => IncompleteMessageType::Text,
632                                    OpData::Binary => IncompleteMessageType::Binary,
633                                    _ => panic!("Bug: message is not text nor binary"),
634                                };
635                                let mut m = IncompleteMessage::new(message_type);
636                                m.extend(frame.into_data(), self.config.max_message_size)?;
637                                m
638                            };
639                            if fin {
640                                Ok(Some(msg.complete()?))
641                            } else {
642                                self.incomplete = Some(msg);
643                                Ok(None)
644                            }
645                        }
646                        OpData::Reserved(i) => {
647                            Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i)))
648                        }
649                    }
650                }
651            } // match opcode
652        } else {
653            // Connection closed by peer
654            match replace(&mut self.state, WebSocketState::Terminated) {
655                WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
656                    Err(Error::ConnectionClosed)
657                }
658                _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)),
659            }
660        }
661    }
662
663    /// Received a close frame. Tells if we need to return a close frame to the user.
664    #[allow(clippy::option_option)]
665    fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> {
666        debug!("Received close frame: {:?}", close);
667        match self.state {
668            WebSocketState::Active => {
669                self.state = WebSocketState::ClosedByPeer;
670
671                let close = close.map(|frame| {
672                    if !frame.code.is_allowed() {
673                        CloseFrame {
674                            code: CloseCode::Protocol,
675                            reason: "Protocol violation".into(),
676                        }
677                    } else {
678                        frame
679                    }
680                });
681
682                let reply = Frame::close(close.clone());
683                debug!("Replying to close with {:?}", reply);
684                self.set_additional(reply);
685
686                Some(close)
687            }
688            WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
689                // It is already closed, just ignore.
690                None
691            }
692            WebSocketState::ClosedByUs => {
693                // We received a reply.
694                self.state = WebSocketState::CloseAcknowledged;
695                Some(close)
696            }
697            WebSocketState::Terminated => unreachable!(),
698        }
699    }
700
701    /// Write a single frame into the write-buffer.
702    fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
703    where
704        Stream: Read + Write,
705    {
706        match self.role {
707            Role::Server => {}
708            Role::Client => {
709                // 5.  If the data is being sent by the client, the frame(s) MUST be
710                // masked as defined in Section 5.3. (RFC 6455)
711                frame.set_random_mask();
712            }
713        }
714
715        trace!("Sending frame: {:?}", frame);
716        self.frame.buffer_frame(stream, frame).check_connection_reset(self.state)
717    }
718
719    /// Replace `additional_send` if it is currently a `Pong` message.
720    fn set_additional(&mut self, add: Frame) {
721        let empty_or_pong = self
722            .additional_send
723            .as_ref()
724            .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong));
725        if empty_or_pong {
726            self.additional_send.replace(add);
727        }
728    }
729}
730
731/// The current connection state.
732#[derive(Debug, PartialEq, Eq, Clone, Copy)]
733enum WebSocketState {
734    /// The connection is active.
735    Active,
736    /// We initiated a close handshake.
737    ClosedByUs,
738    /// The peer initiated a close handshake.
739    ClosedByPeer,
740    /// The peer replied to our close handshake.
741    CloseAcknowledged,
742    /// The connection does not exist anymore.
743    Terminated,
744}
745
746impl WebSocketState {
747    /// Tell if we're allowed to process normal messages.
748    fn is_active(self) -> bool {
749        matches!(self, WebSocketState::Active)
750    }
751
752    /// Tell if we should process incoming data. Note that if we send a close frame
753    /// but the remote hasn't confirmed, they might have sent data before they receive our
754    /// close frame, so we should still pass those to client code, hence ClosedByUs is valid.
755    fn can_read(self) -> bool {
756        matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs)
757    }
758
759    /// Check if the state is active, return error if not.
760    fn check_not_terminated(self) -> Result<()> {
761        match self {
762            WebSocketState::Terminated => Err(Error::AlreadyClosed),
763            _ => Ok(()),
764        }
765    }
766}
767
768/// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate.
769trait CheckConnectionReset {
770    fn check_connection_reset(self, state: WebSocketState) -> Self;
771}
772
773impl<T> CheckConnectionReset for Result<T> {
774    fn check_connection_reset(self, state: WebSocketState) -> Self {
775        match self {
776            Err(Error::Io(io_error)) => Err({
777                if !state.can_read() && io_error.kind() == IoErrorKind::ConnectionReset {
778                    Error::ConnectionClosed
779                } else {
780                    Error::Io(io_error)
781                }
782            }),
783            x => x,
784        }
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use super::{Message, Role, WebSocket, WebSocketConfig};
791    use crate::error::{CapacityError, Error};
792
793    use std::{io, io::Cursor};
794
795    struct WriteMoc<Stream>(Stream);
796
797    impl<Stream> io::Write for WriteMoc<Stream> {
798        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
799            Ok(buf.len())
800        }
801        fn flush(&mut self) -> io::Result<()> {
802            Ok(())
803        }
804    }
805
806    impl<Stream: io::Read> io::Read for WriteMoc<Stream> {
807        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
808            self.0.read(buf)
809        }
810    }
811
812    #[test]
813    fn receive_messages() {
814        let incoming = Cursor::new(vec![
815            0x89, 0x02, 0x01, 0x02, 0x8a, 0x01, 0x03, 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f,
816            0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x82, 0x03, 0x01, 0x02,
817            0x03,
818        ]);
819        let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None);
820        assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2]));
821        assert_eq!(socket.read().unwrap(), Message::Pong(vec![3]));
822        assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into()));
823        assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
824    }
825
826    #[test]
827    fn size_limiting_text_fragmented() {
828        let incoming = Cursor::new(vec![
829            0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72,
830            0x6c, 0x64, 0x21,
831        ]);
832        let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() };
833        let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
834
835        assert!(matches!(
836            socket.read(),
837            Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 }))
838        ));
839    }
840
841    #[test]
842    fn size_limiting_binary() {
843        let incoming = Cursor::new(vec![0x82, 0x03, 0x01, 0x02, 0x03]);
844        let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() };
845        let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
846
847        assert!(matches!(
848            socket.read(),
849            Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 }))
850        ));
851    }
852}