// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Justification requests scheduling. [`ExtraRequests`] manages requesting justifications
//! from peers taking into account forks and their finalization (dropping pending requests
//! that don't make sense after one of the forks is finalized).

use crate::{
	strategy::chain_sync::{PeerSync, PeerSyncState},
	LOG_TARGET,
};
use fork_tree::ForkTree;
use log::{debug, trace, warn};
use prometheus_endpoint::{
	prometheus::core::GenericGauge, register, GaugeVec, Opts, PrometheusError, Registry, U64,
};
use sc_network_types::PeerId;
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::{
	collections::{HashMap, HashSet, VecDeque},
	time::{Duration, Instant},
};

/// Time to wait before trying to get the same extra data from the same peer.
const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);

/// Pending extra data request for the given block (hash and number).
type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);

/// Prometheus gauges mirroring the state of [`ExtraRequests`]. All four are
/// labels of the single `substrate_sync_extra_justifications` gauge vector.
#[derive(Debug)]
struct Metrics {
	// Requests queued but not yet sent to any peer.
	pending: GenericGauge<U64>,
	// Requests currently in flight to some peer.
	active: GenericGauge<U64>,
	// Number of distinct requests that previously got no/empty response.
	failed: GenericGauge<U64>,
	// Responses that have been handed off for import.
	importing: GenericGauge<U64>,
}

54impl Metrics {
55	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
56		let justifications = GaugeVec::<U64>::new(
57			Opts::new(
58				"substrate_sync_extra_justifications",
59				"Number of extra justifications requests",
60			),
61			&["status"],
62		)?;
63		let justifications = register(justifications, registry)?;
64
65		Ok(Self {
66			pending: justifications.with_label_values(&["pending"]),
67			active: justifications.with_label_values(&["active"]),
68			failed: justifications.with_label_values(&["failed"]),
69			importing: justifications.with_label_values(&["importing"]),
70		})
71	}
72}
73
/// Manages pending block extra data (e.g. justification) requests.
///
/// Multiple extras may be requested for competing forks, or for the same branch
/// at different (increasing) heights. This structure will guarantee that extras
/// are fetched in-order, and that obsolete changes are pruned (when finalizing a
/// competing fork).
#[derive(Debug)]
pub(crate) struct ExtraRequests<B: BlockT> {
	/// fork tree of all scheduled requests; only its roots are actionable,
	/// descendants become actionable once an ancestor root is finalized
	tree: ForkTree<B::Hash, NumberFor<B>, ()>,
	/// best finalized block number that we have seen since restart
	best_seen_finalized_number: NumberFor<B>,
	/// requests which have been queued for later processing
	pending_requests: VecDeque<ExtraRequest<B>>,
	/// requests which are currently underway to some peer
	active_requests: HashMap<PeerId, ExtraRequest<B>>,
	/// previous requests without response (per request: which peers failed, and when)
	failed_requests: HashMap<ExtraRequest<B>, Vec<(PeerId, Instant)>>,
	/// successful requests
	importing_requests: HashSet<ExtraRequest<B>>,
	/// the name of this type of extra request (useful for logging.)
	request_type_name: &'static str,
	/// gauges mirroring the collections above; `None` if registration failed
	metrics: Option<Metrics>,
}

impl<B: BlockT> ExtraRequests<B> {
	/// Creates a new, empty request scheduler.
	///
	/// `request_type_name` is used for logging only. When `metrics_registry` is
	/// provided, Prometheus gauges are registered; a registration failure is
	/// logged and metrics are disabled rather than propagated to the caller.
	pub(crate) fn new(
		request_type_name: &'static str,
		metrics_registry: Option<&Registry>,
	) -> Self {
		Self {
			tree: ForkTree::new(),
			best_seen_finalized_number: Zero::zero(),
			pending_requests: VecDeque::new(),
			active_requests: HashMap::new(),
			failed_requests: HashMap::new(),
			importing_requests: HashSet::new(),
			request_type_name,
			// Metrics are best-effort: log the error and carry on without
			// them instead of failing construction.
			metrics: metrics_registry.and_then(|registry| {
				Metrics::register(registry)
					.inspect_err(|error| {
						log::error!(
							target: LOG_TARGET,
							"Failed to register `ExtraRequests` metrics {error}",
						);
					})
					.ok()
			}),
		}
	}

	/// Reset all state as if returned from `new`.
	///
	/// NOTE(review): `best_seen_finalized_number` and `importing_requests` (and
	/// the `importing` gauge) are not cleared here, so state is not fully
	/// identical to `new` — confirm keeping in-flight imports is intended.
	pub(crate) fn reset(&mut self) {
		self.tree = ForkTree::new();
		self.pending_requests.clear();
		self.active_requests.clear();
		self.failed_requests.clear();

		if let Some(metrics) = &self.metrics {
			metrics.pending.set(0);
			metrics.active.set(0);
			metrics.failed.set(0);
		}
	}

	/// Returns an iterator-like struct that yields peers which extra
	/// requests can be sent to.
	pub(crate) fn matcher(&mut self) -> Matcher<B> {
		Matcher::new(self)
	}

	/// Queue an extra data request to be considered by the `Matcher`.
	///
	/// Only a request that becomes a new *root* of the fork tree is queued as
	/// pending immediately; descendants are tracked by the tree and become
	/// pending later (see `try_finalize_root`), once their ancestor root is
	/// finalized.
	pub(crate) fn schedule<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
	where
		F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
	{
		match self.tree.import(request.0, request.1, (), &is_descendent_of) {
			Ok(true) => {
				// this is a new root so we add it to the current `pending_requests`
				self.pending_requests.push_back((request.0, request.1));
				if let Some(metrics) = &self.metrics {
					metrics.pending.inc();
				}
			},
			Err(fork_tree::Error::Revert) => {
				// we have finalized further than the given request, presumably
				// by some other part of the system (not sync). we can safely
				// ignore the `Revert` error.
			},
			Err(err) => {
				debug!(target: LOG_TARGET, "Failed to insert request {:?} into tree: {}", request, err);
			},
			// `Ok(false)`: not a new root, the tree tracks it until an
			// ancestor is finalized.
			_ => (),
		}
	}

	/// Retry any pending request if a peer disconnected.
	pub(crate) fn peer_disconnected(&mut self, who: &PeerId) {
		if let Some(request) = self.active_requests.remove(who) {
			// Re-queue at the front so the dropped request is retried first.
			self.pending_requests.push_front(request);
			if let Some(metrics) = &self.metrics {
				metrics.active.dec();
				metrics.pending.inc();
			}
		}
	}

	/// Processes the response for the request previously sent to the given peer.
	///
	/// Returns `Some((peer, hash, number, response))` when a non-empty response
	/// arrives for an active request, marking the request as importing. An
	/// empty (`None`) response counts as a failure for this peer and the
	/// request is re-queued at the front of the pending queue.
	pub(crate) fn on_response<R>(
		&mut self,
		who: PeerId,
		resp: Option<R>,
	) -> Option<(PeerId, B::Hash, NumberFor<B>, R)> {
		// we assume that the request maps to the given response, this is
		// currently enforced by the outer network protocol before passing on
		// messages to chain sync.
		if let Some(request) = self.active_requests.remove(&who) {
			if let Some(metrics) = &self.metrics {
				metrics.active.dec();
			}

			if let Some(r) = resp {
				trace!(target: LOG_TARGET,
					"Queuing import of {} from {:?} for {:?}",
					self.request_type_name, who, request,
				);

				// `insert` is `false` if this request was already importing;
				// only bump the gauge for genuinely new imports.
				if self.importing_requests.insert(request) {
					if let Some(metrics) = &self.metrics {
						metrics.importing.inc();
					}
				}
				return Some((who, request.0, request.1, r))
			} else {
				trace!(target: LOG_TARGET,
					"Empty {} response from {:?} for {:?}",
					self.request_type_name, who,
				);
			}
			// Only reached for an empty response: record the failure (so the
			// matcher throttles this peer) and retry the request first.
			self.failed_requests.entry(request).or_default().push((who, Instant::now()));
			self.pending_requests.push_front(request);
			if let Some(metrics) = &self.metrics {
				metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
				metrics.pending.inc();
			}
		} else {
			trace!(target: LOG_TARGET,
				"No active {} request to {:?}",
				self.request_type_name, who,
			);
		}
		None
	}

	/// Removes any pending extra requests for blocks lower than the given best finalized.
	///
	/// After updating the fork tree, only requests that are still roots of the
	/// tree are kept in the pending/active/failed collections.
	pub(crate) fn on_block_finalized<F>(
		&mut self,
		best_finalized_hash: &B::Hash,
		best_finalized_number: NumberFor<B>,
		is_descendent_of: F,
	) -> Result<(), fork_tree::Error<ClientError>>
	where
		F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
	{
		let request = (*best_finalized_hash, best_finalized_number);

		// If the finalized block was itself an importing request, finalizing
		// its root already pruned the tree and rescheduled — nothing more to do.
		if self.try_finalize_root::<()>(request, Ok(request), false) {
			return Ok(())
		}

		if best_finalized_number > self.best_seen_finalized_number {
			// we receive finality notification only for the finalized branch head.
			match self.tree.finalize_with_ancestors(
				best_finalized_hash,
				best_finalized_number,
				&is_descendent_of,
			) {
				Err(fork_tree::Error::Revert) => {
					// we might have finalized further already in which case we
					// will get a `Revert` error which we can safely ignore.
				},
				Err(err) => return Err(err),
				Ok(_) => {},
			}

			self.best_seen_finalized_number = best_finalized_number;
		}

		let roots = self.tree.roots().collect::<HashSet<_>>();

		// Drop everything that is no longer a root (i.e. was on a pruned fork
		// or below the new finalized block).
		self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
		self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
		self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
		if let Some(metrics) = &self.metrics {
			metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
			metrics.active.set(self.active_requests.len().try_into().unwrap_or(u64::MAX));
			metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
		}

		Ok(())
	}

	/// Try to finalize pending root.
	///
	/// `request` must be a previously-imported request (present in
	/// `importing_requests`); otherwise this is a no-op returning `false`.
	/// On import success (`Ok`) the corresponding root is finalized and the
	/// request queues are rebuilt from the new set of roots. On failure the
	/// request is optionally re-queued, depending on `reschedule_on_failure`.
	///
	/// Returns true if import of this request has been scheduled.
	pub(crate) fn try_finalize_root<E>(
		&mut self,
		request: ExtraRequest<B>,
		result: Result<ExtraRequest<B>, E>,
		reschedule_on_failure: bool,
	) -> bool {
		if !self.importing_requests.remove(&request) {
			return false
		}
		if let Some(metrics) = &self.metrics {
			metrics.importing.dec();
		}

		let (finalized_hash, finalized_number) = match result {
			Ok(req) => (req.0, req.1),
			Err(_) => {
				if reschedule_on_failure {
					self.pending_requests.push_front(request);
					if let Some(metrics) = &self.metrics {
						metrics.pending.inc();
					}
				}
				return true
			},
		};

		if self.tree.finalize_root(&finalized_hash).is_none() {
			warn!(target: LOG_TARGET,
				"‼️ Imported {:?} {:?} which isn't a root in the tree: {:?}",
				finalized_hash, finalized_number, self.tree.roots().collect::<Vec<_>>()
			);
			return true
		}

		// Finalizing a root may have promoted its children to roots: restart
		// the request queues from scratch with the new root set.
		self.failed_requests.clear();
		self.active_requests.clear();
		self.pending_requests.clear();
		self.pending_requests.extend(self.tree.roots().map(|(&h, &n, _)| (h, n)));
		if let Some(metrics) = &self.metrics {
			metrics.failed.set(0);
			metrics.active.set(0);
			metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
		}
		self.best_seen_finalized_number = finalized_number;

		true
	}

	/// Returns an iterator over all active (in-flight) requests and associated peer id.
	#[cfg(test)]
	pub(crate) fn active_requests(&self) -> impl Iterator<Item = (&PeerId, &ExtraRequest<B>)> {
		self.active_requests.iter()
	}

	/// Returns an iterator over all scheduled pending requests.
	#[cfg(test)]
	pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
		self.pending_requests.iter()
	}
}

/// Matches peers with pending extra requests.
#[derive(Debug)]
pub(crate) struct Matcher<'a, B: BlockT> {
	/// Length of pending requests collection.
	/// Used to ensure we do not loop more than once over all pending requests.
	remaining: usize,
	/// The request state this matcher draws from (and mutates as it hands
	/// requests out to peers).
	extras: &'a mut ExtraRequests<B>,
}

impl<'a, B: BlockT> Matcher<'a, B> {
	// Snapshot the pending-queue length so `next` visits each currently
	// pending request at most once per call.
	fn new(extras: &'a mut ExtraRequests<B>) -> Self {
		Self { remaining: extras.pending_requests.len(), extras }
	}

	/// Finds a peer to which a pending request can be sent.
	///
	/// Peers are filtered according to the current known best block (i.e. we won't
	/// send an extra request for block #10 to a peer at block #2), and we also
	/// throttle requests to the same peer if a previous request yielded no results.
	///
	/// This method returns as soon as it finds a peer that should be able to answer
	/// our request. If no request is pending or no peer can handle it, `None` is
	/// returned instead.
	///
	/// # Note
	///
	/// The returned `PeerId` (if any) is guaranteed to come from the given `peers`
	/// argument.
	pub(crate) fn next(
		&mut self,
		peers: &HashMap<PeerId, PeerSync<B>>,
	) -> Option<(PeerId, ExtraRequest<B>)> {
		if self.remaining == 0 {
			return None
		}

		// clean up previously failed requests so we can retry again
		for requests in self.extras.failed_requests.values_mut() {
			requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
		}
		// NOTE(review): entries whose peer lists become empty above still count
		// towards this gauge (keys are not removed) — confirm that's intended.
		if let Some(metrics) = &self.extras.metrics {
			metrics
				.failed
				.set(self.extras.failed_requests.len().try_into().unwrap_or(u64::MAX));
		}

		// Rotate through the pending queue: pop a request, try to assign it to
		// a peer; if no peer qualifies, push it back and try the next one.
		while let Some(request) = self.extras.pending_requests.pop_front() {
			if let Some(metrics) = &self.extras.metrics {
				metrics.pending.dec();
			}

			for (peer, sync) in
				peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available)
			{
				// only ask peers that have synced at least up to the block number that we're asking
				// the extra for
				if sync.best_number < request.1 {
					continue
				}
				// don't request to any peers that already have pending requests
				if self.extras.active_requests.contains_key(peer) {
					continue
				}
				// only ask if the same request has not failed for this peer before
				if self
					.extras
					.failed_requests
					.get(&request)
					.map(|rr| rr.iter().any(|i| &i.0 == peer))
					.unwrap_or(false)
				{
					continue
				}
				self.extras.active_requests.insert(*peer, request);
				if let Some(metrics) = &self.extras.metrics {
					metrics.active.inc();
				}

				trace!(target: LOG_TARGET,
					"Sending {} request to {:?} for {:?}",
					self.extras.request_type_name, peer, request,
				);

				return Some((*peer, request))
			}

			// No suitable peer: re-queue the request for a future call.
			self.extras.pending_requests.push_back(request);
			if let Some(metrics) = &self.extras.metrics {
				metrics.pending.inc();
			}
			self.remaining -= 1;

			if self.remaining == 0 {
				break
			}
		}

		None
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::strategy::chain_sync::PeerSync;
	use quickcheck::{Arbitrary, Gen, QuickCheck};
	use sp_blockchain::Error as ClientError;
	use sp_test_primitives::{Block, BlockNumber, Hash};
	use std::collections::{HashMap, HashSet};

	#[test]
	fn requests_are_processed_in_order() {
		fn property(mut peers: ArbitraryPeers) {
			let mut requests = ExtraRequests::<Block>::new("test", None);

			let num_peers_available =
				peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();

			// One scheduled request per available peer; random hashes ensure
			// each import is a new root and thus lands in `pending_requests`.
			for i in 0..num_peers_available {
				requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
			}

			let pending = requests.pending_requests.clone();
			let mut m = requests.matcher();

			// Requests must be handed out in queue order.
			for p in &pending {
				let (peer, r) = m.next(&peers.0).unwrap();
				assert_eq!(p, &r);
				peers.0.get_mut(&peer).unwrap().state =
					PeerSyncState::DownloadingJustification(r.0);
			}
		}

		QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
	}

	#[test]
	fn new_roots_schedule_new_request() {
		fn property(data: Vec<BlockNumber>) {
			let mut requests = ExtraRequests::<Block>::new("test", None);
			for (i, number) in data.into_iter().enumerate() {
				let hash = [i as u8; 32].into();
				let pending = requests.pending_requests.len();
				let is_root = requests.tree.roots().any(|(&h, &n, _)| hash == h && number == n);
				requests.schedule((hash, number), |a, b| Ok(a[0] >= b[0]));
				// A request that wasn't already a root must grow the pending queue.
				if !is_root {
					assert_eq!(1 + pending, requests.pending_requests.len())
				}
			}
		}
		QuickCheck::new().quickcheck(property as fn(Vec<BlockNumber>))
	}

	#[test]
	fn disconnecting_implies_rescheduling() {
		fn property(mut peers: ArbitraryPeers) -> bool {
			let mut requests = ExtraRequests::<Block>::new("test", None);

			let num_peers_available =
				peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();

			for i in 0..num_peers_available {
				requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
			}

			// Drain the pending queue by assigning every request to a peer.
			let mut m = requests.matcher();
			while let Some((peer, r)) = m.next(&peers.0) {
				peers.0.get_mut(&peer).unwrap().state =
					PeerSyncState::DownloadingJustification(r.0);
			}

			assert!(requests.pending_requests.is_empty());

			let active_peers = requests.active_requests.keys().cloned().collect::<Vec<_>>();
			let previously_active =
				requests.active_requests.values().cloned().collect::<HashSet<_>>();

			for peer in &active_peers {
				requests.peer_disconnected(peer)
			}

			assert!(requests.active_requests.is_empty());

			// Every in-flight request must be back in the pending queue.
			previously_active == requests.pending_requests.iter().cloned().collect::<HashSet<_>>()
		}

		QuickCheck::new().quickcheck(property as fn(ArbitraryPeers) -> bool)
	}

	#[test]
	fn no_response_reschedules() {
		fn property(mut peers: ArbitraryPeers) {
			let mut requests = ExtraRequests::<Block>::new("test", None);

			let num_peers_available =
				peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();

			for i in 0..num_peers_available {
				requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
			}

			let mut m = requests.matcher();
			while let Some((peer, r)) = m.next(&peers.0) {
				peers.0.get_mut(&peer).unwrap().state =
					PeerSyncState::DownloadingJustification(r.0);
			}

			let active = requests.active_requests.iter().map(|(&p, &r)| (p, r)).collect::<Vec<_>>();

			// An empty response marks the (peer, request) pair as failed once
			// and re-queues the request.
			for (peer, req) in &active {
				assert!(requests.failed_requests.get(req).is_none());
				assert!(!requests.pending_requests.contains(req));
				assert!(requests.on_response::<()>(*peer, None).is_none());
				assert!(requests.pending_requests.contains(req));
				assert_eq!(
					1,
					requests
						.failed_requests
						.get(req)
						.unwrap()
						.iter()
						.filter(|(p, _)| p == peer)
						.count()
				)
			}
		}

		QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
	}

	#[test]
	fn request_is_rescheduled_when_earlier_block_is_finalized() {
		sp_tracing::try_init_simple();

		let mut finality_proofs = ExtraRequests::<Block>::new("test", None);

		let hash4 = [4; 32].into();
		let hash5 = [5; 32].into();
		let hash6 = [6; 32].into();
		let hash7 = [7; 32].into();

		fn is_descendent_of(base: &Hash, target: &Hash) -> Result<bool, ClientError> {
			Ok(target[0] >= base[0])
		}

		// make #4 last finalized block
		finality_proofs.tree.import(hash4, 4, (), &is_descendent_of).unwrap();
		finality_proofs.tree.finalize_root(&hash4);

		// schedule request for #6
		finality_proofs.schedule((hash6, 6), is_descendent_of);

		// receive finality proof for #5
		finality_proofs.importing_requests.insert((hash6, 6));
		finality_proofs.on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
		finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash5, 5)), true);

		// ensure that request for #6 is still pending
		assert_eq!(finality_proofs.pending_requests.iter().collect::<Vec<_>>(), vec![&(hash6, 6)]);

		// receive finality proof for #7
		finality_proofs.importing_requests.insert((hash6, 6));
		finality_proofs.on_block_finalized(&hash6, 6, is_descendent_of).unwrap();
		finality_proofs.on_block_finalized(&hash7, 7, is_descendent_of).unwrap();
		finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash7, 7)), true);

		// ensure that there's no request for #6
		assert_eq!(
			finality_proofs.pending_requests.iter().collect::<Vec<_>>(),
			Vec::<&(Hash, u64)>::new()
		);
	}

	#[test]
	fn ancestor_roots_are_finalized_when_finality_notification_is_missed() {
		let mut finality_proofs = ExtraRequests::<Block>::new("test", None);

		let hash4 = [4; 32].into();
		let hash5 = [5; 32].into();

		fn is_descendent_of(base: &Hash, target: &Hash) -> Result<bool, ClientError> {
			Ok(target[0] >= base[0])
		}

		// schedule request for #4
		finality_proofs.schedule((hash4, 4), is_descendent_of);

		// receive finality notification for #5 (missing notification for #4!!!)
		// NOTE(review): the entry uses number 5 with hash4, while the request
		// scheduled above was (hash4, 4) — confirm the mismatch is intended.
		finality_proofs.importing_requests.insert((hash4, 5));
		finality_proofs.on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
		assert_eq!(finality_proofs.tree.roots().count(), 0);
	}

	// Some Arbitrary instances to allow easy construction of random peer sets:

	#[derive(Debug, Clone)]
	struct ArbitraryPeerSyncState(PeerSyncState<Block>);

	impl Arbitrary for ArbitraryPeerSyncState {
		fn arbitrary(g: &mut Gen) -> Self {
			let s = match u8::arbitrary(g) % 4 {
				0 => PeerSyncState::Available,
				// TODO: 1 => PeerSyncState::AncestorSearch(g.gen(), AncestorSearchState<B>),
				1 => PeerSyncState::DownloadingNew(BlockNumber::arbitrary(g)),
				2 => PeerSyncState::DownloadingStale(Hash::random()),
				_ => PeerSyncState::DownloadingJustification(Hash::random()),
			};
			ArbitraryPeerSyncState(s)
		}
	}

	#[derive(Debug, Clone)]
	struct ArbitraryPeerSync(PeerSync<Block>);

	impl Arbitrary for ArbitraryPeerSync {
		fn arbitrary(g: &mut Gen) -> Self {
			let ps = PeerSync {
				peer_id: PeerId::random(),
				common_number: u64::arbitrary(g),
				best_hash: Hash::random(),
				best_number: u64::arbitrary(g),
				state: ArbitraryPeerSyncState::arbitrary(g).0,
			};
			ArbitraryPeerSync(ps)
		}
	}

	#[derive(Debug, Clone)]
	struct ArbitraryPeers(HashMap<PeerId, PeerSync<Block>>);

	impl Arbitrary for ArbitraryPeers {
		fn arbitrary(g: &mut Gen) -> Self {
			let mut peers = HashMap::with_capacity(g.size());
			for _ in 0..g.size() {
				let ps = ArbitraryPeerSync::arbitrary(g).0;
				peers.insert(ps.peer_id, ps);
			}
			ArbitraryPeers(peers)
		}
	}
}