1use crate::types::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider};
20
21use futures::{channel::oneshot, Stream};
22use sc_network_types::PeerId;
23
24use sc_consensus::{
25 BlockImportError, BlockImportStatus, JustificationImportResult, JustificationSyncLink, Link,
26};
27use sc_network::{NetworkBlock, NetworkSyncForkRequest};
28use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
29use sp_runtime::traits::{Block as BlockT, NumberFor};
30
31use std::{
32 pin::Pin,
33 sync::{
34 atomic::{AtomicBool, AtomicUsize, Ordering},
35 Arc,
36 },
37};
38
39pub enum ToServiceCommand<B: BlockT> {
41 SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
42 RequestJustification(B::Hash, NumberFor<B>),
43 ClearJustificationRequests,
44 BlocksProcessed(
45 usize,
46 usize,
47 Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
48 ),
49 JustificationImported(PeerId, B::Hash, NumberFor<B>, JustificationImportResult),
50 AnnounceBlock(B::Hash, Option<Vec<u8>>),
51 NewBestBlockImported(B::Hash, NumberFor<B>),
52 EventStream(TracingUnboundedSender<SyncEvent>),
53 Status(oneshot::Sender<SyncStatus<B>>),
54 NumActivePeers(oneshot::Sender<usize>),
55 NumDownloadedBlocks(oneshot::Sender<usize>),
56 NumSyncRequests(oneshot::Sender<usize>),
57 PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
58 OnBlockFinalized(B::Hash, B::Header),
59 }
63
64#[derive(Clone)]
66pub struct SyncingService<B: BlockT> {
67 tx: TracingUnboundedSender<ToServiceCommand<B>>,
68 num_connected: Arc<AtomicUsize>,
70 is_major_syncing: Arc<AtomicBool>,
72}
73
74impl<B: BlockT> SyncingService<B> {
75 pub fn new(
77 tx: TracingUnboundedSender<ToServiceCommand<B>>,
78 num_connected: Arc<AtomicUsize>,
79 is_major_syncing: Arc<AtomicBool>,
80 ) -> Self {
81 Self { tx, num_connected, is_major_syncing }
82 }
83
84 pub fn num_connected_peers(&self) -> usize {
86 self.num_connected.load(Ordering::Relaxed)
87 }
88
89 pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
91 let (tx, rx) = oneshot::channel();
92 let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
93
94 rx.await
95 }
96
97 pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
99 let (tx, rx) = oneshot::channel();
100 let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
101
102 rx.await
103 }
104
105 pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
107 let (tx, rx) = oneshot::channel();
108 let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
109
110 rx.await
111 }
112
113 pub async fn peers_info(
115 &self,
116 ) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
117 let (tx, rx) = oneshot::channel();
118 let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
119
120 rx.await
121 }
122
123 pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
125 let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
126 }
127
128 pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
132 let (tx, rx) = oneshot::channel();
133 let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
134
135 rx.await
136 }
137}
138
139impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
140 fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
149 let _ = self
150 .tx
151 .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
152 }
153}
154
155impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
156 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
161 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
162 }
163
164 fn clear_justification_requests(&self) {
165 let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
166 }
167}
168
169#[async_trait::async_trait]
170impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
171 async fn status(&self) -> Result<SyncStatus<B>, ()> {
173 let (rtx, rrx) = oneshot::channel();
174
175 let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
176 rrx.await.map_err(|_| ())
177 }
178}
179
180impl<B: BlockT> Link<B> for SyncingService<B> {
181 fn blocks_processed(
182 &self,
183 imported: usize,
184 count: usize,
185 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
186 ) {
187 let _ = self
188 .tx
189 .unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
190 }
191
192 fn justification_imported(
193 &self,
194 who: PeerId,
195 hash: &B::Hash,
196 number: NumberFor<B>,
197 import_result: JustificationImportResult,
198 ) {
199 let _ = self.tx.unbounded_send(ToServiceCommand::JustificationImported(
200 who,
201 *hash,
202 number,
203 import_result,
204 ));
205 }
206
207 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
208 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
209 }
210}
211
212impl<B: BlockT> SyncEventStream for SyncingService<B> {
213 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
215 let (tx, rx) = tracing_unbounded(name, 100_000);
216 let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
217 Box::pin(rx)
218 }
219}
220
221impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
222 fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
223 let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
224 }
225
226 fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
227 let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
228 }
229}
230
231impl<B: BlockT> sp_consensus::SyncOracle for SyncingService<B> {
232 fn is_major_syncing(&self) -> bool {
233 self.is_major_syncing.load(Ordering::Relaxed)
234 }
235
236 fn is_offline(&self) -> bool {
237 self.num_connected.load(Ordering::Relaxed) == 0
238 }
239}