1use crate::types::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider};
20
21use futures::{channel::oneshot, Stream};
22use sc_network_types::PeerId;
23
24use sc_consensus::{
25 BlockImportError, BlockImportStatus, JustificationImportResult, JustificationSyncLink, Link,
26};
27use sc_network::{NetworkBlock, NetworkSyncForkRequest};
28use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
29use sp_runtime::traits::{Block as BlockT, NumberFor};
30
31use std::{
32 collections::HashSet,
33 pin::Pin,
34 sync::{
35 atomic::{AtomicBool, AtomicUsize, Ordering},
36 Arc,
37 },
38};
39
40pub enum ToServiceCommand<B: BlockT> {
42 SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
43 RequestJustification(B::Hash, NumberFor<B>),
44 ClearJustificationRequests,
45 BlocksProcessed(
46 usize,
47 usize,
48 Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
49 ),
50 JustificationImported(PeerId, B::Hash, NumberFor<B>, JustificationImportResult),
51 AnnounceBlock(B::Hash, Option<Vec<u8>>),
52 NewBestBlockImported(B::Hash, NumberFor<B>),
53 EventStream(TracingUnboundedSender<SyncEvent>),
54 Status(oneshot::Sender<SyncStatus<B>>),
55 NumActivePeers(oneshot::Sender<usize>),
56 NumDownloadedBlocks(oneshot::Sender<usize>),
57 NumSyncRequests(oneshot::Sender<usize>),
58 PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
59 OnBlockFinalized(B::Hash, B::Header),
60 SetNoSlotPeers(HashSet<PeerId>),
62 }
66
67#[derive(Clone)]
69pub struct SyncingService<B: BlockT> {
70 tx: TracingUnboundedSender<ToServiceCommand<B>>,
71 num_connected: Arc<AtomicUsize>,
73 is_major_syncing: Arc<AtomicBool>,
75}
76
77impl<B: BlockT> SyncingService<B> {
78 pub fn new(
80 tx: TracingUnboundedSender<ToServiceCommand<B>>,
81 num_connected: Arc<AtomicUsize>,
82 is_major_syncing: Arc<AtomicBool>,
83 ) -> Self {
84 Self { tx, num_connected, is_major_syncing }
85 }
86
87 pub fn num_connected_peers(&self) -> usize {
89 self.num_connected.load(Ordering::Relaxed)
90 }
91
92 pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
94 let (tx, rx) = oneshot::channel();
95 let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
96
97 rx.await
98 }
99
100 pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
102 let (tx, rx) = oneshot::channel();
103 let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
104
105 rx.await
106 }
107
108 pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
110 let (tx, rx) = oneshot::channel();
111 let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
112
113 rx.await
114 }
115
116 pub async fn peers_info(
118 &self,
119 ) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
120 let (tx, rx) = oneshot::channel();
121 let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
122
123 rx.await
124 }
125
126 pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
128 let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
129 }
130
131 pub fn set_no_slot_peers(&self, peers: HashSet<PeerId>) {
148 let _ = self.tx.unbounded_send(ToServiceCommand::SetNoSlotPeers(peers));
149 }
150
151 pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
155 let (tx, rx) = oneshot::channel();
156 let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
157
158 rx.await
159 }
160}
161
162impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
163 fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
172 let _ = self
173 .tx
174 .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
175 }
176}
177
178impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
179 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
184 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
185 }
186
187 fn clear_justification_requests(&self) {
188 let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
189 }
190}
191
192#[async_trait::async_trait]
193impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
194 async fn status(&self) -> Result<SyncStatus<B>, ()> {
196 let (rtx, rrx) = oneshot::channel();
197
198 let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
199 rrx.await.map_err(|_| ())
200 }
201}
202
203impl<B: BlockT> Link<B> for SyncingService<B> {
204 fn blocks_processed(
205 &self,
206 imported: usize,
207 count: usize,
208 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
209 ) {
210 let _ = self
211 .tx
212 .unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
213 }
214
215 fn justification_imported(
216 &self,
217 who: PeerId,
218 hash: &B::Hash,
219 number: NumberFor<B>,
220 import_result: JustificationImportResult,
221 ) {
222 let _ = self.tx.unbounded_send(ToServiceCommand::JustificationImported(
223 who,
224 *hash,
225 number,
226 import_result,
227 ));
228 }
229
230 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
231 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
232 }
233}
234
235impl<B: BlockT> SyncEventStream for SyncingService<B> {
236 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
238 let (tx, rx) = tracing_unbounded(name, 100_000);
239 let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
240 Box::pin(rx)
241 }
242}
243
244impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
245 fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
246 let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
247 }
248
249 fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
250 let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
251 }
252}
253
254impl<B: BlockT> sp_consensus::SyncOracle for SyncingService<B> {
255 fn is_major_syncing(&self) -> bool {
256 self.is_major_syncing.load(Ordering::Relaxed)
257 }
258
259 fn is_offline(&self) -> bool {
260 self.num_connected.load(Ordering::Relaxed) == 0
261 }
262}