referrerpolicy=no-referrer-when-downgrade

sc_network_sync/service/
syncing_service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::types::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider};
20
21use futures::{channel::oneshot, Stream};
22use sc_network_types::PeerId;
23
24use sc_consensus::{
25	BlockImportError, BlockImportStatus, JustificationImportResult, JustificationSyncLink, Link,
26};
27use sc_network::{NetworkBlock, NetworkSyncForkRequest};
28use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
29use sp_runtime::traits::{Block as BlockT, NumberFor};
30
31use std::{
32	collections::HashSet,
33	pin::Pin,
34	sync::{
35		atomic::{AtomicBool, AtomicUsize, Ordering},
36		Arc,
37	},
38};
39
40/// Commands send to `SyncingEngine`
41pub enum ToServiceCommand<B: BlockT> {
42	SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
43	RequestJustification(B::Hash, NumberFor<B>),
44	ClearJustificationRequests,
45	BlocksProcessed(
46		usize,
47		usize,
48		Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
49	),
50	JustificationImported(PeerId, B::Hash, NumberFor<B>, JustificationImportResult),
51	AnnounceBlock(B::Hash, Option<Vec<u8>>),
52	NewBestBlockImported(B::Hash, NumberFor<B>),
53	EventStream(TracingUnboundedSender<SyncEvent>),
54	Status(oneshot::Sender<SyncStatus<B>>),
55	NumActivePeers(oneshot::Sender<usize>),
56	NumDownloadedBlocks(oneshot::Sender<usize>),
57	NumSyncRequests(oneshot::Sender<usize>),
58	PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
59	OnBlockFinalized(B::Hash, B::Header),
60	/// Dynamically update the no-slot peer set. Does not modify the static no-slot peer set.
61	SetNoSlotPeers(HashSet<PeerId>),
62	// Status {
63	// 	pending_response: oneshot::Sender<SyncStatus<B>>,
64	// },
65}
66
67/// Handle for communicating with `SyncingEngine` asynchronously
68#[derive(Clone)]
69pub struct SyncingService<B: BlockT> {
70	tx: TracingUnboundedSender<ToServiceCommand<B>>,
71	/// Number of peers we're connected to.
72	num_connected: Arc<AtomicUsize>,
73	/// Are we actively catching up with the chain?
74	is_major_syncing: Arc<AtomicBool>,
75}
76
77impl<B: BlockT> SyncingService<B> {
78	/// Create new handle
79	pub fn new(
80		tx: TracingUnboundedSender<ToServiceCommand<B>>,
81		num_connected: Arc<AtomicUsize>,
82		is_major_syncing: Arc<AtomicBool>,
83	) -> Self {
84		Self { tx, num_connected, is_major_syncing }
85	}
86
87	/// Get the number of peers known to `SyncingEngine` (both full and light).
88	pub fn num_connected_peers(&self) -> usize {
89		self.num_connected.load(Ordering::Relaxed)
90	}
91
92	/// Get the number of active peers.
93	pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
94		let (tx, rx) = oneshot::channel();
95		let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
96
97		rx.await
98	}
99
100	/// Get the number of downloaded blocks.
101	pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
102		let (tx, rx) = oneshot::channel();
103		let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
104
105		rx.await
106	}
107
108	/// Get the number of sync requests.
109	pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
110		let (tx, rx) = oneshot::channel();
111		let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
112
113		rx.await
114	}
115
116	/// Get peer information.
117	pub async fn peers_info(
118		&self,
119	) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
120		let (tx, rx) = oneshot::channel();
121		let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
122
123		rx.await
124	}
125
126	/// Notify the `SyncingEngine` that a block has been finalized.
127	pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
128		let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
129	}
130
131	/// Replace the dynamic set of no-slot peer set.
132	///
133	/// The engine maintains a per-role inbound-peer counter on top of the underlying network's
134	/// reserved-peer handling (since <https://github.com/paritytech/substrate/pull/14603>) to ensure the
135	/// `--in-peers` budget is enforced specifically for full peers in default notification set.
136	/// A peer in either the static (config-time) or dynamic (this)
137	/// no-slot set bypasses that counter, exactly mirroring what reserved-peer handling does at
138	/// the network layer.
139	///
140	/// Each call replaces the dynamic set. The engine reconciles against its previous dynamic set.
141	/// Peers added are promoted (taken off the inbound budget). Peers removed are demoted
142	/// (counted again, or disconnected if doing so would now exceed `--in-peers`).
143	///
144	/// Callers that maintain a dynamic reserved-peer relationship (e.g. parachain
145	/// collator-discovery) must call this in lockstep with `set_reserved_peers` using the same
146	/// peer set, so the two layers stay in sync.
147	pub fn set_no_slot_peers(&self, peers: HashSet<PeerId>) {
148		let _ = self.tx.unbounded_send(ToServiceCommand::SetNoSlotPeers(peers));
149	}
150
151	/// Get sync status
152	///
153	/// Returns an error if `SyncingEngine` has terminated.
154	pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
155		let (tx, rx) = oneshot::channel();
156		let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
157
158		rx.await
159	}
160}
161
162impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
163	/// Configure an explicit fork sync request.
164	///
165	/// Note that this function should not be used for recent blocks.
166	/// Sync should be able to download all the recent forks normally.
167	/// `set_sync_fork_request` should only be used if external code detects that there's
168	/// a stale fork missing.
169	///
170	/// Passing empty `peers` set effectively removes the sync request.
171	fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
172		let _ = self
173			.tx
174			.unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
175	}
176}
177
178impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
179	/// Request a justification for the given block from the network.
180	///
181	/// On success, the justification will be passed to the import queue that was part at
182	/// initialization as part of the configuration.
183	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
184		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
185	}
186
187	fn clear_justification_requests(&self) {
188		let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
189	}
190}
191
192#[async_trait::async_trait]
193impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
194	/// Get high-level view of the syncing status.
195	async fn status(&self) -> Result<SyncStatus<B>, ()> {
196		let (rtx, rrx) = oneshot::channel();
197
198		let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
199		rrx.await.map_err(|_| ())
200	}
201}
202
203impl<B: BlockT> Link<B> for SyncingService<B> {
204	fn blocks_processed(
205		&self,
206		imported: usize,
207		count: usize,
208		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
209	) {
210		let _ = self
211			.tx
212			.unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
213	}
214
215	fn justification_imported(
216		&self,
217		who: PeerId,
218		hash: &B::Hash,
219		number: NumberFor<B>,
220		import_result: JustificationImportResult,
221	) {
222		let _ = self.tx.unbounded_send(ToServiceCommand::JustificationImported(
223			who,
224			*hash,
225			number,
226			import_result,
227		));
228	}
229
230	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
231		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
232	}
233}
234
235impl<B: BlockT> SyncEventStream for SyncingService<B> {
236	/// Get syncing event stream.
237	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
238		let (tx, rx) = tracing_unbounded(name, 100_000);
239		let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
240		Box::pin(rx)
241	}
242}
243
244impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
245	fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
246		let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
247	}
248
249	fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
250		let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
251	}
252}
253
254impl<B: BlockT> sp_consensus::SyncOracle for SyncingService<B> {
255	fn is_major_syncing(&self) -> bool {
256		self.is_major_syncing.load(Ordering::Relaxed)
257	}
258
259	fn is_offline(&self) -> bool {
260		self.num_connected.load(Ordering::Relaxed) == 0
261	}
262}