1use crate::types::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider};
20
21use futures::{channel::oneshot, Stream};
22use sc_network_types::PeerId;
23
24use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
25use sc_network::{NetworkBlock, NetworkSyncForkRequest};
26use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
27use sp_runtime::traits::{Block as BlockT, NumberFor};
28
29use std::{
30 pin::Pin,
31 sync::{
32 atomic::{AtomicBool, AtomicUsize, Ordering},
33 Arc,
34 },
35};
36
37pub enum ToServiceCommand<B: BlockT> {
39 SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
40 RequestJustification(B::Hash, NumberFor<B>),
41 ClearJustificationRequests,
42 BlocksProcessed(
43 usize,
44 usize,
45 Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
46 ),
47 JustificationImported(PeerId, B::Hash, NumberFor<B>, bool),
48 AnnounceBlock(B::Hash, Option<Vec<u8>>),
49 NewBestBlockImported(B::Hash, NumberFor<B>),
50 EventStream(TracingUnboundedSender<SyncEvent>),
51 Status(oneshot::Sender<SyncStatus<B>>),
52 NumActivePeers(oneshot::Sender<usize>),
53 NumDownloadedBlocks(oneshot::Sender<usize>),
54 NumSyncRequests(oneshot::Sender<usize>),
55 PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
56 OnBlockFinalized(B::Hash, B::Header),
57 }
61
62#[derive(Clone)]
64pub struct SyncingService<B: BlockT> {
65 tx: TracingUnboundedSender<ToServiceCommand<B>>,
66 num_connected: Arc<AtomicUsize>,
68 is_major_syncing: Arc<AtomicBool>,
70}
71
72impl<B: BlockT> SyncingService<B> {
73 pub fn new(
75 tx: TracingUnboundedSender<ToServiceCommand<B>>,
76 num_connected: Arc<AtomicUsize>,
77 is_major_syncing: Arc<AtomicBool>,
78 ) -> Self {
79 Self { tx, num_connected, is_major_syncing }
80 }
81
82 pub fn num_connected_peers(&self) -> usize {
84 self.num_connected.load(Ordering::Relaxed)
85 }
86
87 pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
89 let (tx, rx) = oneshot::channel();
90 let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
91
92 rx.await
93 }
94
95 pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
97 let (tx, rx) = oneshot::channel();
98 let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
99
100 rx.await
101 }
102
103 pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
105 let (tx, rx) = oneshot::channel();
106 let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
107
108 rx.await
109 }
110
111 pub async fn peers_info(
113 &self,
114 ) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
115 let (tx, rx) = oneshot::channel();
116 let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
117
118 rx.await
119 }
120
121 pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
123 let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
124 }
125
126 pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
130 let (tx, rx) = oneshot::channel();
131 let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
132
133 rx.await
134 }
135}
136
137impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
138 fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
147 let _ = self
148 .tx
149 .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
150 }
151}
152
153impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
154 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
159 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
160 }
161
162 fn clear_justification_requests(&self) {
163 let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
164 }
165}
166
167#[async_trait::async_trait]
168impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
169 async fn status(&self) -> Result<SyncStatus<B>, ()> {
171 let (rtx, rrx) = oneshot::channel();
172
173 let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
174 rrx.await.map_err(|_| ())
175 }
176}
177
178impl<B: BlockT> Link<B> for SyncingService<B> {
179 fn blocks_processed(
180 &mut self,
181 imported: usize,
182 count: usize,
183 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
184 ) {
185 let _ = self
186 .tx
187 .unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
188 }
189
190 fn justification_imported(
191 &mut self,
192 who: PeerId,
193 hash: &B::Hash,
194 number: NumberFor<B>,
195 success: bool,
196 ) {
197 let _ = self
198 .tx
199 .unbounded_send(ToServiceCommand::JustificationImported(who, *hash, number, success));
200 }
201
202 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
203 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
204 }
205}
206
207impl<B: BlockT> SyncEventStream for SyncingService<B> {
208 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
210 let (tx, rx) = tracing_unbounded(name, 100_000);
211 let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
212 Box::pin(rx)
213 }
214}
215
216impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
217 fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
218 let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
219 }
220
221 fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
222 let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
223 }
224}
225
226impl<B: BlockT> sp_consensus::SyncOracle for SyncingService<B> {
227 fn is_major_syncing(&self) -> bool {
228 self.is_major_syncing.load(Ordering::Relaxed)
229 }
230
231 fn is_offline(&self) -> bool {
232 self.num_connected.load(Ordering::Relaxed) == 0
233 }
234}