sc_informant/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Console informant. Prints sync progress and block events. Runs on the calling thread.
20
21use console::style;
22use futures::prelude::*;
23use futures_timer::Delay;
24use log::{debug, info, trace};
25use sc_client_api::{BlockchainEvents, UsageProvider};
26use sc_network::NetworkStatusProvider;
27use sc_network_sync::{SyncStatusProvider, SyncingService};
28use sp_blockchain::HeaderMetadata;
29use sp_runtime::traits::{Block as BlockT, Header};
30use std::{collections::VecDeque, fmt::Display, sync::Arc, time::Duration};
31
32mod display;
33
34/// Creates a stream that returns a new value every `duration`.
35fn interval(duration: Duration) -> impl Stream<Item = ()> + Unpin {
36	futures::stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
37}
38
39/// Builds the informant and returns a `Future` that drives the informant.
40pub async fn build<B: BlockT, C, N>(client: Arc<C>, network: N, syncing: Arc<SyncingService<B>>)
41where
42	N: NetworkStatusProvider,
43	C: UsageProvider<B> + HeaderMetadata<B> + BlockchainEvents<B>,
44	<C as HeaderMetadata<B>>::Error: Display,
45{
46	let mut display = display::InformantDisplay::new();
47
48	let client_1 = client.clone();
49
50	let display_notifications = interval(Duration::from_millis(5000))
51		.filter_map(|_| async {
52			let net_status = network.status().await;
53			let sync_status = syncing.status().await;
54			let num_connected_peers = syncing.num_connected_peers();
55
56			match (net_status, sync_status) {
57				(Ok(net), Ok(sync)) => Some((net, sync, num_connected_peers)),
58				_ => None,
59			}
60		})
61		.for_each(move |(net_status, sync_status, num_connected_peers)| {
62			let info = client_1.usage_info();
63			if let Some(ref usage) = info.usage {
64				trace!(target: "usage", "Usage statistics: {}", usage);
65			} else {
66				trace!(
67					target: "usage",
68					"Usage statistics not displayed as backend does not provide it",
69				)
70			}
71			display.display(&info, net_status, sync_status, num_connected_peers);
72			future::ready(())
73		});
74
75	futures::select! {
76		() = display_notifications.fuse() => (),
77		() = display_block_import(client).fuse() => (),
78	};
79}
80
81fn display_block_import<B: BlockT, C>(client: Arc<C>) -> impl Future<Output = ()>
82where
83	C: UsageProvider<B> + HeaderMetadata<B> + BlockchainEvents<B>,
84	<C as HeaderMetadata<B>>::Error: Display,
85{
86	let mut last_best = {
87		let info = client.usage_info();
88		Some((info.chain.best_number, info.chain.best_hash))
89	};
90
91	// Hashes of the last blocks we have seen at import.
92	let mut last_blocks = VecDeque::new();
93	let max_blocks_to_track = 100;
94
95	client.import_notification_stream().for_each(move |n| {
96		// detect and log reorganizations.
97		if let Some((ref last_num, ref last_hash)) = last_best {
98			if n.header.parent_hash() != last_hash && n.is_new_best {
99				let maybe_ancestor =
100					sp_blockchain::lowest_common_ancestor(&*client, *last_hash, n.hash);
101
102				match maybe_ancestor {
103					Ok(ref ancestor) if ancestor.hash != *last_hash => info!(
104						"♻️  Reorg on #{},{} to #{},{}, common ancestor #{},{}",
105						style(last_num).red().bold(),
106						last_hash,
107						style(n.header.number()).green().bold(),
108						n.hash,
109						style(ancestor.number).white().bold(),
110						ancestor.hash,
111					),
112					Ok(_) => {},
113					Err(e) => debug!("Error computing tree route: {}", e),
114				}
115			}
116		}
117
118		if n.is_new_best {
119			last_best = Some((*n.header.number(), n.hash));
120		}
121
122		// If we already printed a message for a given block recently,
123		// we should not print it again.
124		if !last_blocks.contains(&n.hash) {
125			last_blocks.push_back(n.hash);
126
127			if last_blocks.len() > max_blocks_to_track {
128				last_blocks.pop_front();
129			}
130
131			let best_indicator = if n.is_new_best { "🏆" } else { "🆕" };
132			info!(
133				target: "substrate",
134				"{best_indicator} Imported #{} ({} → {})",
135				style(n.header.number()).white().bold(),
136				n.header.parent_hash(),
137				n.hash,
138			);
139		}
140
141		future::ready(())
142	})
143}