sc_network_sync/service/
syncing_service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::types::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider};
20
21use futures::{channel::oneshot, Stream};
22use sc_network_types::PeerId;
23
24use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
25use sc_network::{NetworkBlock, NetworkSyncForkRequest};
26use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
27use sp_runtime::traits::{Block as BlockT, NumberFor};
28
29use std::{
30	pin::Pin,
31	sync::{
32		atomic::{AtomicBool, AtomicUsize, Ordering},
33		Arc,
34	},
35};
36
37/// Commands send to `SyncingEngine`
38pub enum ToServiceCommand<B: BlockT> {
39	SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
40	RequestJustification(B::Hash, NumberFor<B>),
41	ClearJustificationRequests,
42	BlocksProcessed(
43		usize,
44		usize,
45		Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
46	),
47	JustificationImported(PeerId, B::Hash, NumberFor<B>, bool),
48	AnnounceBlock(B::Hash, Option<Vec<u8>>),
49	NewBestBlockImported(B::Hash, NumberFor<B>),
50	EventStream(TracingUnboundedSender<SyncEvent>),
51	Status(oneshot::Sender<SyncStatus<B>>),
52	NumActivePeers(oneshot::Sender<usize>),
53	NumDownloadedBlocks(oneshot::Sender<usize>),
54	NumSyncRequests(oneshot::Sender<usize>),
55	PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
56	OnBlockFinalized(B::Hash, B::Header),
57	// Status {
58	// 	pending_response: oneshot::Sender<SyncStatus<B>>,
59	// },
60}
61
62/// Handle for communicating with `SyncingEngine` asynchronously
63#[derive(Clone)]
64pub struct SyncingService<B: BlockT> {
65	tx: TracingUnboundedSender<ToServiceCommand<B>>,
66	/// Number of peers we're connected to.
67	num_connected: Arc<AtomicUsize>,
68	/// Are we actively catching up with the chain?
69	is_major_syncing: Arc<AtomicBool>,
70}
71
72impl<B: BlockT> SyncingService<B> {
73	/// Create new handle
74	pub fn new(
75		tx: TracingUnboundedSender<ToServiceCommand<B>>,
76		num_connected: Arc<AtomicUsize>,
77		is_major_syncing: Arc<AtomicBool>,
78	) -> Self {
79		Self { tx, num_connected, is_major_syncing }
80	}
81
82	/// Get the number of peers known to `SyncingEngine` (both full and light).
83	pub fn num_connected_peers(&self) -> usize {
84		self.num_connected.load(Ordering::Relaxed)
85	}
86
87	/// Get the number of active peers.
88	pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
89		let (tx, rx) = oneshot::channel();
90		let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
91
92		rx.await
93	}
94
95	/// Get the number of downloaded blocks.
96	pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
97		let (tx, rx) = oneshot::channel();
98		let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
99
100		rx.await
101	}
102
103	/// Get the number of sync requests.
104	pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
105		let (tx, rx) = oneshot::channel();
106		let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
107
108		rx.await
109	}
110
111	/// Get peer information.
112	pub async fn peers_info(
113		&self,
114	) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
115		let (tx, rx) = oneshot::channel();
116		let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
117
118		rx.await
119	}
120
121	/// Notify the `SyncingEngine` that a block has been finalized.
122	pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
123		let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
124	}
125
126	/// Get sync status
127	///
128	/// Returns an error if `SyncingEngine` has terminated.
129	pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
130		let (tx, rx) = oneshot::channel();
131		let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
132
133		rx.await
134	}
135}
136
137impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
138	/// Configure an explicit fork sync request.
139	///
140	/// Note that this function should not be used for recent blocks.
141	/// Sync should be able to download all the recent forks normally.
142	/// `set_sync_fork_request` should only be used if external code detects that there's
143	/// a stale fork missing.
144	///
145	/// Passing empty `peers` set effectively removes the sync request.
146	fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
147		let _ = self
148			.tx
149			.unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
150	}
151}
152
153impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
154	/// Request a justification for the given block from the network.
155	///
156	/// On success, the justification will be passed to the import queue that was part at
157	/// initialization as part of the configuration.
158	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
159		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
160	}
161
162	fn clear_justification_requests(&self) {
163		let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
164	}
165}
166
167#[async_trait::async_trait]
168impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
169	/// Get high-level view of the syncing status.
170	async fn status(&self) -> Result<SyncStatus<B>, ()> {
171		let (rtx, rrx) = oneshot::channel();
172
173		let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
174		rrx.await.map_err(|_| ())
175	}
176}
177
178impl<B: BlockT> Link<B> for SyncingService<B> {
179	fn blocks_processed(
180		&mut self,
181		imported: usize,
182		count: usize,
183		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
184	) {
185		let _ = self
186			.tx
187			.unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
188	}
189
190	fn justification_imported(
191		&mut self,
192		who: PeerId,
193		hash: &B::Hash,
194		number: NumberFor<B>,
195		success: bool,
196	) {
197		let _ = self
198			.tx
199			.unbounded_send(ToServiceCommand::JustificationImported(who, *hash, number, success));
200	}
201
202	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
203		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
204	}
205}
206
207impl<B: BlockT> SyncEventStream for SyncingService<B> {
208	/// Get syncing event stream.
209	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
210		let (tx, rx) = tracing_unbounded(name, 100_000);
211		let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
212		Box::pin(rx)
213	}
214}
215
216impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
217	fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
218		let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
219	}
220
221	fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
222		let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
223	}
224}
225
226impl<B: BlockT> sp_consensus::SyncOracle for SyncingService<B> {
227	fn is_major_syncing(&self) -> bool {
228		self.is_major_syncing.load(Ordering::Relaxed)
229	}
230
231	fn is_offline(&self) -> bool {
232		self.num_connected.load(Ordering::Relaxed) == 0
233	}
234}