polkadot_availability_recovery/
futures_undead.rs1use std::{
28 pin::Pin,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use futures::{future::BoxFuture, stream::FuturesUnordered, Future, Stream, StreamExt};
34use polkadot_node_subsystem_util::TimeoutExt;
35
36pub struct FuturesUndead<Output> {
41 inner: FuturesUnordered<Undead<Output>>,
43 next_sequence: SequenceNumber,
45 first_live: Option<SequenceNumber>,
47 undead: usize,
49}
50
/// Monotonically increasing tag given to every pushed future.
///
/// Comparing a future's tag against `first_live` decides whether it is live or undead.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)]
struct SequenceNumber(usize);
54
55struct Undead<Output> {
56 inner: BoxFuture<'static, Output>,
57 our_sequence: SequenceNumber,
58}
59
60impl<Output> FuturesUndead<Output> {
61 pub fn new() -> Self {
62 Self {
63 inner: FuturesUnordered::new(),
64 next_sequence: SequenceNumber(0),
65 first_live: None,
66 undead: 0,
67 }
68 }
69
70 pub fn push(&mut self, f: BoxFuture<'static, Output>) {
71 self.inner.push(Undead { inner: f, our_sequence: self.next_sequence });
72 self.next_sequence.inc();
73 }
74
75 pub fn soft_cancel(&mut self) {
79 self.undead = self.inner.len();
80 self.first_live = Some(self.next_sequence);
81 }
82
83 pub fn len(&self) -> usize {
85 self.inner.len() - self.undead
86 }
87
88 pub fn total_len(&self) -> usize {
90 self.inner.len()
91 }
92
93 pub async fn next_with_timeout(&mut self, timeout: Duration) -> Option<Output> {
97 match self.next().timeout(timeout).await {
98 None => {
100 self.soft_cancel();
101 None
102 },
103 Some(inner) => inner,
104 }
105 }
106}
107
108impl<Output> Stream for FuturesUndead<Output> {
109 type Item = Output;
110
111 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 match self.inner.poll_next_unpin(cx) {
113 Poll::Pending => Poll::Pending,
114 Poll::Ready(None) => Poll::Ready(None),
115 Poll::Ready(Some((sequence, v))) => {
116 if self.inner.len() == 0 {
118 *self = Self::new();
119 return Poll::Ready(Some(v))
120 }
121
122 let first_live = match self.first_live {
123 None => return Poll::Ready(Some(v)),
124 Some(first_live) => first_live,
125 };
126 if sequence < first_live {
128 self.undead = self.undead.saturating_sub(1);
129 }
130 Poll::Ready(Some(v))
131 },
132 }
133 }
134}
135
136impl SequenceNumber {
137 pub fn inc(&mut self) {
138 self.0 = self.0.checked_add(1).expect(
139 "We don't expect an `UndeadFuture` to live long enough for 2^64 entries ever getting inserted."
140 );
141 }
142}
143
144impl<T> Future for Undead<T> {
145 type Output = (SequenceNumber, T);
146 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
147 match self.inner.as_mut().poll(cx) {
148 Poll::Pending => Poll::Pending,
149 Poll::Ready(v) => Poll::Ready((self.our_sequence, v)),
150 }
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use futures::{executor, pending, FutureExt};

    /// Boxed future that resolves to `value` the first time it is polled.
    fn ready(value: i32) -> BoxFuture<'static, i32> {
        async move { value }.boxed()
    }

    /// Boxed future that goes through `pending!()` once before resolving to `value`.
    fn stalled(value: i32) -> BoxFuture<'static, i32> {
        async move {
            pending!();
            value
        }
        .boxed()
    }

    /// Soft-cancelling a set with one future leaves no live futures.
    #[test]
    fn cancel_sets_len_to_zero() {
        let mut set = FuturesUndead::new();
        set.push(async {}.boxed());
        assert_eq!(set.len(), 1);

        set.soft_cancel();
        assert_eq!(set.len(), 0);
    }

    /// An undead future finishing must not affect the number of live futures.
    #[test]
    fn finished_undead_does_not_change_len() {
        executor::block_on(async {
            let mut set = FuturesUndead::new();
            set.push(ready(1));
            set.push(ready(2));
            assert_eq!(set.len(), 2);

            set.soft_cancel();
            assert_eq!(set.len(), 0);

            set.push(stalled(0));
            set.next().await;
            assert_eq!(set.len(), 1);

            set.push(ready(9));
            set.soft_cancel();
            assert_eq!(set.len(), 0);
        });
    }

    /// Live futures finishing reduce `len`, while stalled undead ones are never counted.
    #[test]
    fn len_stays_correct_when_live_future_ends() {
        executor::block_on(async {
            let mut set = FuturesUndead::new();
            set.push(stalled(1));
            set.push(stalled(2));
            assert_eq!(set.len(), 2);

            set.soft_cancel();
            assert_eq!(set.len(), 0);

            set.push(ready(0));
            set.push(ready(1));
            set.next().await;
            assert_eq!(set.len(), 1);
            set.next().await;
            assert_eq!(set.len(), 0);

            set.push(ready(9));
            assert_eq!(set.len(), 1);
        });
    }

    /// Draining the set completely resets the bookkeeping (`first_live` is cleared).
    #[test]
    fn cleanup_works() {
        executor::block_on(async {
            let mut set = FuturesUndead::new();
            set.push(ready(1));
            set.soft_cancel();
            set.push(ready(2));

            set.next().await;
            set.next().await;
            assert_eq!(set.first_live, None);
        });
    }
}