futures_timer/native/
arc_list.rs

1//! An atomically managed intrusive linked list of `Arc` nodes
2
3use std::marker;
4use std::ops::Deref;
5use std::sync::atomic::Ordering::SeqCst;
6use std::sync::atomic::{AtomicBool, AtomicPtr};
7use std::sync::Arc;
8
9pub struct ArcList<T> {
10    list: AtomicPtr<Node<T>>,
11    _marker: marker::PhantomData<T>,
12}
13
14impl<T> ArcList<T> {
15    pub fn new() -> ArcList<T> {
16        ArcList {
17            list: AtomicPtr::new(Node::EMPTY),
18            _marker: marker::PhantomData,
19        }
20    }
21
22    /// Pushes the `data` provided onto this list if it's not already enqueued
23    /// in this list.
24    ///
25    /// If `data` is already enqueued in this list then this is a noop,
26    /// otherwise, the `data` here is pushed on the end of the list.
27    pub fn push(&self, data: &Arc<Node<T>>) -> Result<(), ()> {
28        if data.enqueued.swap(true, SeqCst) {
29            // note that even if our list is sealed off then the other end is
30            // still guaranteed to see us because we were previously enqueued.
31            return Ok(());
32        }
33        let mut head = self.list.load(SeqCst);
34        let node = Arc::into_raw(data.clone()) as *mut Node<T>;
35        loop {
36            // If we've been sealed off, abort and return an error
37            if head == Node::SEALED {
38                unsafe {
39                    drop(Arc::from_raw(node as *mut Node<T>));
40                }
41                return Err(());
42            }
43
44            // Otherwise attempt to push this node
45            data.next.store(head, SeqCst);
46            match self.list.compare_exchange(head, node, SeqCst, SeqCst) {
47                Ok(_) => break Ok(()),
48                Err(new_head) => head = new_head,
49            }
50        }
51    }
52
53    /// Atomically empties this list, returning a new owned copy which can be
54    /// used to iterate over the entries.
55    pub fn take(&self) -> ArcList<T> {
56        let mut list = self.list.load(SeqCst);
57        loop {
58            if list == Node::SEALED {
59                break;
60            }
61            match self
62                .list
63                .compare_exchange(list, Node::EMPTY, SeqCst, SeqCst)
64            {
65                Ok(_) => break,
66                Err(l) => list = l,
67            }
68        }
69        ArcList {
70            list: AtomicPtr::new(list),
71            _marker: marker::PhantomData,
72        }
73    }
74
75    /// Atomically empties this list and prevents further successful calls to
76    /// `push`.
77    pub fn take_and_seal(&self) -> ArcList<T> {
78        ArcList {
79            list: AtomicPtr::new(self.list.swap(Node::SEALED, SeqCst)),
80            _marker: marker::PhantomData,
81        }
82    }
83
84    /// Removes the head of the list of nodes, returning `None` if this is an
85    /// empty list.
86    pub fn pop(&mut self) -> Option<Arc<Node<T>>> {
87        let head = *self.list.get_mut();
88        if head == Node::EMPTY || head == Node::SEALED {
89            return None;
90        }
91        let head = unsafe { Arc::from_raw(head as *const Node<T>) };
92        *self.list.get_mut() = head.next.load(SeqCst);
93        // At this point, the node is out of the list, so store `false` so we
94        // can enqueue it again and see further changes.
95        assert!(head.enqueued.swap(false, SeqCst));
96        Some(head)
97    }
98}
99
100impl<T> Drop for ArcList<T> {
101    fn drop(&mut self) {
102        while let Some(_) = self.pop() {
103            // ...
104        }
105    }
106}
107
108pub struct Node<T> {
109    next: AtomicPtr<Node<T>>,
110    enqueued: AtomicBool,
111    data: T,
112}
113
114impl<T> Node<T> {
115    const EMPTY: *mut Node<T> = std::ptr::null_mut();
116
117    const SEALED: *mut Node<T> = std::ptr::null_mut::<Node<T>>().wrapping_add(1);
118
119    pub fn new(data: T) -> Node<T> {
120        Node {
121            next: AtomicPtr::new(Node::EMPTY),
122            enqueued: AtomicBool::new(false),
123            data,
124        }
125    }
126}
127
128impl<T> Deref for Node<T> {
129    type Target = T;
130
131    fn deref(&self) -> &T {
132        &self.data
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139
140    #[test]
141    fn smoke() {
142        let a = ArcList::new();
143        let n = Arc::new(Node::new(1));
144        assert!(a.push(&n).is_ok());
145
146        let mut l = a.take();
147        assert_eq!(**l.pop().unwrap(), 1);
148        assert!(l.pop().is_none());
149    }
150
151    #[test]
152    fn seal() {
153        let a = ArcList::new();
154        let n = Arc::new(Node::new(1));
155        let mut l = a.take_and_seal();
156        assert!(l.pop().is_none());
157        assert!(a.push(&n).is_err());
158
159        assert!(a.take().pop().is_none());
160        assert!(a.take_and_seal().pop().is_none());
161    }
162}