// wasm_bindgen_futures/task/singlethread.rs

use std::cell::{Cell, RefCell};
use std::future::Future;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};

/// The live state of a task whose future has not completed yet.
///
/// `Task` keeps this behind a `RefCell<Option<_>>` and replaces it with
/// `None` as soon as the future reports `Ready`.
struct Inner {
    /// The boxed, pinned future that `Task::run` polls.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
    /// The waker passed to `future` through the `Context` on every poll.
    waker: Waker,
}

13pub(crate) struct Task {
14    // The actual Future that we're executing as part of this task.
15    //
16    // This is an Option so that the Future can be immediately dropped when it's
17    // finished
18    inner: RefCell<Option<Inner>>,
19
20    // This is used to ensure that the Task will only be queued once
21    is_queued: Cell<bool>,
22}
23
24impl Task {
25    pub(crate) fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) {
26        let this = Rc::new(Self {
27            inner: RefCell::new(None),
28            is_queued: Cell::new(true),
29        });
30
31        let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) };
32
33        *this.inner.borrow_mut() = Some(Inner { future, waker });
34
35        crate::queue::QUEUE.with(|queue| queue.schedule_task(this));
36    }
37
38    fn force_wake(this: Rc<Self>) {
39        crate::queue::QUEUE.with(|queue| {
40            queue.push_task(this);
41        });
42    }
43
44    fn wake(this: Rc<Self>) {
45        // If we've already been placed on the run queue then there's no need to
46        // requeue ourselves since we're going to run at some point in the
47        // future anyway.
48        if this.is_queued.replace(true) {
49            return;
50        }
51
52        Self::force_wake(this);
53    }
54
55    fn wake_by_ref(this: &Rc<Self>) {
56        // If we've already been placed on the run queue then there's no need to
57        // requeue ourselves since we're going to run at some point in the
58        // future anyway.
59        if this.is_queued.replace(true) {
60            return;
61        }
62
63        Self::force_wake(Rc::clone(this));
64    }
65
66    /// Creates a standard library `RawWaker` from an `Rc` of ourselves.
67    ///
68    /// Note that in general this is wildly unsafe because everything with
69    /// Futures requires `Sync` + `Send` with regard to Wakers. For wasm,
70    /// however, everything is guaranteed to be singlethreaded (since we're
71    /// compiled without the `atomics` feature) so we "safely lie" and say our
72    /// `Rc` pointer is good enough.
73    ///
74    /// The implementation is based off of futures::task::ArcWake
75    unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker {
76        unsafe fn raw_clone(ptr: *const ()) -> RawWaker {
77            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
78            Task::into_raw_waker(Rc::clone(&ptr))
79        }
80
81        unsafe fn raw_wake(ptr: *const ()) {
82            let ptr = Rc::from_raw(ptr as *const Task);
83            Task::wake(ptr);
84        }
85
86        unsafe fn raw_wake_by_ref(ptr: *const ()) {
87            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
88            Task::wake_by_ref(&ptr);
89        }
90
91        unsafe fn raw_drop(ptr: *const ()) {
92            drop(Rc::from_raw(ptr as *const Task));
93        }
94
95        static VTABLE: RawWakerVTable =
96            RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);
97
98        RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE)
99    }
100
101    pub(crate) fn run(&self) {
102        let mut borrow = self.inner.borrow_mut();
103
104        // Wakeups can come in after a Future has finished and been destroyed,
105        // so handle this gracefully by just ignoring the request to run.
106        let inner = match borrow.as_mut() {
107            Some(inner) => inner,
108            None => return,
109        };
110
111        // Ensure that if poll calls `waker.wake()` we can get enqueued back on
112        // the run queue.
113        self.is_queued.set(false);
114
115        let poll = {
116            let mut cx = Context::from_waker(&inner.waker);
117            inner.future.as_mut().poll(&mut cx)
118        };
119
120        // If a future has finished (`Ready`) then clean up resources associated
121        // with the future ASAP. This ensures that we don't keep anything extra
122        // alive in-memory by accident. Our own struct, `Rc<Task>` won't
123        // actually go away until all wakers referencing us go away, which may
124        // take quite some time, so ensure that the heaviest of resources are
125        // released early.
126        if poll.is_ready() {
127            *borrow = None;
128        }
129    }
130}