libp2p_kad/
jobs.rs

// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Periodic (background) jobs.
//!
//! ## Record Persistence & Expiry
//!
//! To ensure persistence of records in the DHT, a Kademlia node
//! must periodically (re-)publish and (re-)replicate its records:
//!
//!   1. (Re-)publishing: The original publisher or provider of a record
//!      must regularly re-publish in order to prolong the expiration.
//!
//!   2. (Re-)replication: Every node storing a replica of a record must
//!      regularly re-replicate it to the closest nodes to the key in
//!      order to ensure the record is present at these nodes.
//!
//! Re-publishing primarily ensures persistence of the record beyond its
//! initial TTL, for as long as the publisher stores (or provides) the record,
//! whilst (re-)replication primarily ensures persistence for the duration
//! of the TTL in the light of topology changes. Consequently, replication
//! intervals should be shorter than publication intervals and
//! publication intervals should be shorter than the TTL.
//!
//! This module implements two periodic jobs:
//!
//!   * [`PutRecordJob`]: For (re-)publication and (re-)replication of
//!     regular (value-)records.
//!
//!   * [`AddProviderJob`]: For (re-)publication of provider records.
//!     Provider records currently have no separate replication mechanism.
//!
//! A periodic job is driven like a `Future` or `Stream` by `poll`ing it.
//! Once a job starts running it emits records to send to the `k` closest
//! nodes to the key, where `k` is the replication factor.
//!
//! Furthermore, these jobs perform double-duty by removing expired records
//! from the `RecordStore` on every run. Expired records are never emitted
//! by the jobs.
//!
//! > **Note**: The current implementation takes a snapshot of the records
//! > to replicate from the `RecordStore` when it starts and thus, to account
//! > for the worst case, it temporarily requires additional memory proportional
//! > to the size of all stored records. As a job runs, the records are moved
//! > out of the job to the consumer, where they can be dropped after being sent.
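//!
//! A sketch of how a consumer might drive one of these jobs (a hedged
//! example; the real driver is `Behaviour::poll`, and the task context,
//! store, and query dispatch here are assumptions):
//!
//! ```ignore
//! if let Poll::Ready(record) = put_record_job.poll(cx, &mut store, Instant::now()) {
//!     // Send `record` to the k closest peers via a PUT_VALUE query.
//! }
//! ```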

use crate::record_priv::{self, store::RecordStore, ProviderRecord, Record};
use futures::prelude::*;
use futures_timer::Delay;
use instant::Instant;
use libp2p_identity::PeerId;
use std::collections::HashSet;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::vec;

/// The maximum total number of queries up to which background jobs
/// are allowed to start new queries on an invocation of
/// `Behaviour::poll`.
pub(crate) const JOBS_MAX_QUERIES: usize = 100;

/// The maximum number of new queries started by a background job
/// per invocation of `Behaviour::poll`.
pub(crate) const JOBS_MAX_NEW_QUERIES: usize = 10;
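
// How the two limits above are intended to interact (a sketch; the actual
// gating lives in `Behaviour::poll`, and `queries` is a hypothetical query
// registry):
//
//   if queries.len() < JOBS_MAX_QUERIES {
//       let max_new = usize::min(JOBS_MAX_QUERIES - queries.len(), JOBS_MAX_NEW_QUERIES);
//       // Poll the jobs, starting at most `max_new` new queries.
//   }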
/// A background job run periodically.
#[derive(Debug)]
struct PeriodicJob<T> {
    /// The interval between two runs of the job.
    interval: Duration,
    /// The current state of the job.
    state: PeriodicJobState<T>,
}

impl<T> PeriodicJob<T> {
    #[cfg(test)]
    fn is_running(&self) -> bool {
        match self.state {
            PeriodicJobState::Running(..) => true,
            PeriodicJobState::Waiting(..) => false,
        }
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    #[cfg(test)]
    fn asap(&mut self) {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
            let new_deadline = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
            *deadline = new_deadline;
            delay.reset(Duration::from_secs(1));
        }
    }

    /// Returns `true` if the job is currently not running but ready
    /// to be run, `false` otherwise.
    fn check_ready(&mut self, cx: &mut Context<'_>, now: Instant) -> bool {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
            // The job is ready once the deadline has passed or the
            // underlying timer has fired.
            if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() {
                return true;
            }
        }
        false
    }
}

/// The state of a background job run periodically.
#[derive(Debug)]
enum PeriodicJobState<T> {
    /// The job is currently running, yielding the items it produces.
    Running(T),
    /// The job is waiting for the `Delay` to expire at the given deadline.
    Waiting(Delay, Instant),
}

//////////////////////////////////////////////////////////////////////////////
// PutRecordJob

/// Periodic job for replicating / publishing records.
pub(crate) struct PutRecordJob {
    /// The local peer ID, used to recognise records published by this node.
    local_id: PeerId,
    /// The time of the next (re-)publication run, if publishing is enabled.
    next_publish: Option<Instant>,
    /// The interval between two (re-)publication runs.
    publish_interval: Option<Duration>,
    /// The TTL given to re-published records that have no explicit expiry.
    record_ttl: Option<Duration>,
    /// Keys to ignore on the current or next run of the job.
    skipped: HashSet<record_priv::Key>,
    inner: PeriodicJob<vec::IntoIter<Record>>,
}
impl PutRecordJob {
    /// Creates a new periodic job for replicating and re-publishing
    /// locally stored records.
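    ///
    /// A construction sketch honouring the interval ordering recommended by
    /// the module docs (replication < publication < TTL); the concrete
    /// durations are illustrative assumptions, not prescribed defaults:
    ///
    /// ```ignore
    /// let job = PutRecordJob::new(
    ///     local_peer_id,
    ///     Duration::from_secs(60 * 60),            // replicate hourly
    ///     Some(Duration::from_secs(24 * 60 * 60)), // publish daily
    ///     Some(Duration::from_secs(36 * 60 * 60)), // 36-hour record TTL
    /// );
    /// ```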
    pub(crate) fn new(
        local_id: PeerId,
        replicate_interval: Duration,
        publish_interval: Option<Duration>,
        record_ttl: Option<Duration>,
    ) -> Self {
        let now = Instant::now();
        let deadline = now + replicate_interval;
        let delay = Delay::new(replicate_interval);
        let next_publish = publish_interval.map(|i| now + i);
        Self {
            local_id,
            next_publish,
            publish_interval,
            record_ttl,
            skipped: HashSet::new(),
            inner: PeriodicJob {
                interval: replicate_interval,
                state: PeriodicJobState::Waiting(delay, deadline),
            },
        }
    }

    /// Adds the key of a record that is ignored on the current or
    /// next run of the job.
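    ///
    /// A usage sketch (the scenario is an assumption about intent): a record
    /// that was just written to the store need not be sent out again by the
    /// imminent run:
    ///
    /// ```ignore
    /// store.put(record.clone())?;
    /// job.skip(record.key.clone());
    /// ```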
    pub(crate) fn skip(&mut self, key: record_priv::Key) {
        self.skipped.insert(key);
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self, publish: bool) {
        if publish {
            self.next_publish = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())
        }
        self.inner.asap()
    }

    /// Polls the job for records to replicate.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is
    /// returned, the current task is registered to be notified when the
    /// job is ready to be run.
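    ///
    /// A sketch of the intended polling pattern (hedged; `cx`, `store`, and
    /// the query dispatch are assumptions, and the real loop lives in
    /// `Behaviour::poll`):
    ///
    /// ```ignore
    /// while let Poll::Ready(record) = job.poll(cx, &mut store, Instant::now()) {
    ///     // Start a query sending `record` to the k closest peers.
    /// }
    /// ```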
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<Record>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            let publish = self.next_publish.map_or(false, |t_pub| now >= t_pub);
            let records = store
                .records()
                .filter_map(|r| {
                    let is_publisher = r.publisher.as_ref() == Some(&self.local_id);
                    // Locally published records are left to the (less
                    // frequent) publish runs; plain replication runs skip them.
                    if self.skipped.contains(&r.key) || (!publish && is_publisher) {
                        None
                    } else {
                        let mut record = r.into_owned();
                        if publish && is_publisher {
                            // Give re-published records without an explicit
                            // expiry one based on the configured TTL.
                            record.expires = record
                                .expires
                                .or_else(|| self.record_ttl.map(|ttl| now + ttl));
                        }
                        Some(record)
                    }
                })
                .collect::<Vec<_>>()
                .into_iter();

            // Schedule the next publishing run.
            if publish {
                self.next_publish = self.publish_interval.map(|i| now + i);
            }

            self.skipped.clear();

            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(records) = &mut self.inner.state {
            for r in records {
                if r.is_expired(now) {
                    // Expired records are removed from the store and never
                    // emitted by the job.
                    store.remove(&r.key)
                } else {
                    return Poll::Ready(r);
                }
            }

            // Wait for the next run.
            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

//////////////////////////////////////////////////////////////////////////////
// AddProviderJob

/// Periodic job for re-publishing provider records.
pub(crate) struct AddProviderJob {
    inner: PeriodicJob<vec::IntoIter<ProviderRecord>>,
}

impl AddProviderJob {
    /// Creates a new periodic job for provider announcements.
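    ///
    /// A construction sketch; the 12-hour interval is an illustrative
    /// assumption for how often provider records might be re-published:
    ///
    /// ```ignore
    /// let job = AddProviderJob::new(Duration::from_secs(12 * 60 * 60));
    /// ```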
    pub(crate) fn new(interval: Duration) -> Self {
        let now = Instant::now();
        Self {
            inner: PeriodicJob {
                interval,
                state: {
                    let deadline = now + interval;
                    PeriodicJobState::Waiting(Delay::new(interval), deadline)
                },
            },
        }
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self) {
        self.inner.asap()
    }

    /// Polls the job for provider records to re-publish.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is
    /// returned, the current task is registered to be notified when the
    /// job is ready to be run.
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<ProviderRecord>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            let records = store
                .provided()
                .map(|r| r.into_owned())
                .collect::<Vec<_>>()
                .into_iter();
            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(records) = &mut self.inner.state {
            for r in records {
                if r.is_expired(now) {
                    // Expired provider records are removed from the store and
                    // never emitted by the job.
                    store.remove_provider(&r.key, &r.provider)
                } else {
                    return Poll::Ready(r);
                }
            }

            // Wait for the next run.
            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::record_priv::store::MemoryStore;
    use futures::{executor::block_on, future::poll_fn};
    use quickcheck::*;
    use rand::Rng;

    fn rand_put_record_job() -> PutRecordJob {
        let mut rng = rand::thread_rng();
        let id = PeerId::random();
        let replicate_interval = Duration::from_secs(rng.gen_range(1..60));
        let publish_interval = Some(replicate_interval * rng.gen_range(1..10));
        let record_ttl = Some(Duration::from_secs(rng.gen_range(1..600)));
        PutRecordJob::new(id, replicate_interval, publish_interval, record_ttl)
    }

    fn rand_add_provider_job() -> AddProviderJob {
        let mut rng = rand::thread_rng();
        let interval = Duration::from_secs(rng.gen_range(1..60));
        AddProviderJob::new(interval)
    }

    #[test]
    fn new_job_not_running() {
        let job = rand_put_record_job();
        assert!(!job.is_running());
        let job = rand_add_provider_job();
        assert!(!job.is_running());
    }

    #[test]
    fn run_put_record_job() {
        fn prop(records: Vec<Record>) {
            let mut job = rand_put_record_job();
            // Fill a record store.
            let mut store = MemoryStore::new(job.local_id);
            for r in records {
                let _ = store.put(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }

    #[test]
    fn run_add_provider_job() {
        fn prop(records: Vec<ProviderRecord>) {
            let mut job = rand_add_provider_job();
            let id = PeerId::random();
            // Fill a record store.
            let mut store = MemoryStore::new(id);
            for mut r in records {
                r.provider = id;
                let _ = store.add_provider(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }
}