1use crate::{error, error::Error};
20use codec::{Decode, IoReader as CodecIoReader};
21use futures::{future, prelude::*};
22use futures_timer::Delay;
23use log::{info, warn};
24use sc_chain_spec::ChainSpec;
25use sc_client_api::HeaderBackend;
26use sc_consensus::import_queue::{
27 BlockImportError, BlockImportStatus, ImportQueue, IncomingBlock, Link,
28};
29use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
30use sp_consensus::BlockOrigin;
31use sp_runtime::{
32 generic::SignedBlock,
33 traits::{
34 Block as BlockT, CheckedDiv, Header, MaybeSerializeDeserialize, NumberFor, Saturating, Zero,
35 },
36};
37use std::{
38 io::Read,
39 pin::Pin,
40 sync::{
41 atomic::{AtomicBool, AtomicU64, Ordering},
42 Arc,
43 },
44 task::Poll,
45 time::{Duration, Instant},
46};
47
48const MAX_PENDING_BLOCKS: u64 = 10_000;
50
51const DELAY_TIME: u64 = 200;
53
54const TIME_BETWEEN_UPDATES: u64 = 3_000;
56
57pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result<String> {
59 spec.as_json(raw).map_err(Into::into)
60}
61
62enum BlockIter<R, B>
66where
67 R: std::io::Read,
68{
69 Binary {
70 num_expected_blocks: u64,
72 read_block_count: u64,
74 reader: CodecIoReader<R>,
76 },
77 Json {
78 read_block_count: u64,
80 reader: StreamDeserializer<'static, JsonIoRead<R>, SignedBlock<B>>,
82 },
83}
84
85impl<R, B> BlockIter<R, B>
86where
87 R: Read + 'static,
88 B: BlockT + MaybeSerializeDeserialize,
89{
90 fn new(input: R, binary: bool) -> Result<Self, String> {
91 if binary {
92 let mut reader = CodecIoReader(input);
93 let num_expected_blocks: u64 = Decode::decode(&mut reader)
96 .map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?;
97 Ok(BlockIter::Binary { num_expected_blocks, read_block_count: 0, reader })
98 } else {
99 let stream_deser = Deserializer::from_reader(input).into_iter::<SignedBlock<B>>();
100 Ok(BlockIter::Json { reader: stream_deser, read_block_count: 0 })
101 }
102 }
103
104 fn read_block_count(&self) -> u64 {
106 match self {
107 BlockIter::Binary { read_block_count, .. } |
108 BlockIter::Json { read_block_count, .. } => *read_block_count,
109 }
110 }
111
112 fn num_expected_blocks(&self) -> Option<u64> {
114 match self {
115 BlockIter::Binary { num_expected_blocks, .. } => Some(*num_expected_blocks),
116 BlockIter::Json { .. } => None,
117 }
118 }
119}
120
121impl<R, B> Iterator for BlockIter<R, B>
122where
123 R: Read + 'static,
124 B: BlockT + MaybeSerializeDeserialize,
125{
126 type Item = Result<SignedBlock<B>, String>;
127
128 fn next(&mut self) -> Option<Self::Item> {
129 match self {
130 BlockIter::Binary { num_expected_blocks, read_block_count, reader } => {
131 if read_block_count < num_expected_blocks {
132 let block_result: Result<SignedBlock<B>, _> =
133 SignedBlock::<B>::decode(reader).map_err(|e| e.to_string());
134 *read_block_count += 1;
135 Some(block_result)
136 } else {
137 None
139 }
140 },
141 BlockIter::Json { reader, read_block_count } => {
142 let res = Some(reader.next()?.map_err(|e| e.to_string()));
143 *read_block_count += 1;
144 res
145 },
146 }
147 }
148}
149
150fn import_block_to_queue<TBl, TImpQu>(
152 signed_block: SignedBlock<TBl>,
153 queue: &mut TImpQu,
154 force: bool,
155) where
156 TBl: BlockT + MaybeSerializeDeserialize,
157 TImpQu: 'static + ImportQueue<TBl>,
158{
159 let (header, extrinsics) = signed_block.block.deconstruct();
160 let hash = header.hash();
161 queue.service_ref().import_blocks(
163 BlockOrigin::File,
164 vec![IncomingBlock::<TBl> {
165 hash,
166 header: Some(header),
167 body: Some(extrinsics),
168 indexed_body: None,
169 justifications: signed_block.justifications,
170 origin: None,
171 allow_missing_state: false,
172 import_existing: force,
173 state: None,
174 skip_execution: false,
175 }],
176 );
177}
178
179fn importing_is_done(
181 num_expected_blocks: Option<u64>,
182 read_block_count: u64,
183 imported_blocks: u64,
184) -> bool {
185 if let Some(num_expected_blocks) = num_expected_blocks {
186 imported_blocks >= num_expected_blocks
187 } else {
188 imported_blocks >= read_block_count
189 }
190}
191
192struct Speedometer<B: BlockT> {
194 best_number: NumberFor<B>,
195 last_number: Option<NumberFor<B>>,
196 last_update: Instant,
197}
198
199impl<B: BlockT> Speedometer<B> {
200 fn new() -> Self {
202 Self {
203 best_number: NumberFor::<B>::from(0u32),
204 last_number: None,
205 last_update: Instant::now(),
206 }
207 }
208
209 fn display_speed(&self) {
212 let elapsed_ms = {
214 let elapsed = self.last_update.elapsed();
215 let since_last_millis = elapsed.as_secs() * 1000;
216 let since_last_subsec_millis = elapsed.subsec_millis() as u64;
217 since_last_millis + since_last_subsec_millis
218 };
219
220 let diff = match self.last_number {
222 None => return,
223 Some(n) => self.best_number.saturating_sub(n),
224 };
225
226 if let Ok(diff) = TryInto::<u128>::try_into(diff) {
227 let speed = diff
230 .saturating_mul(10_000)
231 .checked_div(u128::from(elapsed_ms))
232 .map_or(0.0, |s| s as f64) /
233 10.0;
234 info!("๐ฆ Current best block: {} ({:4.1} bps)", self.best_number, speed);
235 } else {
236 let one_thousand = NumberFor::<B>::from(1_000u32);
239 let elapsed =
240 NumberFor::<B>::from(<u32 as TryFrom<_>>::try_from(elapsed_ms).unwrap_or(u32::MAX));
241
242 let speed = diff
243 .saturating_mul(one_thousand)
244 .checked_div(&elapsed)
245 .unwrap_or_else(Zero::zero);
246 info!("๐ฆ Current best block: {} ({} bps)", self.best_number, speed)
247 }
248 }
249
250 fn update(&mut self, best_number: NumberFor<B>) {
252 self.last_number = Some(self.best_number);
253 self.best_number = best_number;
254 self.last_update = Instant::now();
255 }
256
257 fn notify_user(&mut self, best_number: NumberFor<B>) {
260 let delta = Duration::from_millis(TIME_BETWEEN_UPDATES);
261 if Instant::now().duration_since(self.last_update) >= delta {
262 self.display_speed();
263 self.update(best_number);
264 }
265 }
266}
267
268enum ImportState<R, B>
270where
271 R: Read + 'static,
272 B: BlockT + MaybeSerializeDeserialize,
273{
274 Reading { block_iter: BlockIter<R, B> },
277 WaitingForImportQueueToCatchUp {
280 block_iter: BlockIter<R, B>,
281 delay: Delay,
282 block: SignedBlock<B>,
283 },
284 WaitingForImportQueueToFinish {
286 num_expected_blocks: Option<u64>,
287 read_block_count: u64,
288 delay: Delay,
289 },
290}
291
292pub fn import_blocks<B, IQ, C>(
294 client: Arc<C>,
295 mut import_queue: IQ,
296 input: impl Read + Send + 'static,
297 force: bool,
298 binary: bool,
299) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
300where
301 C: HeaderBackend<B> + Send + Sync + 'static,
302 B: BlockT + for<'de> serde::Deserialize<'de>,
303 IQ: ImportQueue<B> + 'static,
304{
305 struct WaitLink {
306 imported_blocks: AtomicU64,
307 has_error: AtomicBool,
308 }
309
310 impl WaitLink {
311 fn new() -> WaitLink {
312 WaitLink { imported_blocks: AtomicU64::new(0), has_error: AtomicBool::new(false) }
313 }
314 }
315
316 impl<B: BlockT> Link<B> for WaitLink {
317 fn blocks_processed(
318 &self,
319 imported: usize,
320 _num_expected_blocks: usize,
321 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
322 ) {
323 self.imported_blocks.fetch_add(imported as u64, Ordering::AcqRel);
324
325 for result in results {
326 if let (Err(err), hash) = result {
327 warn!("There was an error importing block with hash {:?}: {}", hash, err);
328 self.has_error.store(true, Ordering::Release);
329 break
330 }
331 }
332 }
333 }
334
335 let mut link = WaitLink::new();
336 let block_iter_res: Result<BlockIter<_, B>, String> = BlockIter::new(input, binary);
337
338 let block_iter = match block_iter_res {
339 Ok(block_iter) => block_iter,
340 Err(e) => {
341 return future::ready(Err(Error::Other(e))).boxed()
344 },
345 };
346
347 let mut state = Some(ImportState::Reading { block_iter });
348 let mut speedometer = Speedometer::<B>::new();
349
350 let import = future::poll_fn(move |cx| {
358 let client = &client;
359 let queue = &mut import_queue;
360 match state.take().expect("state should never be None; qed") {
361 ImportState::Reading { mut block_iter } => {
362 match block_iter.next() {
363 None => {
364 let num_expected_blocks = block_iter.num_expected_blocks();
366 let read_block_count = block_iter.read_block_count();
367 let delay = Delay::new(Duration::from_millis(DELAY_TIME));
368 state = Some(ImportState::WaitingForImportQueueToFinish {
369 num_expected_blocks,
370 read_block_count,
371 delay,
372 });
373 },
374 Some(block_result) => {
375 let read_block_count = block_iter.read_block_count();
376 match block_result {
377 Ok(block) => {
378 if read_block_count - link.imported_blocks.load(Ordering::Acquire) >=
379 MAX_PENDING_BLOCKS
380 {
381 let delay = Delay::new(Duration::from_millis(DELAY_TIME));
384 state = Some(ImportState::WaitingForImportQueueToCatchUp {
385 block_iter,
386 delay,
387 block,
388 });
389 } else {
390 import_block_to_queue(block, queue, force);
392 state = Some(ImportState::Reading { block_iter });
393 }
394 },
395 Err(e) =>
396 return Poll::Ready(Err(Error::Other(format!(
397 "Error reading block #{}: {}",
398 read_block_count, e
399 )))),
400 }
401 },
402 }
403 },
404 ImportState::WaitingForImportQueueToCatchUp { block_iter, mut delay, block } => {
405 let read_block_count = block_iter.read_block_count();
406 if read_block_count - link.imported_blocks.load(Ordering::Acquire) >=
407 MAX_PENDING_BLOCKS
408 {
409 match Pin::new(&mut delay).poll(cx) {
411 Poll::Pending => {
412 state = Some(ImportState::WaitingForImportQueueToCatchUp {
413 block_iter,
414 delay,
415 block,
416 });
417 return Poll::Pending
418 },
419 Poll::Ready(_) => {
420 delay.reset(Duration::from_millis(DELAY_TIME));
421 },
422 }
423 state = Some(ImportState::WaitingForImportQueueToCatchUp {
424 block_iter,
425 delay,
426 block,
427 });
428 } else {
429 import_block_to_queue(block, queue, force);
431 state = Some(ImportState::Reading { block_iter });
433 }
434 },
435 ImportState::WaitingForImportQueueToFinish {
436 num_expected_blocks,
437 read_block_count,
438 mut delay,
439 } => {
440 if importing_is_done(
443 num_expected_blocks,
444 read_block_count,
445 link.imported_blocks.load(Ordering::Acquire),
446 ) {
447 info!(
449 "๐ Imported {} blocks. Best: #{}",
450 read_block_count,
451 client.info().best_number
452 );
453 return Poll::Ready(Ok(()))
454 } else {
455 match Pin::new(&mut delay).poll(cx) {
458 Poll::Pending => {
459 state = Some(ImportState::WaitingForImportQueueToFinish {
460 num_expected_blocks,
461 read_block_count,
462 delay,
463 });
464 return Poll::Pending
465 },
466 Poll::Ready(_) => {
467 delay.reset(Duration::from_millis(DELAY_TIME));
468 },
469 }
470
471 state = Some(ImportState::WaitingForImportQueueToFinish {
472 num_expected_blocks,
473 read_block_count,
474 delay,
475 });
476 }
477 },
478 }
479
480 queue.poll_actions(cx, &mut link);
481
482 let best_number = client.info().best_number;
483 speedometer.notify_user(best_number);
484
485 if link.has_error.load(Ordering::Acquire) {
486 return Poll::Ready(Err(Error::Other(format!(
487 "Stopping after #{} blocks because of an error",
488 link.imported_blocks.load(Ordering::Acquire)
489 ))))
490 }
491
492 cx.waker().wake_by_ref();
493 Poll::Pending
494 });
495 Box::pin(import)
496}