1use std::pin::Pin;
2use std::task::{Poll, Context};
3use futures::{Future, FutureExt, channel::oneshot, future::{select, Either, Shared, FusedFuture}, executor::block_on};
4
5#[derive(Clone)]
7pub struct Exit(Shared<oneshot::Receiver<()>>);
8
9impl Future for Exit {
10 type Output = ();
11
12 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
13 let receiver = &mut Pin::into_inner(self).0;
14
15 if receiver.is_terminated() {
16 Poll::Ready(())
17 } else {
18 Pin::new(receiver).poll(cx).map(drop)
19 }
20 }
21}
22
23impl Exit {
24 pub fn until<F: Future + Unpin>(self, future: F) -> impl Future<Output = Option<F::Output>> {
26 select(self, future)
27 .map(|either| match either {
28 Either::Left(_) => None,
29 Either::Right((output, _)) => Some(output)
30 })
31 }
32
33 pub fn wait(self) {
35 block_on(self)
36 }
37}
38
39pub struct Signal(oneshot::Sender<()>);
41
42impl Signal {
43 pub fn fire(self) -> Result<(), ()> {
45 self.0.send(())
46 }
47}
48
49pub fn signal() -> (Signal, Exit) {
52 let (sender, receiver) = oneshot::channel();
53 (Signal(sender), Exit(receiver.shared()))
54}
55
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::{join3, lazy, pending, ready};
    use std::sync::{Arc, Barrier};
    use std::thread::{sleep, spawn};
    use std::time::Duration;

    /// Two clones polled on a worker thread and a third waited on locally all
    /// complete after the signal is fired.
    #[test]
    fn it_works() {
        let (signal, exit_a) = signal();
        let exit_b = exit_a.clone();
        let exit_c = exit_b.clone();

        let barrier = Arc::new(Barrier::new(2));
        let thread_barrier = barrier.clone();
        let handle = spawn(move || {
            // The lazy future rendezvouses with the main thread from inside
            // the joined future on the worker thread.
            let barrier = lazy(move |_| {
                thread_barrier.wait();
            });

            block_on(join3(exit_a, exit_b, barrier));
        });

        barrier.wait();
        signal.fire().unwrap();

        // Propagate a worker panic instead of silently discarding it.
        handle.join().unwrap();
        exit_c.wait()
    }

    /// Dropping the signal without firing it still releases a waiting exit.
    #[test]
    fn drop_signal() {
        let (signal, exit) = signal();

        let thread = spawn(move || {
            sleep(Duration::from_secs(1));
            drop(signal)
        });

        thread.join().unwrap();
        exit.wait()
    }

    /// A hundred clones, each waited on from its own thread, all complete.
    #[test]
    fn many_exit_signals() {
        let mut handles = Vec::new();
        let (signal, exit) = signal();

        for _ in 0..100 {
            let exit = exit.clone();
            handles.push(spawn(move || {
                sleep(Duration::from_secs(1));
                exit.wait();
            }));
        }

        signal.fire().unwrap();

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Compile-time check that both halves can cross and be shared between
    /// threads.
    #[test]
    fn exit_signal_are_send_and_sync() {
        fn is_send_and_sync<T: Send + Sync>() {}

        is_send_and_sync::<Exit>();
        is_send_and_sync::<Signal>();
    }

    /// `until` yields the work's output before the signal fires and `None`
    /// once it has fired.
    #[test]
    fn work_until() {
        let (signal, exit) = signal();
        let work_a = exit.clone().until(ready(5));
        assert_eq!(block_on(work_a), Some(5));

        signal.fire().unwrap();
        let work_b = exit.until(pending::<()>());
        assert_eq!(block_on(work_b), None);
    }

    /// Blocking on an exit returns once another thread fires the signal.
    #[test]
    fn works_from_other_thread() {
        let (signal, exit) = signal();

        spawn(move || {
            sleep(Duration::from_millis(2500));
            signal.fire().unwrap();
        });

        block_on(exit);
    }

    /// Cloning an exit that has already been polled, then polling the clone,
    /// does not panic.
    #[test]
    fn clone_works() {
        let (_signal, mut exit) = signal();

        let future = lazy(move |cx| {
            let _ = Pin::new(&mut exit).poll(cx);

            let mut exit2 = exit.clone();
            let _ = Pin::new(&mut exit2).poll(cx);
        });

        block_on(future)
    }
}