referrerpolicy=no-referrer-when-downgrade

sc_network_sync/strategy/
state.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! State sync strategy.
20
21use crate::{
22	schema::v1::{StateRequest, StateResponse},
23	service::network::NetworkServiceHandle,
24	strategy::{
25		disconnected_peers::DisconnectedPeers,
26		state_sync::{ImportResult, StateSync, StateSyncProvider},
27		StrategyKey, SyncingAction,
28	},
29	types::{BadPeer, SyncState, SyncStatus},
30	LOG_TARGET,
31};
32use futures::{channel::oneshot, FutureExt};
33use log::{debug, error, trace};
34use prost::Message;
35use sc_client_api::ProofProvider;
36use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
37use sc_network::{IfDisconnected, ProtocolName};
38use sc_network_common::sync::message::BlockAnnounce;
39use sc_network_types::PeerId;
40use sp_consensus::BlockOrigin;
41use sp_runtime::{
42	traits::{Block as BlockT, Header, NumberFor},
43	Justifications, SaturatedConversion,
44};
45use std::{any::Any, collections::HashMap, sync::Arc};
46
47mod rep {
48	use sc_network::ReputationChange as Rep;
49
50	/// Peer response data does not have requested bits.
51	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
52
53	/// Reputation change for peers which send us a known bad state.
54	pub const BAD_STATE: Rep = Rep::new(-(1 << 29), "Bad state");
55}
56
57enum PeerState {
58	Available,
59	DownloadingState,
60}
61
62impl PeerState {
63	fn is_available(&self) -> bool {
64		matches!(self, PeerState::Available)
65	}
66}
67
68struct Peer<B: BlockT> {
69	best_number: NumberFor<B>,
70	state: PeerState,
71}
72
73/// Syncing strategy that downloads and imports a recent state directly.
74pub struct StateStrategy<B: BlockT> {
75	state_sync: Box<dyn StateSyncProvider<B>>,
76	peers: HashMap<PeerId, Peer<B>>,
77	disconnected_peers: DisconnectedPeers,
78	actions: Vec<SyncingAction<B>>,
79	protocol_name: ProtocolName,
80	succeeded: bool,
81}
82
83impl<B: BlockT> StateStrategy<B> {
84	/// Strategy key used by state sync.
85	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("State");
86
87	/// Create a new instance.
88	pub fn new<Client>(
89		client: Arc<Client>,
90		target_header: B::Header,
91		target_body: Option<Vec<B::Extrinsic>>,
92		target_justifications: Option<Justifications>,
93		skip_proof: bool,
94		initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
95		protocol_name: ProtocolName,
96	) -> Self
97	where
98		Client: ProofProvider<B> + Send + Sync + 'static,
99	{
100		let peers = initial_peers
101			.map(|(peer_id, best_number)| {
102				(peer_id, Peer { best_number, state: PeerState::Available })
103			})
104			.collect();
105		Self {
106			state_sync: Box::new(StateSync::new(
107				client,
108				target_header,
109				target_body,
110				target_justifications,
111				skip_proof,
112			)),
113			peers,
114			disconnected_peers: DisconnectedPeers::new(),
115			actions: Vec::new(),
116			protocol_name,
117			succeeded: false,
118		}
119	}
120
121	/// Create a new instance with a custom state sync provider.
122	///
123	/// Note: In most cases, users should use [`StateStrategy::new`].
124	/// This method is intended for custom sync strategies and advanced use cases.
125	pub fn new_with_provider(
126		state_sync_provider: Box<dyn StateSyncProvider<B>>,
127		initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
128		protocol_name: ProtocolName,
129	) -> Self {
130		Self {
131			state_sync: state_sync_provider,
132			peers: initial_peers
133				.map(|(peer_id, best_number)| {
134					(peer_id, Peer { best_number, state: PeerState::Available })
135				})
136				.collect(),
137			disconnected_peers: DisconnectedPeers::new(),
138			actions: Vec::new(),
139			protocol_name,
140			succeeded: false,
141		}
142	}
143
144	/// Notify that a new peer has connected.
145	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
146		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
147	}
148
149	/// Notify that a peer has disconnected.
150	pub fn remove_peer(&mut self, peer_id: &PeerId) {
151		if let Some(state) = self.peers.remove(peer_id) {
152			if !state.state.is_available() {
153				if let Some(bad_peer) =
154					self.disconnected_peers.on_disconnect_during_request(*peer_id)
155				{
156					self.actions.push(SyncingAction::DropPeer(bad_peer));
157				}
158			}
159		}
160	}
161
162	/// Submit a validated block announcement.
163	///
164	/// Returns new best hash & best number of the peer if they are updated.
165	#[must_use]
166	pub fn on_validated_block_announce(
167		&mut self,
168		is_best: bool,
169		peer_id: PeerId,
170		announce: &BlockAnnounce<B::Header>,
171	) -> Option<(B::Hash, NumberFor<B>)> {
172		is_best.then(|| {
173			let best_number = *announce.header.number();
174			let best_hash = announce.header.hash();
175			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
176				peer.best_number = best_number;
177			}
178			// Let `SyncingEngine` know that we should update the peer info.
179			(best_hash, best_number)
180		})
181	}
182
183	/// Process state response.
184	pub fn on_state_response(&mut self, peer_id: &PeerId, response: Vec<u8>) {
185		if let Err(bad_peer) = self.on_state_response_inner(peer_id, &response) {
186			self.actions.push(SyncingAction::DropPeer(bad_peer));
187		}
188	}
189
190	fn on_state_response_inner(
191		&mut self,
192		peer_id: &PeerId,
193		response: &[u8],
194	) -> Result<(), BadPeer> {
195		if let Some(peer) = self.peers.get_mut(&peer_id) {
196			peer.state = PeerState::Available;
197		}
198
199		let response = match StateResponse::decode(response) {
200			Ok(response) => response,
201			Err(error) => {
202				debug!(
203					target: LOG_TARGET,
204					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
205				);
206
207				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
208			},
209		};
210
211		debug!(
212			target: LOG_TARGET,
213			"Importing state data from {} with {} keys, {} proof nodes.",
214			peer_id,
215			response.entries.len(),
216			response.proof.len(),
217		);
218
219		match self.state_sync.import(response) {
220			ImportResult::Import(hash, header, state, body, justifications) => {
221				let origin = BlockOrigin::NetworkInitialSync;
222				let block = IncomingBlock {
223					hash,
224					header: Some(header),
225					body,
226					indexed_body: None,
227					justifications,
228					origin: None,
229					allow_missing_state: true,
230					import_existing: true,
231					skip_execution: true,
232					state: Some(state),
233				};
234				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
235				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
236				Ok(())
237			},
238			ImportResult::Continue => Ok(()),
239			ImportResult::BadResponse => {
240				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
241				Err(BadPeer(*peer_id, rep::BAD_STATE))
242			},
243		}
244	}
245
246	/// A batch of blocks have been processed, with or without errors.
247	///
248	/// Normally this should be called when target block with state is imported.
249	pub fn on_blocks_processed(
250		&mut self,
251		imported: usize,
252		count: usize,
253		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
254	) {
255		trace!(target: LOG_TARGET, "State sync: imported {imported} of {count}.");
256
257		let results = results
258			.into_iter()
259			.filter_map(|(result, hash)| {
260				if hash == self.state_sync.target_hash() {
261					Some(result)
262				} else {
263					debug!(
264						target: LOG_TARGET,
265						"Unexpected block processed: {hash} with result {result:?}.",
266					);
267					None
268				}
269			})
270			.collect::<Vec<_>>();
271
272		if !results.is_empty() {
273			// We processed the target block
274			results.iter().filter_map(|result| result.as_ref().err()).for_each(|e| {
275				error!(
276					target: LOG_TARGET,
277					"Failed to import target block with state: {e:?}."
278				);
279			});
280			self.succeeded |= results.into_iter().any(|result| result.is_ok());
281			self.actions.push(SyncingAction::Finished);
282		}
283	}
284
285	/// Produce state request.
286	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
287		if self.state_sync.is_complete() {
288			return None
289		}
290
291		if self
292			.peers
293			.values()
294			.any(|peer| matches!(peer.state, PeerState::DownloadingState))
295		{
296			// Only one state request at a time is possible.
297			return None
298		}
299
300		let peer_id =
301			self.schedule_next_peer(PeerState::DownloadingState, self.state_sync.target_number())?;
302		let request = self.state_sync.next_request();
303		trace!(
304			target: LOG_TARGET,
305			"New state request to {peer_id}: {request:?}.",
306		);
307		Some((peer_id, request))
308	}
309
310	fn schedule_next_peer(
311		&mut self,
312		new_state: PeerState,
313		min_best_number: NumberFor<B>,
314	) -> Option<PeerId> {
315		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
316		if targets.is_empty() {
317			return None
318		}
319		targets.sort();
320		let median = targets[targets.len() / 2];
321		let threshold = std::cmp::max(median, min_best_number);
322		// Find a random peer that is synced as much as peer majority and is above
323		// `min_best_number`.
324		for (peer_id, peer) in self.peers.iter_mut() {
325			if peer.state.is_available() &&
326				peer.best_number >= threshold &&
327				self.disconnected_peers.is_peer_available(peer_id)
328			{
329				peer.state = new_state;
330				return Some(*peer_id)
331			}
332		}
333		None
334	}
335
336	/// Returns the current sync status.
337	pub fn status(&self) -> SyncStatus<B> {
338		SyncStatus {
339			state: if self.state_sync.is_complete() {
340				SyncState::Idle
341			} else {
342				SyncState::Downloading { target: self.state_sync.target_number() }
343			},
344			best_seen_block: Some(self.state_sync.target_number()),
345			num_peers: self.peers.len().saturated_into(),
346			queued_blocks: 0,
347			state_sync: Some(self.state_sync.progress()),
348			warp_sync: None,
349		}
350	}
351
352	/// Get actions that should be performed.
353	#[must_use]
354	pub fn actions(
355		&mut self,
356		network_service: &NetworkServiceHandle,
357	) -> impl Iterator<Item = SyncingAction<B>> {
358		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
359			let (tx, rx) = oneshot::channel();
360
361			network_service.start_request(
362				peer_id,
363				self.protocol_name.clone(),
364				request.encode_to_vec(),
365				tx,
366				IfDisconnected::ImmediateError,
367			);
368
369			SyncingAction::StartRequest {
370				peer_id,
371				key: Self::STRATEGY_KEY,
372				request: async move {
373					Ok(rx.await?.and_then(|(response, protocol_name)| {
374						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
375					}))
376				}
377				.boxed(),
378				remove_obsolete: false,
379			}
380		});
381		self.actions.extend(state_request);
382
383		std::mem::take(&mut self.actions).into_iter()
384	}
385
386	/// Check if state sync has succeeded.
387	#[must_use]
388	pub fn is_succeeded(&self) -> bool {
389		self.succeeded
390	}
391}
392
393#[cfg(test)]
394mod test {
395	use super::*;
396	use crate::{
397		schema::v1::{StateRequest, StateResponse},
398		service::network::NetworkServiceProvider,
399		strategy::state_sync::{ImportResult, StateSyncProgress, StateSyncProvider},
400	};
401	use codec::Decode;
402	use sc_block_builder::BlockBuilderBuilder;
403	use sc_client_api::KeyValueStates;
404	use sc_consensus::{ImportedAux, ImportedState};
405	use sp_core::H256;
406	use sp_runtime::traits::Zero;
407	use substrate_test_runtime_client::{
408		runtime::{Block, Hash},
409		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
410	};
411
412	mockall::mock! {
413		pub StateSync<B: BlockT> {}
414
415		impl<B: BlockT> StateSyncProvider<B> for StateSync<B> {
416			fn import(&mut self, response: StateResponse) -> ImportResult<B>;
417			fn next_request(&self) -> StateRequest;
418			fn is_complete(&self) -> bool;
419			fn target_number(&self) -> NumberFor<B>;
420			fn target_hash(&self) -> B::Hash;
421			fn progress(&self) -> StateSyncProgress;
422		}
423	}
424
425	#[test]
426	fn no_peer_is_scheduled_if_no_peers_connected() {
427		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
428		let target_block = BlockBuilderBuilder::new(&*client)
429			.on_parent_block(client.chain_info().best_hash)
430			.with_parent_block_number(client.chain_info().best_number)
431			.build()
432			.unwrap()
433			.build()
434			.unwrap()
435			.block;
436		let target_header = target_block.header().clone();
437
438		let mut state_strategy = StateStrategy::new(
439			client,
440			target_header,
441			None,
442			None,
443			false,
444			std::iter::empty(),
445			ProtocolName::Static(""),
446		);
447
448		assert!(state_strategy
449			.schedule_next_peer(PeerState::DownloadingState, Zero::zero())
450			.is_none());
451	}
452
453	#[test]
454	fn at_least_median_synced_peer_is_scheduled() {
455		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
456		let target_block = BlockBuilderBuilder::new(&*client)
457			.on_parent_block(client.chain_info().best_hash)
458			.with_parent_block_number(client.chain_info().best_number)
459			.build()
460			.unwrap()
461			.build()
462			.unwrap()
463			.block;
464
465		for _ in 0..100 {
466			let peers = (1..=10)
467				.map(|best_number| (PeerId::random(), best_number))
468				.collect::<HashMap<_, _>>();
469			let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
470
471			let mut state_strategy = StateStrategy::new(
472				client.clone(),
473				target_block.header().clone(),
474				None,
475				None,
476				false,
477				initial_peers,
478				ProtocolName::Static(""),
479			);
480
481			let peer_id =
482				state_strategy.schedule_next_peer(PeerState::DownloadingState, Zero::zero());
483			assert!(*peers.get(&peer_id.unwrap()).unwrap() >= 6);
484		}
485	}
486
487	#[test]
488	fn min_best_number_peer_is_scheduled() {
489		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
490		let target_block = BlockBuilderBuilder::new(&*client)
491			.on_parent_block(client.chain_info().best_hash)
492			.with_parent_block_number(client.chain_info().best_number)
493			.build()
494			.unwrap()
495			.build()
496			.unwrap()
497			.block;
498
499		for _ in 0..10 {
500			let peers = (1..=10)
501				.map(|best_number| (PeerId::random(), best_number))
502				.collect::<HashMap<_, _>>();
503			let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
504
505			let mut state_strategy = StateStrategy::new(
506				client.clone(),
507				target_block.header().clone(),
508				None,
509				None,
510				false,
511				initial_peers,
512				ProtocolName::Static(""),
513			);
514
515			let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
516			assert!(*peers.get(&peer_id.unwrap()).unwrap() == 10);
517		}
518	}
519
520	#[test]
521	fn backedoff_number_peer_is_not_scheduled() {
522		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
523		let target_block = BlockBuilderBuilder::new(&*client)
524			.on_parent_block(client.chain_info().best_hash)
525			.with_parent_block_number(client.chain_info().best_number)
526			.build()
527			.unwrap()
528			.build()
529			.unwrap()
530			.block;
531
532		let peers = (1..=10)
533			.map(|best_number| (PeerId::random(), best_number))
534			.collect::<Vec<(_, _)>>();
535		let ninth_peer = peers[8].0;
536		let tenth_peer = peers[9].0;
537		let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
538
539		let mut state_strategy = StateStrategy::new(
540			client.clone(),
541			target_block.header().clone(),
542			None,
543			None,
544			false,
545			initial_peers,
546			ProtocolName::Static(""),
547		);
548
549		// Disconnecting a peer without an inflight request has no effect on persistent states.
550		state_strategy.remove_peer(&tenth_peer);
551		assert!(state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
552
553		// Disconnect the peer with an inflight request.
554		state_strategy.add_peer(tenth_peer, H256::random(), 10);
555		let peer_id: Option<PeerId> =
556			state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
557		assert_eq!(tenth_peer, peer_id.unwrap());
558		state_strategy.remove_peer(&tenth_peer);
559
560		// Peer is backed off.
561		assert!(!state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
562
563		// No peer available for 10'th best block because of the backoff.
564		state_strategy.add_peer(tenth_peer, H256::random(), 10);
565		let peer_id: Option<PeerId> =
566			state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
567		assert!(peer_id.is_none());
568
569		// Other requests can still happen.
570		let peer_id: Option<PeerId> =
571			state_strategy.schedule_next_peer(PeerState::DownloadingState, 9);
572		assert_eq!(ninth_peer, peer_id.unwrap());
573	}
574
575	#[test]
576	fn state_request_contains_correct_hash() {
577		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
578		let target_block = BlockBuilderBuilder::new(&*client)
579			.on_parent_block(client.chain_info().best_hash)
580			.with_parent_block_number(client.chain_info().best_number)
581			.build()
582			.unwrap()
583			.build()
584			.unwrap()
585			.block;
586
587		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
588
589		let mut state_strategy = StateStrategy::new(
590			client.clone(),
591			target_block.header().clone(),
592			None,
593			None,
594			false,
595			initial_peers,
596			ProtocolName::Static(""),
597		);
598
599		let (_peer_id, request) = state_strategy.state_request().unwrap();
600		let hash = Hash::decode(&mut &*request.block).unwrap();
601
602		assert_eq!(hash, target_block.header().hash());
603	}
604
605	#[test]
606	fn no_parallel_state_requests() {
607		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
608		let target_block = BlockBuilderBuilder::new(&*client)
609			.on_parent_block(client.chain_info().best_hash)
610			.with_parent_block_number(client.chain_info().best_number)
611			.build()
612			.unwrap()
613			.build()
614			.unwrap()
615			.block;
616
617		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
618
619		let mut state_strategy = StateStrategy::new(
620			client.clone(),
621			target_block.header().clone(),
622			None,
623			None,
624			false,
625			initial_peers,
626			ProtocolName::Static(""),
627		);
628
629		// First request is sent.
630		assert!(state_strategy.state_request().is_some());
631
632		// No parallel request is sent.
633		assert!(state_strategy.state_request().is_none());
634	}
635
636	#[test]
637	fn received_state_response_makes_peer_available_again() {
638		let mut state_sync_provider = MockStateSync::<Block>::new();
639		state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
640		let peer_id = PeerId::random();
641		let initial_peers = std::iter::once((peer_id, 10));
642		let mut state_strategy = StateStrategy::new_with_provider(
643			Box::new(state_sync_provider),
644			initial_peers,
645			ProtocolName::Static(""),
646		);
647		// Manually set the peer's state.
648		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
649
650		let dummy_response = StateResponse::default().encode_to_vec();
651		state_strategy.on_state_response(&peer_id, dummy_response);
652
653		assert!(state_strategy.peers.get(&peer_id).unwrap().state.is_available());
654	}
655
656	#[test]
657	fn bad_state_response_drops_peer() {
658		let mut state_sync_provider = MockStateSync::<Block>::new();
659		// Provider says that state response is bad.
660		state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse);
661		let peer_id = PeerId::random();
662		let initial_peers = std::iter::once((peer_id, 10));
663		let mut state_strategy = StateStrategy::new_with_provider(
664			Box::new(state_sync_provider),
665			initial_peers,
666			ProtocolName::Static(""),
667		);
668		// Manually set the peer's state.
669		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
670		let dummy_response = StateResponse::default().encode_to_vec();
671		// Receiving response drops the peer.
672		assert!(matches!(
673			state_strategy.on_state_response_inner(&peer_id, &dummy_response),
674			Err(BadPeer(id, _rep)) if id == peer_id,
675		));
676	}
677
678	#[test]
679	fn partial_state_response_doesnt_generate_actions() {
680		let mut state_sync_provider = MockStateSync::<Block>::new();
681		// Sync provider says that the response is partial.
682		state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
683		let peer_id = PeerId::random();
684		let initial_peers = std::iter::once((peer_id, 10));
685		let mut state_strategy = StateStrategy::new_with_provider(
686			Box::new(state_sync_provider),
687			initial_peers,
688			ProtocolName::Static(""),
689		);
690		// Manually set the peer's state .
691		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
692
693		let dummy_response = StateResponse::default().encode_to_vec();
694		state_strategy.on_state_response(&peer_id, dummy_response);
695
696		// No actions generated.
697		assert_eq!(state_strategy.actions.len(), 0)
698	}
699
700	#[test]
701	fn complete_state_response_leads_to_block_import() {
702		// Build block to use for checks.
703		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
704		let mut block_builder = BlockBuilderBuilder::new(&*client)
705			.on_parent_block(client.chain_info().best_hash)
706			.with_parent_block_number(client.chain_info().best_number)
707			.build()
708			.unwrap();
709		block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
710		let block = block_builder.build().unwrap().block;
711		let header = block.header().clone();
712		let hash = header.hash();
713		let body = Some(block.extrinsics().iter().cloned().collect::<Vec<_>>());
714		let state = ImportedState { block: hash, state: KeyValueStates(Vec::new()) };
715		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
716
717		// Prepare `StateSync`
718		let mut state_sync_provider = MockStateSync::<Block>::new();
719		let import = ImportResult::Import(
720			hash,
721			header.clone(),
722			state.clone(),
723			body.clone(),
724			justifications.clone(),
725		);
726		state_sync_provider.expect_import().return_once(move |_| import);
727
728		// Reference values to check against.
729		let expected_origin = BlockOrigin::NetworkInitialSync;
730		let expected_block = IncomingBlock {
731			hash,
732			header: Some(header),
733			body,
734			indexed_body: None,
735			justifications,
736			origin: None,
737			allow_missing_state: true,
738			import_existing: true,
739			skip_execution: true,
740			state: Some(state),
741		};
742		let expected_blocks = vec![expected_block];
743
744		// Prepare `StateStrategy`.
745		let peer_id = PeerId::random();
746		let initial_peers = std::iter::once((peer_id, 10));
747		let mut state_strategy = StateStrategy::new_with_provider(
748			Box::new(state_sync_provider),
749			initial_peers,
750			ProtocolName::Static(""),
751		);
752		// Manually set the peer's state .
753		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
754
755		// Receive response.
756		let dummy_response = StateResponse::default().encode_to_vec();
757		state_strategy.on_state_response(&peer_id, dummy_response);
758
759		assert_eq!(state_strategy.actions.len(), 1);
760		assert!(matches!(
761			&state_strategy.actions[0],
762			SyncingAction::ImportBlocks { origin, blocks }
763				if *origin == expected_origin && *blocks == expected_blocks,
764		));
765	}
766
767	#[test]
768	fn importing_unknown_block_doesnt_finish_strategy() {
769		let target_hash = Hash::random();
770		let unknown_hash = Hash::random();
771		let mut state_sync_provider = MockStateSync::<Block>::new();
772		state_sync_provider.expect_target_hash().return_const(target_hash);
773
774		let mut state_strategy = StateStrategy::new_with_provider(
775			Box::new(state_sync_provider),
776			std::iter::empty(),
777			ProtocolName::Static(""),
778		);
779
780		// Unknown block imported.
781		state_strategy.on_blocks_processed(
782			1,
783			1,
784			vec![(
785				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
786				unknown_hash,
787			)],
788		);
789
790		// No actions generated.
791		assert_eq!(state_strategy.actions.len(), 0);
792	}
793
794	#[test]
795	fn successfully_importing_target_block_finishes_strategy() {
796		let target_hash = Hash::random();
797		let mut state_sync_provider = MockStateSync::<Block>::new();
798		state_sync_provider.expect_target_hash().return_const(target_hash);
799
800		let mut state_strategy = StateStrategy::new_with_provider(
801			Box::new(state_sync_provider),
802			std::iter::empty(),
803			ProtocolName::Static(""),
804		);
805
806		// Target block imported.
807		state_strategy.on_blocks_processed(
808			1,
809			1,
810			vec![(
811				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
812				target_hash,
813			)],
814		);
815
816		// Strategy finishes.
817		assert_eq!(state_strategy.actions.len(), 1);
818		assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
819	}
820
821	#[test]
822	fn failure_to_import_target_block_finishes_strategy() {
823		let target_hash = Hash::random();
824		let mut state_sync_provider = MockStateSync::<Block>::new();
825		state_sync_provider.expect_target_hash().return_const(target_hash);
826
827		let mut state_strategy = StateStrategy::new_with_provider(
828			Box::new(state_sync_provider),
829			std::iter::empty(),
830			ProtocolName::Static(""),
831		);
832
833		// Target block import failed.
834		state_strategy.on_blocks_processed(
835			1,
836			1,
837			vec![(
838				Err(BlockImportError::VerificationFailed(None, String::from("test-error"))),
839				target_hash,
840			)],
841		);
842
843		// Strategy finishes.
844		assert_eq!(state_strategy.actions.len(), 1);
845		assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
846	}
847
848	#[test]
849	fn finished_strategy_doesnt_generate_more_actions() {
850		let target_hash = Hash::random();
851		let mut state_sync_provider = MockStateSync::<Block>::new();
852		state_sync_provider.expect_target_hash().return_const(target_hash);
853		state_sync_provider.expect_is_complete().return_const(true);
854
855		// Get enough peers for possible spurious requests.
856		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
857
858		let mut state_strategy = StateStrategy::new_with_provider(
859			Box::new(state_sync_provider),
860			initial_peers,
861			ProtocolName::Static(""),
862		);
863
864		state_strategy.on_blocks_processed(
865			1,
866			1,
867			vec![(
868				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
869				target_hash,
870			)],
871		);
872
873		let network_provider = NetworkServiceProvider::new();
874		let network_handle = network_provider.handle();
875
876		// Strategy finishes.
877		let actions = state_strategy.actions(&network_handle).collect::<Vec<_>>();
878		assert_eq!(actions.len(), 1);
879		assert!(matches!(&actions[0], SyncingAction::Finished));
880
881		// No more actions generated.
882		assert_eq!(state_strategy.actions(&network_handle).count(), 0);
883	}
884}