sc_network_sync/strategy/
state.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! State sync strategy.
20
21use crate::{
22	schema::v1::StateResponse,
23	strategy::{
24		disconnected_peers::DisconnectedPeers,
25		state_sync::{ImportResult, StateSync, StateSyncProvider},
26	},
27	types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus},
28	LOG_TARGET,
29};
30use log::{debug, error, trace};
31use sc_client_api::ProofProvider;
32use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
33use sc_network::ProtocolName;
34use sc_network_common::sync::message::BlockAnnounce;
35use sc_network_types::PeerId;
36use sp_consensus::BlockOrigin;
37use sp_runtime::{
38	traits::{Block as BlockT, Header, NumberFor},
39	Justifications, SaturatedConversion,
40};
41use std::{collections::HashMap, sync::Arc};
42
43mod rep {
44	use sc_network::ReputationChange as Rep;
45
46	/// Peer response data does not have requested bits.
47	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
48
49	/// Reputation change for peers which send us a known bad state.
50	pub const BAD_STATE: Rep = Rep::new(-(1 << 29), "Bad state");
51}
52
/// Action that should be performed on [`StateStrategy`]'s behalf.
///
/// Actions are queued inside the strategy and handed to the owner by
/// [`StateStrategy::actions`].
pub enum StateStrategyAction<B: BlockT> {
	/// Send state request to peer.
	///
	/// `protocol_name` is the protocol name the strategy was constructed with.
	SendStateRequest { peer_id: PeerId, protocol_name: ProtocolName, request: OpaqueStateRequest },
	/// Disconnect and report peer.
	DropPeer(BadPeer),
	/// Import blocks.
	///
	/// Emitted once the state download is complete; carries the target block together with
	/// the downloaded state.
	ImportBlocks { origin: BlockOrigin, blocks: Vec<IncomingBlock<B>> },
	/// State sync has finished.
	///
	/// Emitted after the target block has been processed, whether or not the import succeeded.
	Finished,
}
64
/// Whether a connected peer can currently be handed a state request.
enum PeerState {
	/// No state request is in flight to this peer.
	Available,
	/// A state request has been scheduled on this peer and no response was processed yet.
	DownloadingState,
}

impl PeerState {
	/// Returns `true` only for [`PeerState::Available`].
	fn is_available(&self) -> bool {
		match self {
			PeerState::Available => true,
			PeerState::DownloadingState => false,
		}
	}
}
75
/// Per-peer bookkeeping kept by [`StateStrategy`].
struct Peer<B: BlockT> {
	/// Best block number known for this peer; used when choosing a peer to request from.
	best_number: NumberFor<B>,
	/// Whether the peer is free or currently serving a state request.
	state: PeerState,
}
80
/// Syncing strategy that downloads and imports a recent state directly.
pub struct StateStrategy<B: BlockT> {
	/// Provider that builds state requests and imports the responses.
	state_sync: Box<dyn StateSyncProvider<B>>,
	/// Currently connected peers.
	peers: HashMap<PeerId, Peer<B>>,
	/// Bookkeeping for peers that disconnected while a request was in flight; consulted before
	/// a peer is scheduled again.
	disconnected_peers: DisconnectedPeers,
	/// Actions queued for the owner, drained by `actions()`.
	actions: Vec<StateStrategyAction<B>>,
	/// Protocol name attached to every `SendStateRequest` action.
	protocol_name: ProtocolName,
	/// Set once the target block has been reported as successfully imported.
	succeeded: bool,
}
90
91impl<B: BlockT> StateStrategy<B> {
92	/// Create a new instance.
93	pub fn new<Client>(
94		client: Arc<Client>,
95		target_header: B::Header,
96		target_body: Option<Vec<B::Extrinsic>>,
97		target_justifications: Option<Justifications>,
98		skip_proof: bool,
99		initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
100		protocol_name: ProtocolName,
101	) -> Self
102	where
103		Client: ProofProvider<B> + Send + Sync + 'static,
104	{
105		let peers = initial_peers
106			.map(|(peer_id, best_number)| {
107				(peer_id, Peer { best_number, state: PeerState::Available })
108			})
109			.collect();
110		Self {
111			state_sync: Box::new(StateSync::new(
112				client,
113				target_header,
114				target_body,
115				target_justifications,
116				skip_proof,
117			)),
118			peers,
119			disconnected_peers: DisconnectedPeers::new(),
120			actions: Vec::new(),
121			protocol_name,
122			succeeded: false,
123		}
124	}
125
126	// Create a new instance with a custom state sync provider.
127	// Used in tests.
128	#[cfg(test)]
129	fn new_with_provider(
130		state_sync_provider: Box<dyn StateSyncProvider<B>>,
131		initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
132		protocol_name: ProtocolName,
133	) -> Self {
134		Self {
135			state_sync: state_sync_provider,
136			peers: initial_peers
137				.map(|(peer_id, best_number)| {
138					(peer_id, Peer { best_number, state: PeerState::Available })
139				})
140				.collect(),
141			disconnected_peers: DisconnectedPeers::new(),
142			actions: Vec::new(),
143			protocol_name,
144			succeeded: false,
145		}
146	}
147
148	/// Notify that a new peer has connected.
149	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
150		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
151	}
152
153	/// Notify that a peer has disconnected.
154	pub fn remove_peer(&mut self, peer_id: &PeerId) {
155		if let Some(state) = self.peers.remove(peer_id) {
156			if !state.state.is_available() {
157				if let Some(bad_peer) =
158					self.disconnected_peers.on_disconnect_during_request(*peer_id)
159				{
160					self.actions.push(StateStrategyAction::DropPeer(bad_peer));
161				}
162			}
163		}
164	}
165
166	/// Submit a validated block announcement.
167	///
168	/// Returns new best hash & best number of the peer if they are updated.
169	#[must_use]
170	pub fn on_validated_block_announce(
171		&mut self,
172		is_best: bool,
173		peer_id: PeerId,
174		announce: &BlockAnnounce<B::Header>,
175	) -> Option<(B::Hash, NumberFor<B>)> {
176		is_best.then_some({
177			let best_number = *announce.header.number();
178			let best_hash = announce.header.hash();
179			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
180				peer.best_number = best_number;
181			}
182			// Let `SyncingEngine` know that we should update the peer info.
183			(best_hash, best_number)
184		})
185	}
186
187	/// Process state response.
188	pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) {
189		if let Err(bad_peer) = self.on_state_response_inner(peer_id, response) {
190			self.actions.push(StateStrategyAction::DropPeer(bad_peer));
191		}
192	}
193
194	fn on_state_response_inner(
195		&mut self,
196		peer_id: PeerId,
197		response: OpaqueStateResponse,
198	) -> Result<(), BadPeer> {
199		if let Some(peer) = self.peers.get_mut(&peer_id) {
200			peer.state = PeerState::Available;
201		}
202
203		let response: Box<StateResponse> = response.0.downcast().map_err(|_error| {
204			error!(
205				target: LOG_TARGET,
206				"Failed to downcast opaque state response, this is an implementation bug."
207			);
208			debug_assert!(false);
209
210			BadPeer(peer_id, rep::BAD_RESPONSE)
211		})?;
212
213		debug!(
214			target: LOG_TARGET,
215			"Importing state data from {} with {} keys, {} proof nodes.",
216			peer_id,
217			response.entries.len(),
218			response.proof.len(),
219		);
220
221		match self.state_sync.import(*response) {
222			ImportResult::Import(hash, header, state, body, justifications) => {
223				let origin = BlockOrigin::NetworkInitialSync;
224				let block = IncomingBlock {
225					hash,
226					header: Some(header),
227					body,
228					indexed_body: None,
229					justifications,
230					origin: None,
231					allow_missing_state: true,
232					import_existing: true,
233					skip_execution: true,
234					state: Some(state),
235				};
236				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
237				self.actions
238					.push(StateStrategyAction::ImportBlocks { origin, blocks: vec![block] });
239				Ok(())
240			},
241			ImportResult::Continue => Ok(()),
242			ImportResult::BadResponse => {
243				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
244				Err(BadPeer(peer_id, rep::BAD_STATE))
245			},
246		}
247	}
248
249	/// A batch of blocks have been processed, with or without errors.
250	///
251	/// Normally this should be called when target block with state is imported.
252	pub fn on_blocks_processed(
253		&mut self,
254		imported: usize,
255		count: usize,
256		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
257	) {
258		trace!(target: LOG_TARGET, "State sync: imported {imported} of {count}.");
259
260		let results = results
261			.into_iter()
262			.filter_map(|(result, hash)| {
263				if hash == self.state_sync.target_hash() {
264					Some(result)
265				} else {
266					debug!(
267						target: LOG_TARGET,
268						"Unexpected block processed: {hash} with result {result:?}.",
269					);
270					None
271				}
272			})
273			.collect::<Vec<_>>();
274
275		if !results.is_empty() {
276			// We processed the target block
277			results.iter().filter_map(|result| result.as_ref().err()).for_each(|e| {
278				error!(
279					target: LOG_TARGET,
280					"Failed to import target block with state: {e:?}."
281				);
282			});
283			self.succeeded |= results.into_iter().any(|result| result.is_ok());
284			self.actions.push(StateStrategyAction::Finished);
285		}
286	}
287
288	/// Produce state request.
289	fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
290		if self.state_sync.is_complete() {
291			return None
292		}
293
294		if self
295			.peers
296			.values()
297			.any(|peer| matches!(peer.state, PeerState::DownloadingState))
298		{
299			// Only one state request at a time is possible.
300			return None
301		}
302
303		let peer_id =
304			self.schedule_next_peer(PeerState::DownloadingState, self.state_sync.target_number())?;
305		let request = self.state_sync.next_request();
306		trace!(
307			target: LOG_TARGET,
308			"New state request to {peer_id}: {request:?}.",
309		);
310		Some((peer_id, OpaqueStateRequest(Box::new(request))))
311	}
312
313	fn schedule_next_peer(
314		&mut self,
315		new_state: PeerState,
316		min_best_number: NumberFor<B>,
317	) -> Option<PeerId> {
318		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
319		if targets.is_empty() {
320			return None
321		}
322		targets.sort();
323		let median = targets[targets.len() / 2];
324		let threshold = std::cmp::max(median, min_best_number);
325		// Find a random peer that is synced as much as peer majority and is above
326		// `min_best_number`.
327		for (peer_id, peer) in self.peers.iter_mut() {
328			if peer.state.is_available() &&
329				peer.best_number >= threshold &&
330				self.disconnected_peers.is_peer_available(peer_id)
331			{
332				peer.state = new_state;
333				return Some(*peer_id)
334			}
335		}
336		None
337	}
338
339	/// Returns the current sync status.
340	pub fn status(&self) -> SyncStatus<B> {
341		SyncStatus {
342			state: if self.state_sync.is_complete() {
343				SyncState::Idle
344			} else {
345				SyncState::Downloading { target: self.state_sync.target_number() }
346			},
347			best_seen_block: Some(self.state_sync.target_number()),
348			num_peers: self.peers.len().saturated_into(),
349			queued_blocks: 0,
350			state_sync: Some(self.state_sync.progress()),
351			warp_sync: None,
352		}
353	}
354
355	/// Get actions that should be performed by the owner on [`WarpSync`]'s behalf
356	#[must_use]
357	pub fn actions(&mut self) -> impl Iterator<Item = StateStrategyAction<B>> {
358		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
359			StateStrategyAction::SendStateRequest {
360				peer_id,
361				protocol_name: self.protocol_name.clone(),
362				request,
363			}
364		});
365		self.actions.extend(state_request);
366
367		std::mem::take(&mut self.actions).into_iter()
368	}
369
	/// Check if state sync has succeeded.
	///
	/// Returns `true` once the target block has been reported as successfully imported via
	/// `on_blocks_processed`, and `false` otherwise.
	#[must_use]
	pub fn is_succeeded(&self) -> bool {
		self.succeeded
	}
375}
376
377#[cfg(test)]
378mod test {
379	use super::*;
380	use crate::{
381		schema::v1::{StateRequest, StateResponse},
382		strategy::state_sync::{ImportResult, StateSyncProgress, StateSyncProvider},
383	};
384	use codec::Decode;
385	use sc_block_builder::BlockBuilderBuilder;
386	use sc_client_api::KeyValueStates;
387	use sc_consensus::{ImportedAux, ImportedState};
388	use sp_core::H256;
389	use sp_runtime::traits::Zero;
390	use substrate_test_runtime_client::{
391		runtime::{Block, Hash},
392		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
393	};
394
	// Generates `MockStateSync`, a mock implementation of `StateSyncProvider`, so that tests
	// can script the provider's answers without downloading any real state.
	mockall::mock! {
		pub StateSync<B: BlockT> {}

		impl<B: BlockT> StateSyncProvider<B> for StateSync<B> {
			fn import(&mut self, response: StateResponse) -> ImportResult<B>;
			fn next_request(&self) -> StateRequest;
			fn is_complete(&self) -> bool;
			fn target_number(&self) -> NumberFor<B>;
			fn target_hash(&self) -> B::Hash;
			fn progress(&self) -> StateSyncProgress;
		}
	}
407
408	#[test]
409	fn no_peer_is_scheduled_if_no_peers_connected() {
410		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
411		let target_block = BlockBuilderBuilder::new(&*client)
412			.on_parent_block(client.chain_info().best_hash)
413			.with_parent_block_number(client.chain_info().best_number)
414			.build()
415			.unwrap()
416			.build()
417			.unwrap()
418			.block;
419		let target_header = target_block.header().clone();
420
421		let mut state_strategy = StateStrategy::new(
422			client,
423			target_header,
424			None,
425			None,
426			false,
427			std::iter::empty(),
428			ProtocolName::Static(""),
429		);
430
431		assert!(state_strategy
432			.schedule_next_peer(PeerState::DownloadingState, Zero::zero())
433			.is_none());
434	}
435
436	#[test]
437	fn at_least_median_synced_peer_is_scheduled() {
438		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
439		let target_block = BlockBuilderBuilder::new(&*client)
440			.on_parent_block(client.chain_info().best_hash)
441			.with_parent_block_number(client.chain_info().best_number)
442			.build()
443			.unwrap()
444			.build()
445			.unwrap()
446			.block;
447
448		for _ in 0..100 {
449			let peers = (1..=10)
450				.map(|best_number| (PeerId::random(), best_number))
451				.collect::<HashMap<_, _>>();
452			let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
453
454			let mut state_strategy = StateStrategy::new(
455				client.clone(),
456				target_block.header().clone(),
457				None,
458				None,
459				false,
460				initial_peers,
461				ProtocolName::Static(""),
462			);
463
464			let peer_id =
465				state_strategy.schedule_next_peer(PeerState::DownloadingState, Zero::zero());
466			assert!(*peers.get(&peer_id.unwrap()).unwrap() >= 6);
467		}
468	}
469
470	#[test]
471	fn min_best_number_peer_is_scheduled() {
472		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
473		let target_block = BlockBuilderBuilder::new(&*client)
474			.on_parent_block(client.chain_info().best_hash)
475			.with_parent_block_number(client.chain_info().best_number)
476			.build()
477			.unwrap()
478			.build()
479			.unwrap()
480			.block;
481
482		for _ in 0..10 {
483			let peers = (1..=10)
484				.map(|best_number| (PeerId::random(), best_number))
485				.collect::<HashMap<_, _>>();
486			let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
487
488			let mut state_strategy = StateStrategy::new(
489				client.clone(),
490				target_block.header().clone(),
491				None,
492				None,
493				false,
494				initial_peers,
495				ProtocolName::Static(""),
496			);
497
498			let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
499			assert!(*peers.get(&peer_id.unwrap()).unwrap() == 10);
500		}
501	}
502
503	#[test]
504	fn backedoff_number_peer_is_not_scheduled() {
505		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
506		let target_block = BlockBuilderBuilder::new(&*client)
507			.on_parent_block(client.chain_info().best_hash)
508			.with_parent_block_number(client.chain_info().best_number)
509			.build()
510			.unwrap()
511			.build()
512			.unwrap()
513			.block;
514
515		let peers = (1..=10)
516			.map(|best_number| (PeerId::random(), best_number))
517			.collect::<Vec<(_, _)>>();
518		let ninth_peer = peers[8].0;
519		let tenth_peer = peers[9].0;
520		let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
521
522		let mut state_strategy = StateStrategy::new(
523			client.clone(),
524			target_block.header().clone(),
525			None,
526			None,
527			false,
528			initial_peers,
529			ProtocolName::Static(""),
530		);
531
532		// Disconnecting a peer without an inflight request has no effect on persistent states.
533		state_strategy.remove_peer(&tenth_peer);
534		assert!(state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
535
536		// Disconnect the peer with an inflight request.
537		state_strategy.add_peer(tenth_peer, H256::random(), 10);
538		let peer_id: Option<PeerId> =
539			state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
540		assert_eq!(tenth_peer, peer_id.unwrap());
541		state_strategy.remove_peer(&tenth_peer);
542
543		// Peer is backed off.
544		assert!(!state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
545
546		// No peer available for 10'th best block because of the backoff.
547		state_strategy.add_peer(tenth_peer, H256::random(), 10);
548		let peer_id: Option<PeerId> =
549			state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
550		assert!(peer_id.is_none());
551
552		// Other requests can still happen.
553		let peer_id: Option<PeerId> =
554			state_strategy.schedule_next_peer(PeerState::DownloadingState, 9);
555		assert_eq!(ninth_peer, peer_id.unwrap());
556	}
557
558	#[test]
559	fn state_request_contains_correct_hash() {
560		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
561		let target_block = BlockBuilderBuilder::new(&*client)
562			.on_parent_block(client.chain_info().best_hash)
563			.with_parent_block_number(client.chain_info().best_number)
564			.build()
565			.unwrap()
566			.build()
567			.unwrap()
568			.block;
569
570		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
571
572		let mut state_strategy = StateStrategy::new(
573			client.clone(),
574			target_block.header().clone(),
575			None,
576			None,
577			false,
578			initial_peers,
579			ProtocolName::Static(""),
580		);
581
582		let (_peer_id, mut opaque_request) = state_strategy.state_request().unwrap();
583		let request: &mut StateRequest = opaque_request.0.downcast_mut().unwrap();
584		let hash = Hash::decode(&mut &*request.block).unwrap();
585
586		assert_eq!(hash, target_block.header().hash());
587	}
588
589	#[test]
590	fn no_parallel_state_requests() {
591		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
592		let target_block = BlockBuilderBuilder::new(&*client)
593			.on_parent_block(client.chain_info().best_hash)
594			.with_parent_block_number(client.chain_info().best_number)
595			.build()
596			.unwrap()
597			.build()
598			.unwrap()
599			.block;
600
601		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
602
603		let mut state_strategy = StateStrategy::new(
604			client.clone(),
605			target_block.header().clone(),
606			None,
607			None,
608			false,
609			initial_peers,
610			ProtocolName::Static(""),
611		);
612
613		// First request is sent.
614		assert!(state_strategy.state_request().is_some());
615
616		// No parallel request is sent.
617		assert!(state_strategy.state_request().is_none());
618	}
619
620	#[test]
621	fn received_state_response_makes_peer_available_again() {
622		let mut state_sync_provider = MockStateSync::<Block>::new();
623		state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
624		let peer_id = PeerId::random();
625		let initial_peers = std::iter::once((peer_id, 10));
626		let mut state_strategy = StateStrategy::new_with_provider(
627			Box::new(state_sync_provider),
628			initial_peers,
629			ProtocolName::Static(""),
630		);
631		// Manually set the peer's state.
632		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
633
634		let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default()));
635		state_strategy.on_state_response(peer_id, dummy_response);
636
637		assert!(state_strategy.peers.get(&peer_id).unwrap().state.is_available());
638	}
639
640	#[test]
641	fn bad_state_response_drops_peer() {
642		let mut state_sync_provider = MockStateSync::<Block>::new();
643		// Provider says that state response is bad.
644		state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse);
645		let peer_id = PeerId::random();
646		let initial_peers = std::iter::once((peer_id, 10));
647		let mut state_strategy = StateStrategy::new_with_provider(
648			Box::new(state_sync_provider),
649			initial_peers,
650			ProtocolName::Static(""),
651		);
652		// Manually set the peer's state.
653		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
654		let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default()));
655		// Receiving response drops the peer.
656		assert!(matches!(
657			state_strategy.on_state_response_inner(peer_id, dummy_response),
658			Err(BadPeer(id, _rep)) if id == peer_id,
659		));
660	}
661
662	#[test]
663	fn partial_state_response_doesnt_generate_actions() {
664		let mut state_sync_provider = MockStateSync::<Block>::new();
665		// Sync provider says that the response is partial.
666		state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
667		let peer_id = PeerId::random();
668		let initial_peers = std::iter::once((peer_id, 10));
669		let mut state_strategy = StateStrategy::new_with_provider(
670			Box::new(state_sync_provider),
671			initial_peers,
672			ProtocolName::Static(""),
673		);
674		// Manually set the peer's state .
675		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
676
677		let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default()));
678		state_strategy.on_state_response(peer_id, dummy_response);
679
680		// No actions generated.
681		assert_eq!(state_strategy.actions.len(), 0)
682	}
683
684	#[test]
685	fn complete_state_response_leads_to_block_import() {
686		// Build block to use for checks.
687		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
688		let mut block_builder = BlockBuilderBuilder::new(&*client)
689			.on_parent_block(client.chain_info().best_hash)
690			.with_parent_block_number(client.chain_info().best_number)
691			.build()
692			.unwrap();
693		block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
694		let block = block_builder.build().unwrap().block;
695		let header = block.header().clone();
696		let hash = header.hash();
697		let body = Some(block.extrinsics().iter().cloned().collect::<Vec<_>>());
698		let state = ImportedState { block: hash, state: KeyValueStates(Vec::new()) };
699		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
700
701		// Prepare `StateSync`
702		let mut state_sync_provider = MockStateSync::<Block>::new();
703		let import = ImportResult::Import(
704			hash,
705			header.clone(),
706			state.clone(),
707			body.clone(),
708			justifications.clone(),
709		);
710		state_sync_provider.expect_import().return_once(move |_| import);
711
712		// Reference values to check against.
713		let expected_origin = BlockOrigin::NetworkInitialSync;
714		let expected_block = IncomingBlock {
715			hash,
716			header: Some(header),
717			body,
718			indexed_body: None,
719			justifications,
720			origin: None,
721			allow_missing_state: true,
722			import_existing: true,
723			skip_execution: true,
724			state: Some(state),
725		};
726		let expected_blocks = vec![expected_block];
727
728		// Prepare `StateStrategy`.
729		let peer_id = PeerId::random();
730		let initial_peers = std::iter::once((peer_id, 10));
731		let mut state_strategy = StateStrategy::new_with_provider(
732			Box::new(state_sync_provider),
733			initial_peers,
734			ProtocolName::Static(""),
735		);
736		// Manually set the peer's state .
737		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
738
739		// Receive response.
740		let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default()));
741		state_strategy.on_state_response(peer_id, dummy_response);
742
743		assert_eq!(state_strategy.actions.len(), 1);
744		assert!(matches!(
745			&state_strategy.actions[0],
746			StateStrategyAction::ImportBlocks { origin, blocks }
747				if *origin == expected_origin && *blocks == expected_blocks,
748		));
749	}
750
751	#[test]
752	fn importing_unknown_block_doesnt_finish_strategy() {
753		let target_hash = Hash::random();
754		let unknown_hash = Hash::random();
755		let mut state_sync_provider = MockStateSync::<Block>::new();
756		state_sync_provider.expect_target_hash().return_const(target_hash);
757
758		let mut state_strategy = StateStrategy::new_with_provider(
759			Box::new(state_sync_provider),
760			std::iter::empty(),
761			ProtocolName::Static(""),
762		);
763
764		// Unknown block imported.
765		state_strategy.on_blocks_processed(
766			1,
767			1,
768			vec![(
769				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
770				unknown_hash,
771			)],
772		);
773
774		// No actions generated.
775		assert_eq!(state_strategy.actions.len(), 0);
776	}
777
778	#[test]
779	fn successfully_importing_target_block_finishes_strategy() {
780		let target_hash = Hash::random();
781		let mut state_sync_provider = MockStateSync::<Block>::new();
782		state_sync_provider.expect_target_hash().return_const(target_hash);
783
784		let mut state_strategy = StateStrategy::new_with_provider(
785			Box::new(state_sync_provider),
786			std::iter::empty(),
787			ProtocolName::Static(""),
788		);
789
790		// Target block imported.
791		state_strategy.on_blocks_processed(
792			1,
793			1,
794			vec![(
795				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
796				target_hash,
797			)],
798		);
799
800		// Strategy finishes.
801		assert_eq!(state_strategy.actions.len(), 1);
802		assert!(matches!(&state_strategy.actions[0], StateStrategyAction::Finished));
803	}
804
805	#[test]
806	fn failure_to_import_target_block_finishes_strategy() {
807		let target_hash = Hash::random();
808		let mut state_sync_provider = MockStateSync::<Block>::new();
809		state_sync_provider.expect_target_hash().return_const(target_hash);
810
811		let mut state_strategy = StateStrategy::new_with_provider(
812			Box::new(state_sync_provider),
813			std::iter::empty(),
814			ProtocolName::Static(""),
815		);
816
817		// Target block import failed.
818		state_strategy.on_blocks_processed(
819			1,
820			1,
821			vec![(
822				Err(BlockImportError::VerificationFailed(None, String::from("test-error"))),
823				target_hash,
824			)],
825		);
826
827		// Strategy finishes.
828		assert_eq!(state_strategy.actions.len(), 1);
829		assert!(matches!(&state_strategy.actions[0], StateStrategyAction::Finished));
830	}
831
832	#[test]
833	fn finished_strategy_doesnt_generate_more_actions() {
834		let target_hash = Hash::random();
835		let mut state_sync_provider = MockStateSync::<Block>::new();
836		state_sync_provider.expect_target_hash().return_const(target_hash);
837		state_sync_provider.expect_is_complete().return_const(true);
838
839		// Get enough peers for possible spurious requests.
840		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
841
842		let mut state_strategy = StateStrategy::new_with_provider(
843			Box::new(state_sync_provider),
844			initial_peers,
845			ProtocolName::Static(""),
846		);
847
848		state_strategy.on_blocks_processed(
849			1,
850			1,
851			vec![(
852				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
853				target_hash,
854			)],
855		);
856
857		// Strategy finishes.
858		let actions = state_strategy.actions().collect::<Vec<_>>();
859		assert_eq!(actions.len(), 1);
860		assert!(matches!(&actions[0], StateStrategyAction::Finished));
861
862		// No more actions generated.
863		assert_eq!(state_strategy.actions().count(), 0);
864	}
865}