1use crate::{error, error::Error};
20use codec::{Decode, IoReader as CodecIoReader};
21use futures::{future, prelude::*};
22use futures_timer::Delay;
23use log::{info, warn};
24use sc_chain_spec::ChainSpec;
25use sc_client_api::HeaderBackend;
26use sc_consensus::import_queue::{
27 BlockImportError, BlockImportStatus, ImportQueue, IncomingBlock, Link,
28};
29use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
30use sp_consensus::BlockOrigin;
31use sp_runtime::{
32 generic::SignedBlock,
33 traits::{
34 Block as BlockT, CheckedDiv, Header, MaybeSerializeDeserialize, NumberFor, Saturating, Zero,
35 },
36};
37use std::{
38 io::Read,
39 pin::Pin,
40 task::Poll,
41 time::{Duration, Instant},
42};
43
44const MAX_PENDING_BLOCKS: u64 = 10_000;
46
47const DELAY_TIME: u64 = 200;
49
50const TIME_BETWEEN_UPDATES: u64 = 3_000;
52
53use std::sync::Arc;
54
55pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result<String> {
57 spec.as_json(raw).map_err(Into::into)
58}
59
60enum BlockIter<R, B>
64where
65 R: std::io::Read,
66{
67 Binary {
68 num_expected_blocks: u64,
70 read_block_count: u64,
72 reader: CodecIoReader<R>,
74 },
75 Json {
76 read_block_count: u64,
78 reader: StreamDeserializer<'static, JsonIoRead<R>, SignedBlock<B>>,
80 },
81}
82
83impl<R, B> BlockIter<R, B>
84where
85 R: Read + 'static,
86 B: BlockT + MaybeSerializeDeserialize,
87{
88 fn new(input: R, binary: bool) -> Result<Self, String> {
89 if binary {
90 let mut reader = CodecIoReader(input);
91 let num_expected_blocks: u64 = Decode::decode(&mut reader)
94 .map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?;
95 Ok(BlockIter::Binary { num_expected_blocks, read_block_count: 0, reader })
96 } else {
97 let stream_deser = Deserializer::from_reader(input).into_iter::<SignedBlock<B>>();
98 Ok(BlockIter::Json { reader: stream_deser, read_block_count: 0 })
99 }
100 }
101
102 fn read_block_count(&self) -> u64 {
104 match self {
105 BlockIter::Binary { read_block_count, .. } |
106 BlockIter::Json { read_block_count, .. } => *read_block_count,
107 }
108 }
109
110 fn num_expected_blocks(&self) -> Option<u64> {
112 match self {
113 BlockIter::Binary { num_expected_blocks, .. } => Some(*num_expected_blocks),
114 BlockIter::Json { .. } => None,
115 }
116 }
117}
118
119impl<R, B> Iterator for BlockIter<R, B>
120where
121 R: Read + 'static,
122 B: BlockT + MaybeSerializeDeserialize,
123{
124 type Item = Result<SignedBlock<B>, String>;
125
126 fn next(&mut self) -> Option<Self::Item> {
127 match self {
128 BlockIter::Binary { num_expected_blocks, read_block_count, reader } => {
129 if read_block_count < num_expected_blocks {
130 let block_result: Result<SignedBlock<B>, _> =
131 SignedBlock::<B>::decode(reader).map_err(|e| e.to_string());
132 *read_block_count += 1;
133 Some(block_result)
134 } else {
135 None
137 }
138 },
139 BlockIter::Json { reader, read_block_count } => {
140 let res = Some(reader.next()?.map_err(|e| e.to_string()));
141 *read_block_count += 1;
142 res
143 },
144 }
145 }
146}
147
148fn import_block_to_queue<TBl, TImpQu>(
150 signed_block: SignedBlock<TBl>,
151 queue: &mut TImpQu,
152 force: bool,
153) where
154 TBl: BlockT + MaybeSerializeDeserialize,
155 TImpQu: 'static + ImportQueue<TBl>,
156{
157 let (header, extrinsics) = signed_block.block.deconstruct();
158 let hash = header.hash();
159 queue.service_ref().import_blocks(
161 BlockOrigin::File,
162 vec![IncomingBlock::<TBl> {
163 hash,
164 header: Some(header),
165 body: Some(extrinsics),
166 indexed_body: None,
167 justifications: signed_block.justifications,
168 origin: None,
169 allow_missing_state: false,
170 import_existing: force,
171 state: None,
172 skip_execution: false,
173 }],
174 );
175}
176
177fn importing_is_done(
179 num_expected_blocks: Option<u64>,
180 read_block_count: u64,
181 imported_blocks: u64,
182) -> bool {
183 if let Some(num_expected_blocks) = num_expected_blocks {
184 imported_blocks >= num_expected_blocks
185 } else {
186 imported_blocks >= read_block_count
187 }
188}
189
190struct Speedometer<B: BlockT> {
192 best_number: NumberFor<B>,
193 last_number: Option<NumberFor<B>>,
194 last_update: Instant,
195}
196
197impl<B: BlockT> Speedometer<B> {
198 fn new() -> Self {
200 Self {
201 best_number: NumberFor::<B>::from(0u32),
202 last_number: None,
203 last_update: Instant::now(),
204 }
205 }
206
207 fn display_speed(&self) {
210 let elapsed_ms = {
212 let elapsed = self.last_update.elapsed();
213 let since_last_millis = elapsed.as_secs() * 1000;
214 let since_last_subsec_millis = elapsed.subsec_millis() as u64;
215 since_last_millis + since_last_subsec_millis
216 };
217
218 let diff = match self.last_number {
220 None => return,
221 Some(n) => self.best_number.saturating_sub(n),
222 };
223
224 if let Ok(diff) = TryInto::<u128>::try_into(diff) {
225 let speed = diff
228 .saturating_mul(10_000)
229 .checked_div(u128::from(elapsed_ms))
230 .map_or(0.0, |s| s as f64) /
231 10.0;
232 info!("📦 Current best block: {} ({:4.1} bps)", self.best_number, speed);
233 } else {
234 let one_thousand = NumberFor::<B>::from(1_000u32);
237 let elapsed =
238 NumberFor::<B>::from(<u32 as TryFrom<_>>::try_from(elapsed_ms).unwrap_or(u32::MAX));
239
240 let speed = diff
241 .saturating_mul(one_thousand)
242 .checked_div(&elapsed)
243 .unwrap_or_else(Zero::zero);
244 info!("📦 Current best block: {} ({} bps)", self.best_number, speed)
245 }
246 }
247
248 fn update(&mut self, best_number: NumberFor<B>) {
250 self.last_number = Some(self.best_number);
251 self.best_number = best_number;
252 self.last_update = Instant::now();
253 }
254
255 fn notify_user(&mut self, best_number: NumberFor<B>) {
258 let delta = Duration::from_millis(TIME_BETWEEN_UPDATES);
259 if Instant::now().duration_since(self.last_update) >= delta {
260 self.display_speed();
261 self.update(best_number);
262 }
263 }
264}
265
266enum ImportState<R, B>
268where
269 R: Read + 'static,
270 B: BlockT + MaybeSerializeDeserialize,
271{
272 Reading { block_iter: BlockIter<R, B> },
275 WaitingForImportQueueToCatchUp {
278 block_iter: BlockIter<R, B>,
279 delay: Delay,
280 block: SignedBlock<B>,
281 },
282 WaitingForImportQueueToFinish {
284 num_expected_blocks: Option<u64>,
285 read_block_count: u64,
286 delay: Delay,
287 },
288}
289
290pub fn import_blocks<B, IQ, C>(
292 client: Arc<C>,
293 mut import_queue: IQ,
294 input: impl Read + Send + 'static,
295 force: bool,
296 binary: bool,
297) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
298where
299 C: HeaderBackend<B> + Send + Sync + 'static,
300 B: BlockT + for<'de> serde::Deserialize<'de>,
301 IQ: ImportQueue<B> + 'static,
302{
303 struct WaitLink {
304 imported_blocks: u64,
305 has_error: bool,
306 }
307
308 impl WaitLink {
309 fn new() -> WaitLink {
310 WaitLink { imported_blocks: 0, has_error: false }
311 }
312 }
313
314 impl<B: BlockT> Link<B> for WaitLink {
315 fn blocks_processed(
316 &mut self,
317 imported: usize,
318 _num_expected_blocks: usize,
319 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
320 ) {
321 self.imported_blocks += imported as u64;
322
323 for result in results {
324 if let (Err(err), hash) = result {
325 warn!("There was an error importing block with hash {:?}: {}", hash, err);
326 self.has_error = true;
327 break
328 }
329 }
330 }
331 }
332
333 let mut link = WaitLink::new();
334 let block_iter_res: Result<BlockIter<_, B>, String> = BlockIter::new(input, binary);
335
336 let block_iter = match block_iter_res {
337 Ok(block_iter) => block_iter,
338 Err(e) => {
339 return future::ready(Err(Error::Other(e))).boxed()
342 },
343 };
344
345 let mut state = Some(ImportState::Reading { block_iter });
346 let mut speedometer = Speedometer::<B>::new();
347
348 let import = future::poll_fn(move |cx| {
356 let client = &client;
357 let queue = &mut import_queue;
358 match state.take().expect("state should never be None; qed") {
359 ImportState::Reading { mut block_iter } => {
360 match block_iter.next() {
361 None => {
362 let num_expected_blocks = block_iter.num_expected_blocks();
364 let read_block_count = block_iter.read_block_count();
365 let delay = Delay::new(Duration::from_millis(DELAY_TIME));
366 state = Some(ImportState::WaitingForImportQueueToFinish {
367 num_expected_blocks,
368 read_block_count,
369 delay,
370 });
371 },
372 Some(block_result) => {
373 let read_block_count = block_iter.read_block_count();
374 match block_result {
375 Ok(block) => {
376 if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
377 let delay = Delay::new(Duration::from_millis(DELAY_TIME));
380 state = Some(ImportState::WaitingForImportQueueToCatchUp {
381 block_iter,
382 delay,
383 block,
384 });
385 } else {
386 import_block_to_queue(block, queue, force);
388 state = Some(ImportState::Reading { block_iter });
389 }
390 },
391 Err(e) =>
392 return Poll::Ready(Err(Error::Other(format!(
393 "Error reading block #{}: {}",
394 read_block_count, e
395 )))),
396 }
397 },
398 }
399 },
400 ImportState::WaitingForImportQueueToCatchUp { block_iter, mut delay, block } => {
401 let read_block_count = block_iter.read_block_count();
402 if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
403 match Pin::new(&mut delay).poll(cx) {
405 Poll::Pending => {
406 state = Some(ImportState::WaitingForImportQueueToCatchUp {
407 block_iter,
408 delay,
409 block,
410 });
411 return Poll::Pending
412 },
413 Poll::Ready(_) => {
414 delay.reset(Duration::from_millis(DELAY_TIME));
415 },
416 }
417 state = Some(ImportState::WaitingForImportQueueToCatchUp {
418 block_iter,
419 delay,
420 block,
421 });
422 } else {
423 import_block_to_queue(block, queue, force);
425 state = Some(ImportState::Reading { block_iter });
427 }
428 },
429 ImportState::WaitingForImportQueueToFinish {
430 num_expected_blocks,
431 read_block_count,
432 mut delay,
433 } => {
434 if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
437 info!(
439 "🎉 Imported {} blocks. Best: #{}",
440 read_block_count,
441 client.info().best_number
442 );
443 return Poll::Ready(Ok(()))
444 } else {
445 match Pin::new(&mut delay).poll(cx) {
448 Poll::Pending => {
449 state = Some(ImportState::WaitingForImportQueueToFinish {
450 num_expected_blocks,
451 read_block_count,
452 delay,
453 });
454 return Poll::Pending
455 },
456 Poll::Ready(_) => {
457 delay.reset(Duration::from_millis(DELAY_TIME));
458 },
459 }
460
461 state = Some(ImportState::WaitingForImportQueueToFinish {
462 num_expected_blocks,
463 read_block_count,
464 delay,
465 });
466 }
467 },
468 }
469
470 queue.poll_actions(cx, &mut link);
471
472 let best_number = client.info().best_number;
473 speedometer.notify_user(best_number);
474
475 if link.has_error {
476 return Poll::Ready(Err(Error::Other(format!(
477 "Stopping after #{} blocks because of an error",
478 link.imported_blocks
479 ))))
480 }
481
482 cx.waker().wake_by_ref();
483 Poll::Pending
484 });
485 Box::pin(import)
486}