libp2p_kad/jobs.rs
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Periodic (background) jobs.
//!
//! ## Record Persistence & Expiry
//!
//! To ensure persistence of records in the DHT, a Kademlia node
//! must periodically (re-)publish and (re-)replicate its records:
//!
//!   1. (Re-)publishing: The original publisher or provider of a record
//!      must regularly re-publish in order to prolong the expiration.
//!
//!   2. (Re-)replication: Every node storing a replica of a record must
//!      regularly re-replicate it to the closest nodes to the key in
//!      order to ensure the record is present at these nodes.
//!
//! Re-publishing primarily ensures persistence of the record beyond its
//! initial TTL, for as long as the publisher stores (or provides) the record,
//! whilst (re-)replication primarily ensures persistence for the duration
//! of the TTL in the light of topology changes. Consequently, replication
//! intervals should be shorter than publication intervals and
//! publication intervals should be shorter than the TTL.
//!
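//! As an illustration only (these values are an example, not prescribed by
//! this module), a configuration respecting this ordering could be:
//!
//! ```text
//! replication interval (e.g. 1h) < publication interval (e.g. 24h) < record TTL (e.g. 36h)
//! ```
//!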
//! This module implements two periodic jobs:
//!
//!   * [`PutRecordJob`]: For (re-)publication and (re-)replication of
//!     regular (value-)records.
//!
//!   * [`AddProviderJob`]: For (re-)publication of provider records.
//!     Provider records currently have no separate replication mechanism.
//!
//! A periodic job is driven like a `Future` or `Stream` by `poll`ing it.
//! Once a job starts running it emits records to send to the `k` closest
//! nodes to the key, where `k` is the replication factor.
//!
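//! As a rough sketch (the `put_record_job`, `store` and `send_to_closest`
//! names below are placeholders for the consumer's own state, not items of
//! this module), a consumer typically drives a job from within its own
//! `poll`, where `cx` is the task context it was given:
//!
//! ```ignore
//! let now = Instant::now();
//! // Emits one record per `Poll::Ready` until the current run is exhausted.
//! while let Poll::Ready(record) = put_record_job.poll(cx, &mut store, now) {
//!     send_to_closest(record);
//! }
//! ```
//!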
//! Furthermore, these jobs perform double-duty by removing expired records
//! from the `RecordStore` on every run. Expired records are never emitted
//! by the jobs.
//!
//! > **Note**: The current implementation takes a snapshot of the records
//! > to replicate from the `RecordStore` when it starts and thus, to account
//! > for the worst case, it temporarily requires additional memory proportional
//! > to the size of all stored records. As a job runs, the records are moved
//! > out of the job to the consumer, where they can be dropped after being sent.

use crate::record_priv::{self, store::RecordStore, ProviderRecord, Record};
use futures::prelude::*;
use futures_timer::Delay;
use instant::Instant;
use libp2p_identity::PeerId;
use std::collections::HashSet;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::vec;

/// The maximum number of simultaneous queries up to which background
/// jobs are allowed to start new queries on an invocation of
/// `Behaviour::poll`.
pub(crate) const JOBS_MAX_QUERIES: usize = 100;

/// The maximum number of new queries started by a background job
/// per invocation of `Behaviour::poll`.
pub(crate) const JOBS_MAX_NEW_QUERIES: usize = 10;

/// A background job run periodically.
#[derive(Debug)]
struct PeriodicJob<T> {
    interval: Duration,
    state: PeriodicJobState<T>,
}

impl<T> PeriodicJob<T> {
    #[cfg(test)]
    fn is_running(&self) -> bool {
        match self.state {
            PeriodicJobState::Running(..) => true,
            PeriodicJobState::Waiting(..) => false,
        }
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    #[cfg(test)]
    fn asap(&mut self) {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
            let new_deadline = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
            *deadline = new_deadline;
            delay.reset(Duration::from_secs(1));
        }
    }

    /// Returns `true` if the job is currently not running but ready
    /// to be run, `false` otherwise.
    fn check_ready(&mut self, cx: &mut Context<'_>, now: Instant) -> bool {
        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
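            // The comparison against `deadline` lets callers pass a logical
            // `now` that may lie ahead of wall-clock time (as the tests do);
            // otherwise, polling the `Delay` registers the current task's
            // waker so the job is polled again once the delay elapses.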
            if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() {
                return true;
            }
        }
        false
    }
}

/// The state of a background job run periodically.
#[derive(Debug)]
enum PeriodicJobState<T> {
    Running(T),
    Waiting(Delay, Instant),
}

//////////////////////////////////////////////////////////////////////////////
// PutRecordJob

/// Periodic job for replicating / publishing records.
pub(crate) struct PutRecordJob {
    local_id: PeerId,
    next_publish: Option<Instant>,
    publish_interval: Option<Duration>,
    record_ttl: Option<Duration>,
    skipped: HashSet<record_priv::Key>,
    inner: PeriodicJob<vec::IntoIter<Record>>,
}

impl PutRecordJob {
    /// Creates a new periodic job for replicating and re-publishing
    /// locally stored records.
    pub(crate) fn new(
        local_id: PeerId,
        replicate_interval: Duration,
        publish_interval: Option<Duration>,
        record_ttl: Option<Duration>,
    ) -> Self {
        let now = Instant::now();
        let deadline = now + replicate_interval;
        let delay = Delay::new(replicate_interval);
        let next_publish = publish_interval.map(|i| now + i);
        Self {
            local_id,
            next_publish,
            publish_interval,
            record_ttl,
            skipped: HashSet::new(),
            inner: PeriodicJob {
                interval: replicate_interval,
                state: PeriodicJobState::Waiting(delay, deadline),
            },
        }
    }

    /// Adds the key of a record that is ignored on the current or
    /// next run of the job.
    pub(crate) fn skip(&mut self, key: record_priv::Key) {
        self.skipped.insert(key);
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self, publish: bool) {
        if publish {
            self.next_publish = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())
        }
        self.inner.asap()
    }

    /// Polls the job for records to replicate.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is
    /// returned, the current task is registered to be notified when the
    /// job is ready to be run.
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<Record>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            let publish = self.next_publish.map_or(false, |t_pub| now >= t_pub);
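            // Take a snapshot of the records to emit on this run: explicitly
            // skipped keys are never emitted, and records published by the
            // local node are only emitted on a publishing run (with their
            // expiry extended by the configured record TTL, if any).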
            let records = store
                .records()
                .filter_map(|r| {
                    let is_publisher = r.publisher.as_ref() == Some(&self.local_id);
                    if self.skipped.contains(&r.key) || (!publish && is_publisher) {
                        None
                    } else {
                        let mut record = r.into_owned();
                        if publish && is_publisher {
                            record.expires = record
                                .expires
                                .or_else(|| self.record_ttl.map(|ttl| now + ttl));
                        }
                        Some(record)
                    }
                })
                .collect::<Vec<_>>()
                .into_iter();

            // Schedule the next publishing run.
            if publish {
                self.next_publish = self.publish_interval.map(|i| now + i);
            }

            self.skipped.clear();

            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(records) = &mut self.inner.state {
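            // Emit the next record from the snapshot that has not yet expired;
            // expired records are removed from the store instead of being emitted.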
            for r in records {
                if r.is_expired(now) {
                    store.remove(&r.key)
                } else {
                    return Poll::Ready(r);
                }
            }

            // Wait for the next run.
            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

//////////////////////////////////////////////////////////////////////////////
// AddProviderJob

/// Periodic job for replicating provider records.
pub(crate) struct AddProviderJob {
    inner: PeriodicJob<vec::IntoIter<ProviderRecord>>,
}

impl AddProviderJob {
    /// Creates a new periodic job for provider announcements.
    pub(crate) fn new(interval: Duration) -> Self {
        let now = Instant::now();
        Self {
            inner: PeriodicJob {
                interval,
                state: {
                    let deadline = now + interval;
                    PeriodicJobState::Waiting(Delay::new(interval), deadline)
                },
            },
        }
    }

    /// Checks whether the job is currently running.
    #[cfg(test)]
    pub(crate) fn is_running(&self) -> bool {
        self.inner.is_running()
    }

    /// Cuts short the remaining delay, if the job is currently waiting
    /// for the delay to expire.
    ///
    /// The job is guaranteed to run on the next invocation of `poll`.
    #[cfg(test)]
    pub(crate) fn asap(&mut self) {
        self.inner.asap()
    }

    /// Polls the job for provider records to replicate.
    ///
    /// Must be called in the context of a task. When `Poll::Pending` is
    /// returned, the current task is registered to be notified when the
    /// job is ready to be run.
    pub(crate) fn poll<T>(
        &mut self,
        cx: &mut Context<'_>,
        store: &mut T,
        now: Instant,
    ) -> Poll<ProviderRecord>
    where
        T: RecordStore,
    {
        if self.inner.check_ready(cx, now) {
            let records = store
                .provided()
                .map(|r| r.into_owned())
                .collect::<Vec<_>>()
                .into_iter();
            self.inner.state = PeriodicJobState::Running(records);
        }

        if let PeriodicJobState::Running(keys) = &mut self.inner.state {
            for r in keys {
                if r.is_expired(now) {
                    store.remove_provider(&r.key, &r.provider)
                } else {
                    return Poll::Ready(r);
                }
            }

            let deadline = now + self.inner.interval;
            let delay = Delay::new(self.inner.interval);
            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
            assert!(!self.inner.check_ready(cx, now));
        }

        Poll::Pending
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::record_priv::store::MemoryStore;
    use futures::{executor::block_on, future::poll_fn};
    use quickcheck::*;
    use rand::Rng;

    fn rand_put_record_job() -> PutRecordJob {
        let mut rng = rand::thread_rng();
        let id = PeerId::random();
        let replicate_interval = Duration::from_secs(rng.gen_range(1..60));
        let publish_interval = Some(replicate_interval * rng.gen_range(1..10));
        let record_ttl = Some(Duration::from_secs(rng.gen_range(1..600)));
        PutRecordJob::new(id, replicate_interval, publish_interval, record_ttl)
    }

    fn rand_add_provider_job() -> AddProviderJob {
        let mut rng = rand::thread_rng();
        let interval = Duration::from_secs(rng.gen_range(1..60));
        AddProviderJob::new(interval)
    }

    #[test]
    fn new_job_not_running() {
        let job = rand_put_record_job();
        assert!(!job.is_running());
        let job = rand_add_provider_job();
        assert!(!job.is_running());
    }

    #[test]
    fn run_put_record_job() {
        fn prop(records: Vec<Record>) {
            let mut job = rand_put_record_job();
            // Fill a record store.
            let mut store = MemoryStore::new(job.local_id);
            for r in records {
                let _ = store.put(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }

    #[test]
    fn run_add_provider_job() {
        fn prop(records: Vec<ProviderRecord>) {
            let mut job = rand_add_provider_job();
            let id = PeerId::random();
            // Fill a record store.
            let mut store = MemoryStore::new(id);
            for mut r in records {
                r.provider = id;
                let _ = store.add_provider(r);
            }

            block_on(poll_fn(|ctx| {
                let now = Instant::now() + job.inner.interval;
                // All (non-expired) records in the store must be yielded by the job.
                for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
                    if !r.is_expired(now) {
                        assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
                        assert!(job.is_running());
                    }
                }
                assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
                assert!(!job.is_running());
                Poll::Ready(())
            }));
        }

        quickcheck(prop as fn(_))
    }
}