exit_future/
lib.rs

1use std::pin::Pin;
2use std::task::{Poll, Context};
3use futures::{Future, FutureExt, channel::oneshot, future::{select, Either, Shared, FusedFuture}, executor::block_on};
4
5/// Future that resolves when the exit signal has fired.
6#[derive(Clone)]
7pub struct Exit(Shared<oneshot::Receiver<()>>);
8
9impl Future for Exit {
10    type Output = ();
11
12    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
13        let receiver = &mut Pin::into_inner(self).0;
14
15        if receiver.is_terminated() {
16            Poll::Ready(())
17        } else {
18            Pin::new(receiver).poll(cx).map(drop)
19        }
20    }
21}
22
23impl Exit {
24     /// Perform given work until complete.
25    pub fn until<F: Future + Unpin>(self, future: F) -> impl Future<Output = Option<F::Output>> {
26        select(self, future)
27            .map(|either| match either {
28                Either::Left(_) => None,
29                Either::Right((output, _)) => Some(output)
30            })
31    }
32
33    /// Block the current thread until complete.
34    pub fn wait(self) {
35        block_on(self)
36    }
37}
38
39/// Exit signal that fires either manually or on drop.
40pub struct Signal(oneshot::Sender<()>);
41
42impl Signal {
43    /// Fire the signal manually.
44    pub fn fire(self) -> Result<(), ()> {
45        self.0.send(())
46    }
47}
48
49/// Create a signal and exit pair. `Exit` is a future that resolves when the `Signal` object is
50/// either dropped or has `fire` called on it.
51pub fn signal() -> (Signal, Exit) {
52    let (sender, receiver) = oneshot::channel();
53    (Signal(sender), Exit(receiver.shared()))
54}
55
#[cfg(test)]
mod tests {
    use futures::future::{join3, lazy, pending, ready};
    use std::sync::{Arc, Barrier};
    use std::thread::{sleep, spawn};
    use std::time::Duration;
    use super::*;

    /// Several clones of one `Exit` all resolve after a single `fire`.
    #[test]
    fn it_works() {
        let (signal, exit_a) = signal();
        let exit_b = exit_a.clone();
        let exit_c = exit_b.clone();

        let barrier = Arc::new(Barrier::new(2));
        let thread_barrier = barrier.clone();
        let handle = spawn(move || {
            // Rendezvous with the main thread from inside the executor, so the signal is
            // only fired once the worker is already running `block_on`.
            let rendezvous = lazy(move |_| {
                thread_barrier.wait();
            });

            block_on(join3(exit_a, exit_b, rendezvous));
        });

        barrier.wait();
        signal.fire().unwrap();

        // Propagate a worker panic instead of silently discarding it.
        handle.join().unwrap();
        exit_c.wait()
    }

    /// Dropping the `Signal` without firing it still resolves the `Exit`.
    #[test]
    fn drop_signal() {
        let (signal, exit) = signal();

        let thread = spawn(move || {
            sleep(Duration::from_secs(1));
            drop(signal)
        });

        thread.join().unwrap();
        exit.wait()
    }

    /// Many clones waiting on separate threads all observe an already-fired signal.
    #[test]
    fn many_exit_signals() {
        let mut handles = Vec::new();
        let (signal, exit) = signal();

        for _ in 0..100 {
            let exit = exit.clone();
            handles.push(spawn(move || {
                sleep(Duration::from_secs(1));
                exit.wait();
            }));
        }

        signal.fire().unwrap();

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Compile-time check that both halves can be moved and shared across threads.
    #[test]
    fn exit_signal_are_send_and_sync() {
        fn is_send_and_sync<T: Send + Sync>() {}

        is_send_and_sync::<Exit>();
        is_send_and_sync::<Signal>();
    }

    /// `until` yields the work's output before the signal fires and `None` afterwards.
    #[test]
    fn work_until() {
        let (signal, exit) = signal();
        let work_a = exit.clone().until(ready(5));
        assert_eq!(block_on(work_a), Some(5));

        signal.fire().unwrap();
        let work_b = exit.until(pending::<()>());
        assert_eq!(block_on(work_b), None);
    }

    /// A signal fired from another thread wakes an `Exit` blocked on this thread.
    #[test]
    fn works_from_other_thread() {
        let (signal, exit) = signal();

        let firing_thread = spawn(move || {
            sleep(Duration::from_millis(2500));
            signal.fire().unwrap();
        });

        block_on(exit);

        // Surface a failed `fire` in the helper thread rather than losing its panic.
        firing_thread.join().unwrap();
    }

    /// Cloning an `Exit` that has already been polled yields a clone that can be polled too.
    #[test]
    fn clone_works() {
        let (_signal, mut exit) = signal();

        let future = lazy(move |cx| {
            let _ = Pin::new(&mut exit).poll(cx);

            let mut exit2 = exit.clone();
            let _ = Pin::new(&mut exit2).poll(cx);
        });

        block_on(future)
    }
}