referrerpolicy=no-referrer-when-downgrade

sc_network_sync/service/
syncing_service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::types::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider};
20
21use futures::{channel::oneshot, Stream};
22use sc_network_types::PeerId;
23
24use sc_consensus::{
25	BlockImportError, BlockImportStatus, JustificationImportResult, JustificationSyncLink, Link,
26};
27use sc_network::{NetworkBlock, NetworkSyncForkRequest};
28use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
29use sp_runtime::traits::{Block as BlockT, NumberFor};
30
31use std::{
32	pin::Pin,
33	sync::{
34		atomic::{AtomicBool, AtomicUsize, Ordering},
35		Arc,
36	},
37};
38
39/// Commands send to `SyncingEngine`
40pub enum ToServiceCommand<B: BlockT> {
41	SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
42	RequestJustification(B::Hash, NumberFor<B>),
43	ClearJustificationRequests,
44	BlocksProcessed(
45		usize,
46		usize,
47		Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
48	),
49	JustificationImported(PeerId, B::Hash, NumberFor<B>, JustificationImportResult),
50	AnnounceBlock(B::Hash, Option<Vec<u8>>),
51	NewBestBlockImported(B::Hash, NumberFor<B>),
52	EventStream(TracingUnboundedSender<SyncEvent>),
53	Status(oneshot::Sender<SyncStatus<B>>),
54	NumActivePeers(oneshot::Sender<usize>),
55	NumDownloadedBlocks(oneshot::Sender<usize>),
56	NumSyncRequests(oneshot::Sender<usize>),
57	PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
58	OnBlockFinalized(B::Hash, B::Header),
59	// Status {
60	// 	pending_response: oneshot::Sender<SyncStatus<B>>,
61	// },
62}
63
64/// Handle for communicating with `SyncingEngine` asynchronously
65#[derive(Clone)]
66pub struct SyncingService<B: BlockT> {
67	tx: TracingUnboundedSender<ToServiceCommand<B>>,
68	/// Number of peers we're connected to.
69	num_connected: Arc<AtomicUsize>,
70	/// Are we actively catching up with the chain?
71	is_major_syncing: Arc<AtomicBool>,
72}
73
74impl<B: BlockT> SyncingService<B> {
75	/// Create new handle
76	pub fn new(
77		tx: TracingUnboundedSender<ToServiceCommand<B>>,
78		num_connected: Arc<AtomicUsize>,
79		is_major_syncing: Arc<AtomicBool>,
80	) -> Self {
81		Self { tx, num_connected, is_major_syncing }
82	}
83
84	/// Get the number of peers known to `SyncingEngine` (both full and light).
85	pub fn num_connected_peers(&self) -> usize {
86		self.num_connected.load(Ordering::Relaxed)
87	}
88
89	/// Get the number of active peers.
90	pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
91		let (tx, rx) = oneshot::channel();
92		let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
93
94		rx.await
95	}
96
97	/// Get the number of downloaded blocks.
98	pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
99		let (tx, rx) = oneshot::channel();
100		let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
101
102		rx.await
103	}
104
105	/// Get the number of sync requests.
106	pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
107		let (tx, rx) = oneshot::channel();
108		let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
109
110		rx.await
111	}
112
113	/// Get peer information.
114	pub async fn peers_info(
115		&self,
116	) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
117		let (tx, rx) = oneshot::channel();
118		let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
119
120		rx.await
121	}
122
123	/// Notify the `SyncingEngine` that a block has been finalized.
124	pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
125		let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
126	}
127
128	/// Get sync status
129	///
130	/// Returns an error if `SyncingEngine` has terminated.
131	pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
132		let (tx, rx) = oneshot::channel();
133		let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
134
135		rx.await
136	}
137}
138
139impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
140	/// Configure an explicit fork sync request.
141	///
142	/// Note that this function should not be used for recent blocks.
143	/// Sync should be able to download all the recent forks normally.
144	/// `set_sync_fork_request` should only be used if external code detects that there's
145	/// a stale fork missing.
146	///
147	/// Passing empty `peers` set effectively removes the sync request.
148	fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
149		let _ = self
150			.tx
151			.unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
152	}
153}
154
155impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
156	/// Request a justification for the given block from the network.
157	///
158	/// On success, the justification will be passed to the import queue that was part at
159	/// initialization as part of the configuration.
160	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
161		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
162	}
163
164	fn clear_justification_requests(&self) {
165		let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
166	}
167}
168
169#[async_trait::async_trait]
170impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
171	/// Get high-level view of the syncing status.
172	async fn status(&self) -> Result<SyncStatus<B>, ()> {
173		let (rtx, rrx) = oneshot::channel();
174
175		let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
176		rrx.await.map_err(|_| ())
177	}
178}
179
180impl<B: BlockT> Link<B> for SyncingService<B> {
181	fn blocks_processed(
182		&self,
183		imported: usize,
184		count: usize,
185		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
186	) {
187		let _ = self
188			.tx
189			.unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
190	}
191
192	fn justification_imported(
193		&self,
194		who: PeerId,
195		hash: &B::Hash,
196		number: NumberFor<B>,
197		import_result: JustificationImportResult,
198	) {
199		let _ = self.tx.unbounded_send(ToServiceCommand::JustificationImported(
200			who,
201			*hash,
202			number,
203			import_result,
204		));
205	}
206
207	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
208		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
209	}
210}
211
212impl<B: BlockT> SyncEventStream for SyncingService<B> {
213	/// Get syncing event stream.
214	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
215		let (tx, rx) = tracing_unbounded(name, 100_000);
216		let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
217		Box::pin(rx)
218	}
219}
220
221impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
222	fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
223		let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
224	}
225
226	fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
227		let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
228	}
229}
230
231impl<B: BlockT> sp_consensus::SyncOracle for SyncingService<B> {
232	fn is_major_syncing(&self) -> bool {
233		self.is_major_syncing.load(Ordering::Relaxed)
234	}
235
236	fn is_offline(&self) -> bool {
237		self.num_connected.load(Ordering::Relaxed) == 0
238	}
239}