soketto/
lib.rs

1// Copyright (c) 2019 Parity Technologies (UK) Ltd.
2// Copyright (c) 2016 twist developers
3//
4// Licensed under the Apache License, Version 2.0
5// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
6// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. All files in the project carrying such notice may not be copied,
8// modified, or distributed except according to those terms.
9
10//! An implementation of the [RFC 6455][rfc6455] websocket protocol.
11//!
12//! To begin a websocket connection one first needs to perform a [handshake],
13//! either as [client] or [server], in order to upgrade from HTTP.
14//! Once successful, the client or server can transition to a connection,
15//! i.e. a [Sender]/[Receiver] pair and send and receive textual or
16//! binary data.
17//!
18//! **Note**: While it is possible to only receive websocket messages it is
19//! not possible to only send websocket messages. Receiving data is required
20//! in order to react to control frames such as PING or CLOSE. While those will be
21//! answered transparently they have to be received in the first place, so
22//! calling [`connection::Receiver::receive`] is imperative.
23//!
24//! **Note**: None of the `async` methods are safe to cancel so their `Future`s
25//! must not be dropped unless they return `Poll::Ready`.
26//!
27//! # Client example
28//!
29//! ```no_run
30//! # use tokio_util::compat::TokioAsyncReadCompatExt;
31//! # async fn doc() -> Result<(), soketto::BoxedError> {
32//! use soketto::handshake::{Client, ServerResponse};
33//!
34//! // First, we need to establish a TCP connection.
35//! let socket = tokio::net::TcpStream::connect("...").await?;
36//!
37//! // Then we configure the client handshake.
38//! let mut client = Client::new(socket.compat(), "...", "/");
39//!
40//! // And finally we perform the handshake and handle the result.
41//! let (mut sender, mut receiver) = match client.handshake().await? {
42//!     ServerResponse::Accepted { .. } => client.into_builder().finish(),
43//!     ServerResponse::Redirect { status_code, location } => unimplemented!("follow location URL"),
44//!     ServerResponse::Rejected { status_code } => unimplemented!("handle failure")
45//! };
46//!
47//! // Over the established websocket connection we can send
48//! sender.send_text("some text").await?;
49//! sender.send_text("some more text").await?;
50//! sender.flush().await?;
51//!
52//! // ... and receive data.
53//! let mut data = Vec::new();
54//! receiver.receive_data(&mut data).await?;
55//!
56//! # Ok(())
57//! # }
58//!
59//! ```
60//!
61//! # Server example
62//!
63//! ```no_run
64//! # use tokio_util::compat::TokioAsyncReadCompatExt;
65//! # use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
66//! # async fn doc() -> Result<(), soketto::BoxedError> {
67//! use soketto::{handshake::{Server, ClientRequest, server::Response}};
68//!
69//! // First, we listen for incoming connections.
70//! let listener = tokio::net::TcpListener::bind("...").await?;
71//! let mut incoming = TcpListenerStream::new(listener);
72//!
73//! while let Some(socket) = incoming.next().await {
74//!     // For each incoming connection we perform a handshake.
75//!     let mut server = Server::new(socket?.compat());
76//!
77//!     let websocket_key = {
78//!         let req = server.receive_request().await?;
79//!         req.key()
80//!     };
81//!
82//!     // Here we accept the client unconditionally.
83//!     let accept = Response::Accept { key: websocket_key, protocol: None };
84//!     server.send_response(&accept).await?;
85//!
86//!     // And we can finally transition to a websocket connection.
87//!     let (mut sender, mut receiver) = server.into_builder().finish();
88//!
89//!     let mut data = Vec::new();
90//!     let data_type = receiver.receive_data(&mut data).await?;
91//!
92//!     if data_type.is_text() {
93//!         sender.send_text(std::str::from_utf8(&data)?).await?
94//!     } else {
95//!         sender.send_binary(&data).await?
96//!     }
97//!
98//!     sender.close().await?
99//! }
100//!
101//! # Ok(())
102//! # }
103//!
104//! ```
105//!
106//! See `examples/hyper_server.rs` from this crate's repository for an example of
107//! starting up a WebSocket server alongside an Hyper HTTP server.
108//!
109//! [client]: handshake::Client
110//! [server]: handshake::Server
111//! [Sender]: connection::Sender
112//! [Receiver]: connection::Receiver
113//! [rfc6455]: https://tools.ietf.org/html/rfc6455
114//! [handshake]: https://tools.ietf.org/html/rfc6455#section-4
115
116#![forbid(unsafe_code)]
117
118pub mod base;
119pub mod connection;
120pub mod data;
121pub mod extension;
122pub mod handshake;
123
124use bytes::BytesMut;
125use futures::io::{AsyncRead, AsyncReadExt};
126use std::io;
127
128pub use connection::{Mode, Receiver, Sender};
129pub use data::{Data, Incoming};
130
131pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
132
133/// A parsing result.
134#[derive(Debug, Clone)]
135pub enum Parsing<T, N = ()> {
136	/// Parsing completed.
137	Done {
138		/// The parsed value.
139		value: T,
140		/// The offset into the byte slice that has been consumed.
141		offset: usize,
142	},
143	/// Parsing is incomplete and needs more data.
144	NeedMore(N),
145}
146
147/// A buffer type used for implementing `Extension`s.
148#[derive(Debug)]
149pub enum Storage<'a> {
150	/// A read-only shared byte slice.
151	Shared(&'a [u8]),
152	/// A mutable byte slice.
153	Unique(&'a mut [u8]),
154	/// An owned byte buffer.
155	Owned(Vec<u8>),
156}
157
158impl AsRef<[u8]> for Storage<'_> {
159	fn as_ref(&self) -> &[u8] {
160		match self {
161			Storage::Shared(d) => d,
162			Storage::Unique(d) => d,
163			Storage::Owned(b) => b.as_ref(),
164		}
165	}
166}
167
168/// Helper function to allow casts from `usize` to `u64` only on platforms
169/// where the sizes are guaranteed to fit.
170#[cfg(any(target_pointer_width = "32", target_pointer_width = "64"))]
171const fn as_u64(a: usize) -> u64 {
172	a as u64
173}
174
175/// Fill the buffer from the given `AsyncRead` impl with up to `max` bytes.
176async fn read<R>(reader: &mut R, dest: &mut BytesMut, max: usize) -> io::Result<()>
177where
178	R: AsyncRead + Unpin,
179{
180	let i = dest.len();
181	dest.resize(i + max, 0u8);
182	let n = reader.read(&mut dest[i..]).await?;
183	dest.truncate(i + n);
184	if n == 0 {
185		return Err(io::ErrorKind::UnexpectedEof.into());
186	}
187	log::trace!("read {} bytes", n);
188	Ok(())
189}