1use crate::{
24 strategy::chain_sync::{PeerSync, PeerSyncState},
25 LOG_TARGET,
26};
27use fork_tree::ForkTree;
28use log::{debug, trace, warn};
29use prometheus_endpoint::{
30 prometheus::core::GenericGauge, register, GaugeVec, Opts, PrometheusError, Registry, U64,
31};
32use sc_network_types::PeerId;
33use sp_blockchain::Error as ClientError;
34use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
35use std::{
36 collections::{HashMap, HashSet, VecDeque},
37 time::{Duration, Instant},
38};
39
40const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);
42
43type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);
45
46#[derive(Debug)]
47struct Metrics {
48 pending: GenericGauge<U64>,
49 active: GenericGauge<U64>,
50 failed: GenericGauge<U64>,
51 importing: GenericGauge<U64>,
52}
53
54impl Metrics {
55 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
56 let justifications = GaugeVec::<U64>::new(
57 Opts::new(
58 "substrate_sync_extra_justifications",
59 "Number of extra justifications requests",
60 ),
61 &["status"],
62 )?;
63 let justifications = register(justifications, registry)?;
64
65 Ok(Self {
66 pending: justifications.with_label_values(&["pending"]),
67 active: justifications.with_label_values(&["active"]),
68 failed: justifications.with_label_values(&["failed"]),
69 importing: justifications.with_label_values(&["importing"]),
70 })
71 }
72}
73
74#[derive(Debug)]
81pub(crate) struct ExtraRequests<B: BlockT> {
82 tree: ForkTree<B::Hash, NumberFor<B>, ()>,
83 best_seen_finalized_number: NumberFor<B>,
85 pending_requests: VecDeque<ExtraRequest<B>>,
87 active_requests: HashMap<PeerId, ExtraRequest<B>>,
89 failed_requests: HashMap<ExtraRequest<B>, Vec<(PeerId, Instant)>>,
91 importing_requests: HashSet<ExtraRequest<B>>,
93 request_type_name: &'static str,
95 metrics: Option<Metrics>,
96}
97
98impl<B: BlockT> ExtraRequests<B> {
99 pub(crate) fn new(
100 request_type_name: &'static str,
101 metrics_registry: Option<&Registry>,
102 ) -> Self {
103 Self {
104 tree: ForkTree::new(),
105 best_seen_finalized_number: Zero::zero(),
106 pending_requests: VecDeque::new(),
107 active_requests: HashMap::new(),
108 failed_requests: HashMap::new(),
109 importing_requests: HashSet::new(),
110 request_type_name,
111 metrics: metrics_registry.and_then(|registry| {
112 Metrics::register(registry)
113 .inspect_err(|error| {
114 log::error!(
115 target: LOG_TARGET,
116 "Failed to register `ExtraRequests` metrics {error}",
117 );
118 })
119 .ok()
120 }),
121 }
122 }
123
124 pub(crate) fn reset(&mut self) {
126 self.tree = ForkTree::new();
127 self.pending_requests.clear();
128 self.active_requests.clear();
129 self.failed_requests.clear();
130
131 if let Some(metrics) = &self.metrics {
132 metrics.pending.set(0);
133 metrics.active.set(0);
134 metrics.failed.set(0);
135 }
136 }
137
138 pub(crate) fn matcher(&mut self) -> Matcher<B> {
141 Matcher::new(self)
142 }
143
144 pub(crate) fn schedule<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
146 where
147 F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
148 {
149 match self.tree.import(request.0, request.1, (), &is_descendent_of) {
150 Ok(true) => {
151 self.pending_requests.push_back((request.0, request.1));
153 if let Some(metrics) = &self.metrics {
154 metrics.pending.inc();
155 }
156 },
157 Err(fork_tree::Error::Revert) => {
158 },
162 Err(err) => {
163 debug!(target: LOG_TARGET, "Failed to insert request {:?} into tree: {}", request, err);
164 },
165 _ => (),
166 }
167 }
168
169 pub(crate) fn peer_disconnected(&mut self, who: &PeerId) {
171 if let Some(request) = self.active_requests.remove(who) {
172 self.pending_requests.push_front(request);
173 if let Some(metrics) = &self.metrics {
174 metrics.active.dec();
175 metrics.pending.inc();
176 }
177 }
178 }
179
180 pub(crate) fn on_response<R>(
182 &mut self,
183 who: PeerId,
184 resp: Option<R>,
185 ) -> Option<(PeerId, B::Hash, NumberFor<B>, R)> {
186 if let Some(request) = self.active_requests.remove(&who) {
190 if let Some(metrics) = &self.metrics {
191 metrics.active.dec();
192 }
193
194 if let Some(r) = resp {
195 trace!(target: LOG_TARGET,
196 "Queuing import of {} from {:?} for {:?}",
197 self.request_type_name, who, request,
198 );
199
200 if self.importing_requests.insert(request) {
201 if let Some(metrics) = &self.metrics {
202 metrics.importing.inc();
203 }
204 }
205 return Some((who, request.0, request.1, r))
206 } else {
207 trace!(target: LOG_TARGET,
208 "Empty {} response from {:?} for {:?}",
209 self.request_type_name, who, request,
210 );
211 }
212 self.failed_requests.entry(request).or_default().push((who, Instant::now()));
213 self.pending_requests.push_front(request);
214 if let Some(metrics) = &self.metrics {
215 metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
216 metrics.pending.inc();
217 }
218 } else {
219 trace!(target: LOG_TARGET,
220 "No active {} request to {:?}",
221 self.request_type_name, who,
222 );
223 }
224 None
225 }
226
227 pub(crate) fn on_block_finalized<F>(
229 &mut self,
230 best_finalized_hash: &B::Hash,
231 best_finalized_number: NumberFor<B>,
232 is_descendent_of: F,
233 ) -> Result<(), fork_tree::Error<ClientError>>
234 where
235 F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
236 {
237 let request = (*best_finalized_hash, best_finalized_number);
238
239 if self.try_finalize_root::<()>(request, Ok(request), false) {
240 return Ok(())
241 }
242
243 if best_finalized_number > self.best_seen_finalized_number {
244 match self.tree.finalize_with_ancestors(
246 best_finalized_hash,
247 best_finalized_number,
248 &is_descendent_of,
249 ) {
250 Err(fork_tree::Error::Revert) => {
251 },
254 Err(err) => return Err(err),
255 Ok(_) => {},
256 }
257
258 self.best_seen_finalized_number = best_finalized_number;
259 }
260
261 let roots = self.tree.roots().collect::<HashSet<_>>();
262
263 self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
264 self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
265 self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
266 if let Some(metrics) = &self.metrics {
267 metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
268 metrics.active.set(self.active_requests.len().try_into().unwrap_or(u64::MAX));
269 metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
270 }
271
272 Ok(())
273 }
274
275 pub(crate) fn try_finalize_root<E>(
279 &mut self,
280 request: ExtraRequest<B>,
281 result: Result<ExtraRequest<B>, E>,
282 reschedule_on_failure: bool,
283 ) -> bool {
284 if !self.importing_requests.remove(&request) {
285 return false
286 }
287 if let Some(metrics) = &self.metrics {
288 metrics.importing.dec();
289 }
290
291 let (finalized_hash, finalized_number) = match result {
292 Ok(req) => (req.0, req.1),
293 Err(_) => {
294 if reschedule_on_failure {
295 self.pending_requests.push_front(request);
296 if let Some(metrics) = &self.metrics {
297 metrics.pending.inc();
298 }
299 }
300 return true
301 },
302 };
303
304 if self.tree.finalize_root(&finalized_hash).is_none() {
305 warn!(target: LOG_TARGET,
306 "‼️ Imported {:?} {:?} which isn't a root in the tree: {:?}",
307 finalized_hash, finalized_number, self.tree.roots().collect::<Vec<_>>()
308 );
309 return true
310 }
311
312 self.failed_requests.clear();
313 self.active_requests.clear();
314 self.pending_requests.clear();
315 self.pending_requests.extend(self.tree.roots().map(|(&h, &n, _)| (h, n)));
316 if let Some(metrics) = &self.metrics {
317 metrics.failed.set(0);
318 metrics.active.set(0);
319 metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
320 }
321 self.best_seen_finalized_number = finalized_number;
322
323 true
324 }
325
326 #[cfg(test)]
328 pub(crate) fn active_requests(&self) -> impl Iterator<Item = (&PeerId, &ExtraRequest<B>)> {
329 self.active_requests.iter()
330 }
331
332 #[cfg(test)]
334 pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
335 self.pending_requests.iter()
336 }
337}
338
339#[derive(Debug)]
341pub(crate) struct Matcher<'a, B: BlockT> {
342 remaining: usize,
345 extras: &'a mut ExtraRequests<B>,
346}
347
348impl<'a, B: BlockT> Matcher<'a, B> {
349 fn new(extras: &'a mut ExtraRequests<B>) -> Self {
350 Self { remaining: extras.pending_requests.len(), extras }
351 }
352
353 pub(crate) fn next(
368 &mut self,
369 peers: &HashMap<PeerId, PeerSync<B>>,
370 ) -> Option<(PeerId, ExtraRequest<B>)> {
371 if self.remaining == 0 {
372 return None
373 }
374
375 for requests in self.extras.failed_requests.values_mut() {
377 requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
378 }
379 if let Some(metrics) = &self.extras.metrics {
380 metrics
381 .failed
382 .set(self.extras.failed_requests.len().try_into().unwrap_or(u64::MAX));
383 }
384
385 while let Some(request) = self.extras.pending_requests.pop_front() {
386 if let Some(metrics) = &self.extras.metrics {
387 metrics.pending.dec();
388 }
389
390 for (peer, sync) in
391 peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available)
392 {
393 if sync.best_number < request.1 {
396 continue
397 }
398 if self.extras.active_requests.contains_key(peer) {
400 continue
401 }
402 if self
404 .extras
405 .failed_requests
406 .get(&request)
407 .map(|rr| rr.iter().any(|i| &i.0 == peer))
408 .unwrap_or(false)
409 {
410 continue
411 }
412 self.extras.active_requests.insert(*peer, request);
413 if let Some(metrics) = &self.extras.metrics {
414 metrics.active.inc();
415 }
416
417 trace!(target: LOG_TARGET,
418 "Sending {} request to {:?} for {:?}",
419 self.extras.request_type_name, peer, request,
420 );
421
422 return Some((*peer, request))
423 }
424
425 self.extras.pending_requests.push_back(request);
426 if let Some(metrics) = &self.extras.metrics {
427 metrics.pending.inc();
428 }
429 self.remaining -= 1;
430
431 if self.remaining == 0 {
432 break
433 }
434 }
435
436 None
437 }
438}
439
440#[cfg(test)]
441mod tests {
442 use super::*;
443 use crate::strategy::chain_sync::PeerSync;
444 use quickcheck::{Arbitrary, Gen, QuickCheck};
445 use sp_blockchain::Error as ClientError;
446 use sp_test_primitives::{Block, BlockNumber, Hash};
447 use std::collections::{HashMap, HashSet};
448
449 #[test]
450 fn requests_are_processed_in_order() {
451 fn property(mut peers: ArbitraryPeers) {
452 let mut requests = ExtraRequests::<Block>::new("test", None);
453
454 let num_peers_available =
455 peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
456
457 for i in 0..num_peers_available {
458 requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
459 }
460
461 let pending = requests.pending_requests.clone();
462 let mut m = requests.matcher();
463
464 for p in &pending {
465 let (peer, r) = m.next(&peers.0).unwrap();
466 assert_eq!(p, &r);
467 peers.0.get_mut(&peer).unwrap().state =
468 PeerSyncState::DownloadingJustification(r.0);
469 }
470 }
471
472 QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
473 }
474
475 #[test]
476 fn new_roots_schedule_new_request() {
477 fn property(data: Vec<BlockNumber>) {
478 let mut requests = ExtraRequests::<Block>::new("test", None);
479 for (i, number) in data.into_iter().enumerate() {
480 let hash = [i as u8; 32].into();
481 let pending = requests.pending_requests.len();
482 let is_root = requests.tree.roots().any(|(&h, &n, _)| hash == h && number == n);
483 requests.schedule((hash, number), |a, b| Ok(a[0] >= b[0]));
484 if !is_root {
485 assert_eq!(1 + pending, requests.pending_requests.len())
486 }
487 }
488 }
489 QuickCheck::new().quickcheck(property as fn(Vec<BlockNumber>))
490 }
491
492 #[test]
493 fn disconnecting_implies_rescheduling() {
494 fn property(mut peers: ArbitraryPeers) -> bool {
495 let mut requests = ExtraRequests::<Block>::new("test", None);
496
497 let num_peers_available =
498 peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
499
500 for i in 0..num_peers_available {
501 requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
502 }
503
504 let mut m = requests.matcher();
505 while let Some((peer, r)) = m.next(&peers.0) {
506 peers.0.get_mut(&peer).unwrap().state =
507 PeerSyncState::DownloadingJustification(r.0);
508 }
509
510 assert!(requests.pending_requests.is_empty());
511
512 let active_peers = requests.active_requests.keys().cloned().collect::<Vec<_>>();
513 let previously_active =
514 requests.active_requests.values().cloned().collect::<HashSet<_>>();
515
516 for peer in &active_peers {
517 requests.peer_disconnected(peer)
518 }
519
520 assert!(requests.active_requests.is_empty());
521
522 previously_active == requests.pending_requests.iter().cloned().collect::<HashSet<_>>()
523 }
524
525 QuickCheck::new().quickcheck(property as fn(ArbitraryPeers) -> bool)
526 }
527
528 #[test]
529 fn no_response_reschedules() {
530 fn property(mut peers: ArbitraryPeers) {
531 let mut requests = ExtraRequests::<Block>::new("test", None);
532
533 let num_peers_available =
534 peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
535
536 for i in 0..num_peers_available {
537 requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
538 }
539
540 let mut m = requests.matcher();
541 while let Some((peer, r)) = m.next(&peers.0) {
542 peers.0.get_mut(&peer).unwrap().state =
543 PeerSyncState::DownloadingJustification(r.0);
544 }
545
546 let active = requests.active_requests.iter().map(|(&p, &r)| (p, r)).collect::<Vec<_>>();
547
548 for (peer, req) in &active {
549 assert!(requests.failed_requests.get(req).is_none());
550 assert!(!requests.pending_requests.contains(req));
551 assert!(requests.on_response::<()>(*peer, None).is_none());
552 assert!(requests.pending_requests.contains(req));
553 assert_eq!(
554 1,
555 requests
556 .failed_requests
557 .get(req)
558 .unwrap()
559 .iter()
560 .filter(|(p, _)| p == peer)
561 .count()
562 )
563 }
564 }
565
566 QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
567 }
568
569 #[test]
570 fn request_is_rescheduled_when_earlier_block_is_finalized() {
571 sp_tracing::try_init_simple();
572
573 let mut finality_proofs = ExtraRequests::<Block>::new("test", None);
574
575 let hash4 = [4; 32].into();
576 let hash5 = [5; 32].into();
577 let hash6 = [6; 32].into();
578 let hash7 = [7; 32].into();
579
580 fn is_descendent_of(base: &Hash, target: &Hash) -> Result<bool, ClientError> {
581 Ok(target[0] >= base[0])
582 }
583
584 finality_proofs.tree.import(hash4, 4, (), &is_descendent_of).unwrap();
586 finality_proofs.tree.finalize_root(&hash4);
587
588 finality_proofs.schedule((hash6, 6), is_descendent_of);
590
591 finality_proofs.importing_requests.insert((hash6, 6));
593 finality_proofs.on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
594 finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash5, 5)), true);
595
596 assert_eq!(finality_proofs.pending_requests.iter().collect::<Vec<_>>(), vec![&(hash6, 6)]);
598
599 finality_proofs.importing_requests.insert((hash6, 6));
601 finality_proofs.on_block_finalized(&hash6, 6, is_descendent_of).unwrap();
602 finality_proofs.on_block_finalized(&hash7, 7, is_descendent_of).unwrap();
603 finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash7, 7)), true);
604
605 assert_eq!(
607 finality_proofs.pending_requests.iter().collect::<Vec<_>>(),
608 Vec::<&(Hash, u64)>::new()
609 );
610 }
611
612 #[test]
613 fn ancestor_roots_are_finalized_when_finality_notification_is_missed() {
614 let mut finality_proofs = ExtraRequests::<Block>::new("test", None);
615
616 let hash4 = [4; 32].into();
617 let hash5 = [5; 32].into();
618
619 fn is_descendent_of(base: &Hash, target: &Hash) -> Result<bool, ClientError> {
620 Ok(target[0] >= base[0])
621 }
622
623 finality_proofs.schedule((hash4, 4), is_descendent_of);
625
626 finality_proofs.importing_requests.insert((hash4, 5));
628 finality_proofs.on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
629 assert_eq!(finality_proofs.tree.roots().count(), 0);
630 }
631
632 #[derive(Debug, Clone)]
635 struct ArbitraryPeerSyncState(PeerSyncState<Block>);
636
637 impl Arbitrary for ArbitraryPeerSyncState {
638 fn arbitrary(g: &mut Gen) -> Self {
639 let s = match u8::arbitrary(g) % 4 {
640 0 => PeerSyncState::Available,
641 1 => PeerSyncState::DownloadingNew(BlockNumber::arbitrary(g)),
643 2 => PeerSyncState::DownloadingStale(Hash::random()),
644 _ => PeerSyncState::DownloadingJustification(Hash::random()),
645 };
646 ArbitraryPeerSyncState(s)
647 }
648 }
649
650 #[derive(Debug, Clone)]
651 struct ArbitraryPeerSync(PeerSync<Block>);
652
653 impl Arbitrary for ArbitraryPeerSync {
654 fn arbitrary(g: &mut Gen) -> Self {
655 let ps = PeerSync {
656 peer_id: PeerId::random(),
657 common_number: u64::arbitrary(g),
658 best_hash: Hash::random(),
659 best_number: u64::arbitrary(g),
660 state: ArbitraryPeerSyncState::arbitrary(g).0,
661 };
662 ArbitraryPeerSync(ps)
663 }
664 }
665
666 #[derive(Debug, Clone)]
667 struct ArbitraryPeers(HashMap<PeerId, PeerSync<Block>>);
668
669 impl Arbitrary for ArbitraryPeers {
670 fn arbitrary(g: &mut Gen) -> Self {
671 let mut peers = HashMap::with_capacity(g.size());
672 for _ in 0..g.size() {
673 let ps = ArbitraryPeerSync::arbitrary(g).0;
674 peers.insert(ps.peer_id, ps);
675 }
676 ArbitraryPeers(peers)
677 }
678 }
679}