use core::time::Duration;
use futures::prelude::*;
use sc_service::SpawnTaskHandle;
use smoldot::libp2p::{websocket, with_buffers};
use smoldot_light::platform::{
Address, ConnectError, ConnectionType, IpAddr, MultiStreamWebRtcConnection, PlatformRef,
use std::{net::SocketAddr, pin::Pin, time::Instant};
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
/// A tokio [`TcpStream`] wrapped in tokio-util's [`Compat`] adapter, so that it exposes the
/// `futures`-style async I/O traits instead of tokio's own.
type CompatTcpStream = Compat<TcpStream>;
/// Platform type implementing `PlatformRef` (see the impl further down in this file).
///
/// It carries the [`SpawnTaskHandle`] through which `spawn_task` runs the tasks it is given.
pub struct TokioPlatform {
// Handle on which every task passed to `spawn_task` is spawned.
spawner: SpawnTaskHandle,
impl TokioPlatform {
/// Creates a platform that stores `spawner` and later uses it to spawn tasks.
pub fn new(spawner: SpawnTaskHandle) -> Self {
TokioPlatform { spawner }
/// `PlatformRef` implementation for [`TokioPlatform`].
///
/// Only single-stream connections (plain TCP and WebSocket on top of TCP) are backed by real
/// types; the multi-stream associated type is uninhabited.
impl PlatformRef for TokioPlatform {
/// Future returned by `sleep` and `sleep_until`: boxed, `'static`, resolving to `()`.
type Delay = future::BoxFuture<'static, ()>;
/// Time points are plain `std::time::Instant` values.
type Instant = Instant;
/// `Infallible` has no values, so a multi-stream connection can never actually exist;
/// `next_substream` relies on this with an empty `match`.
type MultiStream = std::convert::Infallible;
/// The `Stream` wrapper struct defined at the bottom of this file.
type Stream = Stream;
/// Boxed future returned by `connect_stream`.
type StreamConnectFuture = future::BoxFuture<'static, Result<Self::Stream, ConnectError>>;
/// Boxed future type named by `connect_multistream` (which never produces one — it panics).
type MultiStreamConnectFuture = future::BoxFuture<
Result<MultiStreamWebRtcConnection<Self::MultiStream>, ConnectError>,
/// Read/write handle type returned by `read_write_access`.
type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a>;
/// Boxed future returned by `wait_read_write_again`, borrowing the stream for `'a`.
type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
/// Stream errors are handed out as borrowed `std::io::Error`s.
type StreamErrorRef<'a> = &'a std::io::Error;
/// `future::Pending` never resolves, matching the fact that no multi-stream connection
/// (and hence no inbound substream) can exist on this platform.
type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
/// Returns a [`Duration`]; going by the name, the current wall-clock time expressed as the time
/// elapsed since the Unix epoch.
///
/// NOTE(review): the body is not visible in this view — confirm the clock source and how a
/// clock set before the epoch is handled.
fn now_from_unix_epoch(&self) -> Duration {
/// Returns a `Self::Instant`, i.e. a `std::time::Instant`; presumably the current instant.
///
/// NOTE(review): the body is not visible in this view — confirm.
fn now(&self) -> Self::Instant {
/// Overwrites every byte of `buffer` with random data drawn from `rand::thread_rng()`.
fn fill_random_bytes(&self, buffer: &mut [u8]) {
rand::RngCore::fill_bytes(&mut rand::thread_rng(), buffer);
/// Returns a `Self::Delay` future; presumably one that completes once `duration` has elapsed.
///
/// NOTE(review): the body is not visible in this view — confirm which timer backs the delay.
fn sleep(&self, duration: Duration) -> Self::Delay {
/// Returns a `Self::Delay` future tied to the time point `when`.
///
/// NOTE(review): only the duration computation is visible here; presumably the result is then
/// slept for, the same way `sleep` does — confirm.
fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
// Time left until `when`; saturates to zero when `when` is already in the past, so a stale
// deadline yields a zero-length wait rather than a panic.
let duration = when.saturating_duration_since(Instant::now());
/// Reports whether this platform can open connections of the given `connection_type`.
///
/// The patterns listed are plain TCP (IPv4, IPv6, DNS), WebSocket over IPv4/IPv6 with any
/// field values, and WebSocket over DNS only when `secure` is `false`.
///
/// NOTE(review): the expression that wraps these patterns is not visible in this view;
/// presumably they are exactly the types for which `true` is returned — confirm.
fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
ConnectionType::TcpIpv4 |
ConnectionType::TcpIpv6 |
ConnectionType::TcpDns |
ConnectionType::WebSocketIpv4 { .. } |
ConnectionType::WebSocketIpv6 { .. } |
ConnectionType::WebSocketDns { secure: false, .. }
/// Starts connecting to `multiaddr` and returns a boxed future yielding the resulting stream or
/// a [`ConnectError`].
///
/// The address is translated into a TCP target synchronously; the TCP connection itself and,
/// for WebSocket addresses, the WebSocket client handshake are set up inside the returned
/// future.
///
/// # Panics
///
/// Hits `unreachable!()` for any address variant not matched explicitly below, e.g. a
/// `WebSocketDns` address whose `secure` flag is not `false`.
fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
// Split the address into:
//  * where to open the TCP connection: an already-known socket address (`Left`) or a
//    hostname/port pair still to be resolved (`Right`);
//  * the host string to use in the WebSocket handshake, or `None` for plain TCP.
let (tcp_socket_addr, host_if_websocket): (
either::Either<SocketAddr, (String, u16)>,
) = match multiaddr {
Address::TcpDns { hostname, port } =>
(either::Right((hostname.to_string(), port)), None),
Address::TcpIp { ip: IpAddr::V4(ip), port } =>
(either::Left(SocketAddr::from((ip, port))), None),
Address::TcpIp { ip: IpAddr::V6(ip), port } =>
(either::Left(SocketAddr::from((ip, port))), None),
// Only the non-secure WebSocket-over-DNS form is handled; the host is "hostname:port".
Address::WebSocketDns { hostname, port, secure: false } => (
either::Right((hostname.to_string(), port)),
Some(format!("{}:{}", hostname, port)),
// For IP-based WebSocket addresses the host is the textual form of the socket address.
Address::WebSocketIp { ip: IpAddr::V4(ip), port } => {
let addr = SocketAddr::from((ip, port));
(either::Left(addr), Some(addr.to_string()))
Address::WebSocketIp { ip: IpAddr::V6(ip), port } => {
let addr = SocketAddr::from((ip, port));
(either::Left(addr), Some(addr.to_string()))
// Every other variant panics. Presumably callers only pass addresses whose type was accepted
// by `supports_connection_type` — confirm.
_ => unreachable!(),
// Everything below runs lazily, once the returned future is polled.
Box::pin(async move {
// Open the TCP connection; in the `Right` case the hostname is resolved by
// `TcpStream::connect` itself.
let tcp_socket = match tcp_socket_addr {
either::Left(socket_addr) => TcpStream::connect(socket_addr).await,
either::Right((dns, port)) => TcpStream::connect((&dns[..], port)).await,
// Best effort: ask for TCP_NODELAY, deliberately ignoring a failure to set it.
if let Ok(tcp_socket) = &tcp_socket {
let _ = tcp_socket.set_nodelay(true);
let socket: TcpOrWs = match (tcp_socket, host_if_websocket) {
// WebSocket address: run the client handshake over the TCP socket, using `host` and the
// path "/"; a handshake failure is turned into a `ConnectError`.
(Ok(tcp_socket), Some(host)) => future::Either::Right(
websocket::websocket_client_handshake(websocket::Config {
tcp_socket: tcp_socket.compat(),
host: &host,
url: "/",
.map_err(|err| ConnectError {
message: format!("Failed to negotiate WebSocket: {err}"),
// Plain TCP address: use the socket as is, behind the futures-compat adapter.
(Ok(tcp_socket), None) => future::Either::Left(tcp_socket.compat()),
// The TCP connection itself failed: report it regardless of the address kind.
(Err(err), _) =>
return Err(ConnectError { message: format!("Failed to reach peer: {err}") }),
/// Takes a multi-stream connection in `_c`. `Self::MultiStream` is `Infallible`, so no such
/// value can exist and this method cannot be called with a real connection.
///
/// NOTE(review): the body is not visible in this view.
fn open_out_substream(&self, _c: &mut Self::MultiStream) {
/// Takes a multi-stream connection in `c`; can never actually run, because
/// `Self::MultiStream` is `Infallible` and has no values.
fn next_substream<'a>(&self, c: &'a mut Self::MultiStream) -> Self::NextSubstreamFuture<'a> {
// Empty match on an uninhabited type: proves to the compiler that this point is unreachable,
// so no return value has to be produced.
match *c {}
/// Spawns `task` through the platform's `SpawnTaskHandle`.
///
/// The name argument (the `Cow<str>` bound to `_`) is ignored: every task is spawned under the
/// fixed name "cumulus-internal-light-client-task".
fn spawn_task(
_: std::borrow::Cow<str>,
task: impl Future<Output = ()> + Send + 'static,
) {
// `None` is the second argument to `spawn` (presumably the task group — confirm against
// `SpawnTaskHandle::spawn`).
self.spawner.spawn("cumulus-internal-light-client-task", None, task)
/// Returns a `Cow<str>`; going by the name, the client name this platform reports.
///
/// NOTE(review): the body is not visible in this view — confirm the actual value.
fn client_name(&self) -> std::borrow::Cow<str> {
/// Returns a `Cow<str>`; going by the name, the client version this platform reports.
///
/// NOTE(review): the body is not visible in this view — confirm the actual value.
fn client_version(&self) -> std::borrow::Cow<str> {
/// Multi-stream connections are not supported by this platform.
///
/// # Panics
///
/// Always panics via `unimplemented!`. Presumably never reached in practice, since none of the
/// connection types listed in `supports_connection_type` is a multi-stream one — confirm.
fn connect_multistream(
_address: smoldot_light::platform::MultiStreamAddress,
) -> Self::MultiStreamConnectFuture {
unimplemented!("Multistream not supported!")
/// Gives access to the read/write buffers of `stream`, or a reference to the I/O error the
/// stream is reporting.
///
/// NOTE(review): only the pin projection is visible in this view; presumably the call is then
/// forwarded to the inner `WithBuffers` — confirm.
fn read_write_access<'a>(
stream: Pin<&'a mut Self::Stream>,
) -> Result<Self::ReadWriteAccess<'a>, &'a std::io::Error> {
// Pin projection: yields access to the `#[pin]`-annotated inner field of `Stream`.
let stream = stream.project();
/// Returns a boxed future built from `wait_read_write_again` on the inner `WithBuffers`, called
/// with a timer closure.
fn wait_read_write_again<'a>(
stream: Pin<&'a mut Self::Stream>,
) -> Self::StreamUpdateFuture<'a> {
// Pin projection: yields access to the `#[pin]`-annotated inner field of `Stream`.
let stream = stream.project();
// The closure receives a time point `when` and returns a future.
// NOTE(review): its body is not visible in this view; presumably it waits until `when` — confirm.
Box::pin(stream.0.wait_read_write_again(|when| async move {
/// Underlying socket of a connection: either the plain TCP stream (`Left`) or a WebSocket
/// connection layered on top of that TCP stream (`Right`).
type TcpOrWs = future::Either<CompatTcpStream, websocket::Connection<CompatTcpStream>>;
/// Stream type used by [`TokioPlatform`]: a TCP or WebSocket socket wrapped in
/// `with_buffers::WithBuffers`. The single field is marked `#[pin]` so it can be reached through
/// a pin projection.
pub struct Stream(#[pin] with_buffers::WithBuffers<TcpOrWs>);