referrerpolicy=no-referrer-when-downgrade

substrate_relay_helper/on_demand/
parachains.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17//! On-demand Substrate -> Substrate parachain finality relay.
18
19use crate::{
20	messages::source::best_finalized_peer_header_at_self,
21	on_demand::OnDemandRelay,
22	parachains::{
23		source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter,
24		SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline,
25	},
26	TransactionParams,
27};
28
29use async_std::{
30	channel::{unbounded, Receiver, Sender},
31	sync::{Arc, Mutex},
32};
33use async_trait::async_trait;
34use bp_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
35use bp_polkadot_core::parachains::{ParaHash, ParaId};
36use bp_runtime::HeaderIdProvider;
37use futures::{select, FutureExt};
38use num_traits::Zero;
39use parachains_relay::parachains_loop::{AvailableHeader, SourceClient, TargetClient};
40use relay_substrate_client::{
41	is_ancient_block, AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client,
42	Error as SubstrateError, HashOf, HeaderIdOf, ParachainBase,
43};
44use relay_utils::{
45	metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient,
46	HeaderId, UniqueSaturatedInto,
47};
48use std::fmt::Debug;
49
50/// On-demand Substrate <-> Substrate parachain finality relay.
51///
52/// This relay may be requested to sync more parachain headers, whenever some other relay
53/// (e.g. messages relay) needs it to continue its regular work. When enough parachain headers
54/// are relayed, on-demand stops syncing headers.
55#[derive(Clone)]
56pub struct OnDemandParachainsRelay<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt> {
57	/// Relay task name.
58	relay_task_name: String,
59	/// Channel used to communicate with background task and ask for relay of parachain heads.
60	required_header_number_sender: Sender<BlockNumberOf<P::SourceParachain>>,
61	/// Source relay chain client.
62	source_relay_client: SourceRelayClnt,
63	/// Target chain client.
64	target_client: TargetClnt,
65	/// On-demand relay chain relay.
66	on_demand_source_relay_to_target_headers:
67		Arc<dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>>,
68}
69
70impl<
71		P: SubstrateParachainsPipeline,
72		SourceRelayClnt: Client<P::SourceRelayChain>,
73		TargetClnt: Client<P::TargetChain>,
74	> OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>
75{
76	/// Create new on-demand parachains relay.
77	///
78	/// Note that the argument is the source relay chain client, not the parachain client.
79	/// That's because parachain finality is determined by the relay chain and we don't
80	/// need to connect to the parachain itself here.
81	pub fn new(
82		source_relay_client: SourceRelayClnt,
83		target_client: TargetClnt,
84		target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
85		on_demand_source_relay_to_target_headers: Arc<
86			dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
87		>,
88	) -> Self
89	where
90		P::SourceParachain: Chain<Hash = ParaHash>,
91		P::SourceRelayChain:
92			Chain<BlockNumber = RelayBlockNumber, Hash = RelayBlockHash, Hasher = RelayBlockHasher>,
93		AccountIdOf<P::TargetChain>:
94			From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
95	{
96		let (required_header_number_sender, required_header_number_receiver) = unbounded();
97		let this = OnDemandParachainsRelay {
98			relay_task_name: on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(
99			),
100			required_header_number_sender,
101			source_relay_client: source_relay_client.clone(),
102			target_client: target_client.clone(),
103			on_demand_source_relay_to_target_headers: on_demand_source_relay_to_target_headers
104				.clone(),
105		};
106		async_std::task::spawn(async move {
107			background_task::<P>(
108				source_relay_client,
109				target_client,
110				target_transaction_params,
111				on_demand_source_relay_to_target_headers,
112				required_header_number_receiver,
113			)
114			.await;
115		});
116
117		this
118	}
119}
120
121#[async_trait]
122impl<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>
123	OnDemandRelay<P::SourceParachain, P::TargetChain>
124	for OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>
125where
126	P::SourceParachain: Chain<Hash = ParaHash>,
127	SourceRelayClnt: Client<P::SourceRelayChain>,
128	TargetClnt: Client<P::TargetChain>,
129{
130	async fn reconnect(&self) -> Result<(), SubstrateError> {
131		// using clone is fine here (to avoid mut requirement), because clone on Client clones
132		// internal references
133		self.source_relay_client.clone().reconnect().await?;
134		self.target_client.clone().reconnect().await?;
135		// we'll probably need to reconnect relay chain relayer clients also
136		self.on_demand_source_relay_to_target_headers.reconnect().await
137	}
138
139	async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceParachain>) {
140		if let Err(e) = self.required_header_number_sender.send(required_header).await {
141			log::trace!(
142				target: "bridge",
143				"[{}] Failed to request {} header {:?}: {:?}",
144				self.relay_task_name,
145				P::SourceParachain::NAME,
146				required_header,
147				e,
148			);
149		}
150	}
151
152	/// Ask relay to prove source `required_header` to the `TargetChain`.
153	async fn prove_header(
154		&self,
155		required_parachain_header: BlockNumberOf<P::SourceParachain>,
156	) -> Result<(HeaderIdOf<P::SourceParachain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
157		// select headers to prove
158		let parachains_source = ParachainsSource::<P, _>::new(
159			self.source_relay_client.clone(),
160			Arc::new(Mutex::new(AvailableHeader::Missing)),
161		);
162		let env = (self, &parachains_source);
163		let (need_to_prove_relay_block, selected_relay_block, selected_parachain_block) =
164			select_headers_to_prove(env, required_parachain_header).await?;
165
166		log::debug!(
167			target: "bridge",
168			"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} and {} head {:?}",
169			self.relay_task_name,
170			P::SourceParachain::NAME,
171			required_parachain_header,
172			P::SourceParachain::NAME,
173			selected_parachain_block,
174			P::SourceRelayChain::NAME,
175			if need_to_prove_relay_block {
176				Some(selected_relay_block)
177			} else {
178				None
179			},
180		);
181
182		// now let's prove relay chain block (if needed)
183		let mut calls = Vec::new();
184		let mut proved_relay_block = selected_relay_block;
185		if need_to_prove_relay_block {
186			let (relay_block, relay_prove_call) = self
187				.on_demand_source_relay_to_target_headers
188				.prove_header(selected_relay_block.number())
189				.await?;
190			proved_relay_block = relay_block;
191			calls.extend(relay_prove_call);
192		}
193
194		// despite what we've selected before (in `select_headers_to_prove` call), if headers relay
195		// have chose the different header (e.g. because there's no GRANDPA jusstification for it),
196		// we need to prove parachain head available at this header
197		let para_id = ParaId(P::SourceParachain::PARACHAIN_ID);
198		let mut proved_parachain_block = selected_parachain_block;
199		if proved_relay_block != selected_relay_block {
200			proved_parachain_block = parachains_source
201				.on_chain_para_head_id(proved_relay_block)
202				.await?
203				// this could happen e.g. if parachain has been offboarded?
204				.ok_or_else(|| {
205					SubstrateError::MissingRequiredParachainHead(
206						para_id,
207						proved_relay_block.number().unique_saturated_into(),
208					)
209				})?;
210
211			log::debug!(
212				target: "bridge",
213				"[{}] Selected to prove {} head {:?} and {} head {:?}. Instead proved {} head {:?} and {} head {:?}",
214				self.relay_task_name,
215				P::SourceParachain::NAME,
216				selected_parachain_block,
217				P::SourceRelayChain::NAME,
218				selected_relay_block,
219				P::SourceParachain::NAME,
220				proved_parachain_block,
221				P::SourceRelayChain::NAME,
222				proved_relay_block,
223			);
224		}
225
226		// and finally - prove parachain head
227		let (para_proof, para_hash) =
228			parachains_source.prove_parachain_head(proved_relay_block).await?;
229		calls.push(P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call(
230			proved_relay_block,
231			vec![(para_id, para_hash)],
232			para_proof,
233			false,
234		));
235
236		Ok((proved_parachain_block, calls))
237	}
238}
239
240/// Background task that is responsible for starting parachain headers relay.
241async fn background_task<P: SubstrateParachainsPipeline>(
242	source_relay_client: impl Client<P::SourceRelayChain>,
243	target_client: impl Client<P::TargetChain>,
244	target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
245	on_demand_source_relay_to_target_headers: Arc<
246		dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
247	>,
248	required_parachain_header_number_receiver: Receiver<BlockNumberOf<P::SourceParachain>>,
249) where
250	P::SourceParachain: Chain<Hash = ParaHash>,
251	P::SourceRelayChain:
252		Chain<BlockNumber = RelayBlockNumber, Hash = RelayBlockHash, Hasher = RelayBlockHasher>,
253	AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
254{
255	let relay_task_name = on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>();
256	let target_transactions_mortality = target_transaction_params.mortality;
257
258	let mut relay_state = RelayState::Idle;
259	let mut required_parachain_header_number = Zero::zero();
260	let required_para_header_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));
261
262	let mut restart_relay = true;
263	let parachains_relay_task = futures::future::Fuse::terminated();
264	futures::pin_mut!(parachains_relay_task);
265
266	let mut parachains_source = ParachainsSource::<P, _>::new(
267		source_relay_client.clone(),
268		required_para_header_ref.clone(),
269	);
270	let mut parachains_target = ParachainsTarget::<P, _, _>::new(
271		source_relay_client.clone(),
272		target_client.clone(),
273		target_transaction_params.clone(),
274	);
275
276	loop {
277		select! {
278			new_required_parachain_header_number = required_parachain_header_number_receiver.recv().fuse() => {
279				let new_required_parachain_header_number = match new_required_parachain_header_number {
280					Ok(new_required_parachain_header_number) => new_required_parachain_header_number,
281					Err(e) => {
282						log::error!(
283							target: "bridge",
284							"[{}] Background task has exited with error: {:?}",
285							relay_task_name,
286							e,
287						);
288
289						return;
290					},
291				};
292
293				// keep in mind that we are not updating `required_para_header_ref` here, because
294				// then we'll be submitting all previous headers as well (while required relay headers are
295				// delivered) and we want to avoid that (to reduce cost)
296				if new_required_parachain_header_number > required_parachain_header_number {
297					log::trace!(
298						target: "bridge",
299						"[{}] More {} headers required. Going to sync up to the {}",
300						relay_task_name,
301						P::SourceParachain::NAME,
302						new_required_parachain_header_number,
303					);
304
305					required_parachain_header_number = new_required_parachain_header_number;
306				}
307			},
308			_ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
309			_ = parachains_relay_task => {
310				// this should never happen in practice given the current code
311				restart_relay = true;
312			},
313		}
314
315		// the workflow of the on-demand parachains relay is:
316		//
317		// 1) message relay (or any other dependent relay) sees new message at parachain header
318		// `PH`;
319		//
320		// 2) it sees that the target chain does not know `PH`;
321		//
322		// 3) it asks on-demand parachains relay to relay `PH` to the target chain;
323		//
324		// Phase#1: relaying relay chain header
325		//
326		// 4) on-demand parachains relay waits for GRANDPA-finalized block of the source relay chain
327		//    `RH` that is storing `PH` or its descendant. Let it be `PH'`;
328		// 5) it asks on-demand headers relay to relay `RH` to the target chain;
329		// 6) it waits until `RH` (or its descendant) is relayed to the target chain;
330		//
331		// Phase#2: relaying parachain header
332		//
333		// 7) on-demand parachains relay sets `ParachainsSource::maximal_header_number` to the
334		//    `PH'.number()`.
335		// 8) parachains finality relay sees that the parachain head has been updated and relays
336		//    `PH'` to    the target chain.
337
338		// select headers to relay
339		let relay_data = read_relay_data(
340			&parachains_source,
341			&parachains_target,
342			required_parachain_header_number,
343		)
344		.await;
345		match relay_data {
346			Ok(relay_data) => {
347				let prev_relay_state = relay_state;
348				relay_state = select_headers_to_relay(&relay_data, relay_state);
349				log::trace!(
350					target: "bridge",
351					"[{}] Selected new relay state: {:?} using old state {:?} and data {:?}",
352					relay_task_name,
353					relay_state,
354					prev_relay_state,
355					relay_data,
356				);
357			},
358			Err(failed_client) => {
359				relay_utils::relay_loop::reconnect_failed_client(
360					failed_client,
361					relay_utils::relay_loop::RECONNECT_DELAY,
362					&mut parachains_source,
363					&mut parachains_target,
364				)
365				.await;
366				continue
367			},
368		}
369
370		// we have selected our new 'state' => let's notify our source clients about our new
371		// requirements
372		match relay_state {
373			RelayState::Idle => (),
374			RelayState::RelayingRelayHeader(required_relay_header) => {
375				on_demand_source_relay_to_target_headers
376					.require_more_headers(required_relay_header)
377					.await;
378			},
379			RelayState::RelayingParaHeader(required_para_header) => {
380				*required_para_header_ref.lock().await =
381					AvailableHeader::Available(required_para_header);
382			},
383		}
384
385		// start/restart relay
386		if restart_relay {
387			let stall_timeout = relay_substrate_client::transaction_stall_timeout(
388				target_transactions_mortality,
389				P::TargetChain::AVERAGE_BLOCK_INTERVAL,
390				relay_utils::STALL_TIMEOUT,
391			);
392
393			log::info!(
394				target: "bridge",
395				"[{}] Starting on-demand-parachains relay task\n\t\
396					Tx mortality: {:?} (~{}m)\n\t\
397					Stall timeout: {:?}",
398				relay_task_name,
399				target_transactions_mortality,
400				stall_timeout.as_secs_f64() / 60.0f64,
401				stall_timeout,
402			);
403
404			parachains_relay_task.set(
405				parachains_relay::parachains_loop::run(
406					parachains_source.clone(),
407					parachains_target.clone(),
408					MetricsParams::disabled(),
409					// we do not support free parachain headers relay in on-demand relays
410					false,
411					futures::future::pending(),
412				)
413				.fuse(),
414			);
415
416			restart_relay = false;
417		}
418	}
419}
420
421/// On-demand parachains relay task name.
422fn on_demand_parachains_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
423	format!("{}-to-{}-on-demand-parachain", SourceChain::NAME, TargetChain::NAME)
424}
425
426/// On-demand relay state.
427#[derive(Clone, Copy, Debug, PartialEq)]
428enum RelayState<ParaHash, ParaNumber, RelayNumber> {
429	/// On-demand relay is not doing anything.
430	Idle,
431	/// Relaying given relay header to relay given parachain header later.
432	RelayingRelayHeader(RelayNumber),
433	/// Relaying given parachain header.
434	RelayingParaHeader(HeaderId<ParaHash, ParaNumber>),
435}
436
437/// Data gathered from source and target clients, used by on-demand relay.
438#[derive(Debug)]
439struct RelayData<ParaHash, ParaNumber, RelayNumber> {
440	/// Parachain header number that is required at the target chain.
441	pub required_para_header: ParaNumber,
442	/// Parachain header number, known to the target chain.
443	pub para_header_at_target: Option<ParaNumber>,
444	/// Parachain header id, known to the source (relay) chain.
445	pub para_header_at_source: Option<HeaderId<ParaHash, ParaNumber>>,
446	/// Parachain header, that is available at the source relay chain at `relay_header_at_target`
447	/// block.
448	///
449	/// May be `None` if there's no `relay_header_at_target` yet, or if the
450	/// `relay_header_at_target` is too old and we think its state has been pruned.
451	pub para_header_at_relay_header_at_target: Option<HeaderId<ParaHash, ParaNumber>>,
452	/// Relay header number at the source chain.
453	pub relay_header_at_source: RelayNumber,
454	/// Relay header number at the target chain.
455	pub relay_header_at_target: Option<RelayNumber>,
456}
457
458/// Read required data from source and target clients.
459async fn read_relay_data<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>(
460	source: &ParachainsSource<P, SourceRelayClnt>,
461	target: &ParachainsTarget<P, SourceRelayClnt, TargetClnt>,
462	required_header_number: BlockNumberOf<P::SourceParachain>,
463) -> Result<
464	RelayData<
465		HashOf<P::SourceParachain>,
466		BlockNumberOf<P::SourceParachain>,
467		BlockNumberOf<P::SourceRelayChain>,
468	>,
469	FailedClient,
470>
471where
472	SourceRelayClnt: Client<P::SourceRelayChain>,
473	TargetClnt: Client<P::TargetChain>,
474	ParachainsTarget<P, SourceRelayClnt, TargetClnt>:
475		TargetClient<ParachainsPipelineAdapter<P>> + RelayClient<Error = SubstrateError>,
476{
477	let map_target_err = |e| {
478		log::error!(
479			target: "bridge",
480			"[{}] Failed to read relay data from {} client: {:?}",
481			on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(),
482			P::TargetChain::NAME,
483			e,
484		);
485		FailedClient::Target
486	};
487	let map_source_err = |e| {
488		log::error!(
489			target: "bridge",
490			"[{}] Failed to read relay data from {} client: {:?}",
491			on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(),
492			P::SourceRelayChain::NAME,
493			e,
494		);
495		FailedClient::Source
496	};
497
498	let best_target_block_hash = target.best_block().await.map_err(map_target_err)?.1;
499	let para_header_at_target = best_finalized_peer_header_at_self::<
500		P::TargetChain,
501		P::SourceParachain,
502	>(target.target_client(), best_target_block_hash)
503	.await;
504	// if there are no parachain heads at the target (`NoParachainHeadAtTarget`), we'll need to
505	// submit at least one. Otherwise the pallet will be treated as uninitialized and messages
506	// sync will stall.
507	let para_header_at_target = match para_header_at_target {
508		Ok(Some(para_header_at_target)) => Some(para_header_at_target.0),
509		Ok(None) => None,
510		Err(e) => return Err(map_target_err(e)),
511	};
512
513	let best_finalized_relay_header =
514		source.client().best_finalized_header().await.map_err(map_source_err)?;
515	let best_finalized_relay_block_id = best_finalized_relay_header.id();
516	let para_header_at_source = source
517		.on_chain_para_head_id(best_finalized_relay_block_id)
518		.await
519		.map_err(map_source_err)?;
520
521	let relay_header_at_source = best_finalized_relay_block_id.0;
522	let relay_header_at_target = best_finalized_peer_header_at_self::<
523		P::TargetChain,
524		P::SourceRelayChain,
525	>(target.target_client(), best_target_block_hash)
526	.await
527	.map_err(map_target_err)?;
528
529	// if relay header at target is too old then its state may already be discarded at the source
530	// => just use `None` in this case
531	//
532	// the same is for case when there's no relay header at target at all
533	let available_relay_header_at_target =
534		relay_header_at_target.filter(|relay_header_at_target| {
535			!is_ancient_block(relay_header_at_target.number(), relay_header_at_source)
536		});
537	let para_header_at_relay_header_at_target =
538		if let Some(available_relay_header_at_target) = available_relay_header_at_target {
539			source
540				.on_chain_para_head_id(available_relay_header_at_target)
541				.await
542				.map_err(map_source_err)?
543		} else {
544			None
545		};
546
547	Ok(RelayData {
548		required_para_header: required_header_number,
549		para_header_at_target,
550		para_header_at_source,
551		relay_header_at_source,
552		relay_header_at_target: relay_header_at_target
553			.map(|relay_header_at_target| relay_header_at_target.0),
554		para_header_at_relay_header_at_target,
555	})
556}
557
558/// Select relay and parachain headers that need to be relayed.
559fn select_headers_to_relay<ParaHash, ParaNumber, RelayNumber>(
560	data: &RelayData<ParaHash, ParaNumber, RelayNumber>,
561	state: RelayState<ParaHash, ParaNumber, RelayNumber>,
562) -> RelayState<ParaHash, ParaNumber, RelayNumber>
563where
564	ParaHash: Clone,
565	ParaNumber: Copy + PartialOrd + Zero,
566	RelayNumber: Copy + Debug + Ord,
567{
568	// we can't do anything until **relay chain** bridge GRANDPA pallet is not initialized at the
569	// target chain
570	let relay_header_at_target = match data.relay_header_at_target {
571		Some(relay_header_at_target) => relay_header_at_target,
572		None => return RelayState::Idle,
573	};
574
575	// Process the `RelayingRelayHeader` state.
576	if let &RelayState::RelayingRelayHeader(relay_header_number) = &state {
577		if relay_header_at_target < relay_header_number {
578			// The required relay header hasn't yet been relayed. Ask / wait for it.
579			return state
580		}
581
582		// We may switch to `RelayingParaHeader` if parachain head is available.
583		if let Some(para_header_at_relay_header_at_target) =
584			data.para_header_at_relay_header_at_target.as_ref()
585		{
586			return RelayState::RelayingParaHeader(para_header_at_relay_header_at_target.clone())
587		}
588
589		// else use the regular process - e.g. we may require to deliver new relay header first
590	}
591
592	// Process the `RelayingParaHeader` state.
593	if let RelayState::RelayingParaHeader(para_header_id) = &state {
594		let para_header_at_target_or_zero = data.para_header_at_target.unwrap_or_else(Zero::zero);
595		if para_header_at_target_or_zero < para_header_id.0 {
596			// The required parachain header hasn't yet been relayed. Ask / wait for it.
597			return state
598		}
599	}
600
601	// if we haven't read para head from the source, we can't yet do anything
602	let para_header_at_source = match data.para_header_at_source {
603		Some(ref para_header_at_source) => para_header_at_source.clone(),
604		None => return RelayState::Idle,
605	};
606
607	// if we have parachain head at the source, but no parachain heads at the target, we'll need
608	// to deliver at least one parachain head
609	let (required_para_header, para_header_at_target) = match data.para_header_at_target {
610		Some(para_header_at_target) => (data.required_para_header, para_header_at_target),
611		None => (para_header_at_source.0, Zero::zero()),
612	};
613
614	// if we have already satisfied our "customer", do nothing
615	if required_para_header <= para_header_at_target {
616		return RelayState::Idle
617	}
618
619	// if required header is not available even at the source chain, let's wait
620	if required_para_header > para_header_at_source.0 {
621		return RelayState::Idle
622	}
623
624	// we will always try to sync latest parachain/relay header, even if we've been asked for some
625	// its ancestor
626
627	// we need relay chain header first
628	if relay_header_at_target < data.relay_header_at_source {
629		return RelayState::RelayingRelayHeader(data.relay_header_at_source)
630	}
631
632	// if all relay headers synced, we may start directly with parachain header
633	RelayState::RelayingParaHeader(para_header_at_source)
634}
635
636/// Environment for the `select_headers_to_prove` call.
637#[async_trait]
638trait SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH> {
639	/// Returns associated parachain id.
640	fn parachain_id(&self) -> ParaId;
641	/// Returns best finalized relay block.
642	async fn best_finalized_relay_block_at_source(
643		&self,
644	) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
645	/// Returns best finalized relay block that is known at `P::TargetChain`.
646	async fn best_finalized_relay_block_at_target(
647		&self,
648	) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
649	/// Returns best finalized parachain block at given source relay chain block.
650	async fn best_finalized_para_block_at_source(
651		&self,
652		at_relay_block: HeaderId<RBH, RBN>,
653	) -> Result<Option<HeaderId<PBH, PBN>>, SubstrateError>;
654}
655
656#[async_trait]
657impl<'a, P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>
658	SelectHeadersToProveEnvironment<
659		BlockNumberOf<P::SourceRelayChain>,
660		HashOf<P::SourceRelayChain>,
661		BlockNumberOf<P::SourceParachain>,
662		HashOf<P::SourceParachain>,
663	>
664	for (
665		&'a OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>,
666		&'a ParachainsSource<P, SourceRelayClnt>,
667	)
668where
669	SourceRelayClnt: Client<P::SourceRelayChain>,
670	TargetClnt: Client<P::TargetChain>,
671{
672	fn parachain_id(&self) -> ParaId {
673		ParaId(P::SourceParachain::PARACHAIN_ID)
674	}
675
676	async fn best_finalized_relay_block_at_source(
677		&self,
678	) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
679		Ok(self.0.source_relay_client.best_finalized_header().await?.id())
680	}
681
682	async fn best_finalized_relay_block_at_target(
683		&self,
684	) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
685		Ok(crate::messages::source::read_client_state::<P::TargetChain, P::SourceRelayChain>(
686			&self.0.target_client,
687		)
688		.await?
689		.best_finalized_peer_at_best_self
690		.ok_or(SubstrateError::BridgePalletIsNotInitialized)?)
691	}
692
693	async fn best_finalized_para_block_at_source(
694		&self,
695		at_relay_block: HeaderIdOf<P::SourceRelayChain>,
696	) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
697		self.1.on_chain_para_head_id(at_relay_block).await
698	}
699}
700
701/// Given request to prove `required_parachain_header`, select actual headers that need to be
702/// proved.
703async fn select_headers_to_prove<RBN, RBH, PBN, PBH>(
704	env: impl SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH>,
705	required_parachain_header: PBN,
706) -> Result<(bool, HeaderId<RBH, RBN>, HeaderId<PBH, PBN>), SubstrateError>
707where
708	RBH: Copy,
709	RBN: BlockNumberBase,
710	PBH: Copy,
711	PBN: BlockNumberBase,
712{
713	// parachains proof also requires relay header proof. Let's first select relay block
714	// number that we'll be dealing with
715	let best_finalized_relay_block_at_source = env.best_finalized_relay_block_at_source().await?;
716	let best_finalized_relay_block_at_target = env.best_finalized_relay_block_at_target().await?;
717
718	// if we can't prove `required_header` even using `best_finalized_relay_block_at_source`, we
719	// can't do anything here
720	// (this shall not actually happen, given current code, because we only require finalized
721	// headers)
722	let best_possible_parachain_block = env
723		.best_finalized_para_block_at_source(best_finalized_relay_block_at_source)
724		.await?
725		.filter(|best_possible_parachain_block| {
726			best_possible_parachain_block.number() >= required_parachain_header
727		})
728		.ok_or(SubstrateError::MissingRequiredParachainHead(
729			env.parachain_id(),
730			required_parachain_header.unique_saturated_into(),
731		))?;
732
733	// we don't require source node to be archive, so we can't craft storage proofs using
734	// ancient headers. So if the `best_finalized_relay_block_at_target` is too ancient, we
735	// can't craft storage proofs using it
736	let may_use_state_at_best_finalized_relay_block_at_target = !is_ancient_block(
737		best_finalized_relay_block_at_target.number(),
738		best_finalized_relay_block_at_source.number(),
739	);
740
741	// now let's check if `required_header` may be proved using
742	// `best_finalized_relay_block_at_target`
743	let selection = if may_use_state_at_best_finalized_relay_block_at_target {
744		env.best_finalized_para_block_at_source(best_finalized_relay_block_at_target)
745			.await?
746			.filter(|best_finalized_para_block_at_target| {
747				best_finalized_para_block_at_target.number() >= required_parachain_header
748			})
749			.map(|best_finalized_para_block_at_target| {
750				(false, best_finalized_relay_block_at_target, best_finalized_para_block_at_target)
751			})
752	} else {
753		None
754	};
755
756	Ok(selection.unwrap_or((
757		true,
758		best_finalized_relay_block_at_source,
759		best_possible_parachain_block,
760	)))
761}
762
763#[cfg(test)]
764mod tests {
765	use super::*;
766
767	#[test]
768	fn relay_waits_for_relay_header_to_be_delivered() {
769		assert_eq!(
770			select_headers_to_relay(
771				&RelayData {
772					required_para_header: 90,
773					para_header_at_target: Some(50),
774					para_header_at_source: Some(HeaderId(110, 110)),
775					relay_header_at_source: 800,
776					relay_header_at_target: Some(700),
777					para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
778				},
779				RelayState::RelayingRelayHeader(750),
780			),
781			RelayState::RelayingRelayHeader(750),
782		);
783	}
784
785	#[test]
786	fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() {
787		assert_eq!(
788			select_headers_to_relay(
789				&RelayData {
790					required_para_header: 90,
791					para_header_at_target: Some(50),
792					para_header_at_source: Some(HeaderId(110, 110)),
793					relay_header_at_source: 800,
794					relay_header_at_target: Some(750),
795					para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
796				},
797				RelayState::RelayingRelayHeader(750),
798			),
799			RelayState::RelayingParaHeader(HeaderId(100, 100)),
800		);
801	}
802
803	#[test]
804	fn relay_selects_better_para_header_after_better_relay_header_is_delivered() {
805		assert_eq!(
806			select_headers_to_relay(
807				&RelayData {
808					required_para_header: 90,
809					para_header_at_target: Some(50),
810					para_header_at_source: Some(HeaderId(110, 110)),
811					relay_header_at_source: 800,
812					relay_header_at_target: Some(780),
813					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
814				},
815				RelayState::RelayingRelayHeader(750),
816			),
817			RelayState::RelayingParaHeader(HeaderId(105, 105)),
818		);
819	}
820	#[test]
821	fn relay_waits_for_para_header_to_be_delivered() {
822		assert_eq!(
823			select_headers_to_relay(
824				&RelayData {
825					required_para_header: 90,
826					para_header_at_target: Some(50),
827					para_header_at_source: Some(HeaderId(110, 110)),
828					relay_header_at_source: 800,
829					relay_header_at_target: Some(780),
830					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
831				},
832				RelayState::RelayingParaHeader(HeaderId(105, 105)),
833			),
834			RelayState::RelayingParaHeader(HeaderId(105, 105)),
835		);
836	}
837
838	#[test]
839	fn relay_stays_idle_if_required_para_header_is_already_delivered() {
840		assert_eq!(
841			select_headers_to_relay(
842				&RelayData {
843					required_para_header: 90,
844					para_header_at_target: Some(105),
845					para_header_at_source: Some(HeaderId(110, 110)),
846					relay_header_at_source: 800,
847					relay_header_at_target: Some(780),
848					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
849				},
850				RelayState::Idle,
851			),
852			RelayState::Idle,
853		);
854	}
855
856	#[test]
857	fn relay_waits_for_required_para_header_to_appear_at_source_1() {
858		assert_eq!(
859			select_headers_to_relay(
860				&RelayData {
861					required_para_header: 120,
862					para_header_at_target: Some(105),
863					para_header_at_source: None,
864					relay_header_at_source: 800,
865					relay_header_at_target: Some(780),
866					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
867				},
868				RelayState::Idle,
869			),
870			RelayState::Idle,
871		);
872	}
873
874	#[test]
875	fn relay_waits_for_required_para_header_to_appear_at_source_2() {
876		assert_eq!(
877			select_headers_to_relay(
878				&RelayData {
879					required_para_header: 120,
880					para_header_at_target: Some(105),
881					para_header_at_source: Some(HeaderId(110, 110)),
882					relay_header_at_source: 800,
883					relay_header_at_target: Some(780),
884					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
885				},
886				RelayState::Idle,
887			),
888			RelayState::Idle,
889		);
890	}
891
892	#[test]
893	fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() {
894		assert_eq!(
895			select_headers_to_relay(
896				&RelayData {
897					required_para_header: 120,
898					para_header_at_target: Some(105),
899					para_header_at_source: Some(HeaderId(125, 125)),
900					relay_header_at_source: 800,
901					relay_header_at_target: Some(780),
902					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
903				},
904				RelayState::Idle,
905			),
906			RelayState::RelayingRelayHeader(800),
907		);
908	}
909
910	#[test]
911	fn relay_starts_relaying_para_header_when_new_para_header_is_requested() {
912		assert_eq!(
913			select_headers_to_relay(
914				&RelayData {
915					required_para_header: 120,
916					para_header_at_target: Some(105),
917					para_header_at_source: Some(HeaderId(125, 125)),
918					relay_header_at_source: 800,
919					relay_header_at_target: Some(800),
920					para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
921				},
922				RelayState::Idle,
923			),
924			RelayState::RelayingParaHeader(HeaderId(125, 125)),
925		);
926	}
927
928	#[test]
929	fn relay_goes_idle_when_parachain_is_deregistered() {
930		assert_eq!(
931			select_headers_to_relay::<i32, _, _>(
932				&RelayData {
933					required_para_header: 120,
934					para_header_at_target: Some(105),
935					para_header_at_source: None,
936					relay_header_at_source: 800,
937					relay_header_at_target: Some(800),
938					para_header_at_relay_header_at_target: None,
939				},
940				RelayState::RelayingRelayHeader(800),
941			),
942			RelayState::Idle,
943		);
944	}
945
946	#[test]
947	fn relay_starts_relaying_first_parachain_header() {
948		assert_eq!(
949			select_headers_to_relay::<i32, _, _>(
950				&RelayData {
951					required_para_header: 0,
952					para_header_at_target: None,
953					para_header_at_source: Some(HeaderId(125, 125)),
954					relay_header_at_source: 800,
955					relay_header_at_target: Some(800),
956					para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
957				},
958				RelayState::Idle,
959			),
960			RelayState::RelayingParaHeader(HeaderId(125, 125)),
961		);
962	}
963
964	#[test]
965	fn relay_starts_relaying_relay_header_to_relay_first_parachain_header() {
966		assert_eq!(
967			select_headers_to_relay::<i32, _, _>(
968				&RelayData {
969					required_para_header: 0,
970					para_header_at_target: None,
971					para_header_at_source: Some(HeaderId(125, 125)),
972					relay_header_at_source: 800,
973					relay_header_at_target: Some(700),
974					para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
975				},
976				RelayState::Idle,
977			),
978			RelayState::RelayingRelayHeader(800),
979		);
980	}
981
982	// tuple is:
983	//
984	// - best_finalized_relay_block_at_source
985	// - best_finalized_relay_block_at_target
986	// - best_finalized_para_block_at_source at best_finalized_relay_block_at_source
987	// - best_finalized_para_block_at_source at best_finalized_relay_block_at_target
988	#[async_trait]
989	impl SelectHeadersToProveEnvironment<u32, u32, u32, u32> for (u32, u32, u32, u32) {
990		fn parachain_id(&self) -> ParaId {
991			ParaId(0)
992		}
993
994		async fn best_finalized_relay_block_at_source(
995			&self,
996		) -> Result<HeaderId<u32, u32>, SubstrateError> {
997			Ok(HeaderId(self.0, self.0))
998		}
999
1000		async fn best_finalized_relay_block_at_target(
1001			&self,
1002		) -> Result<HeaderId<u32, u32>, SubstrateError> {
1003			Ok(HeaderId(self.1, self.1))
1004		}
1005
1006		async fn best_finalized_para_block_at_source(
1007			&self,
1008			at_relay_block: HeaderId<u32, u32>,
1009		) -> Result<Option<HeaderId<u32, u32>>, SubstrateError> {
1010			if at_relay_block.0 == self.0 {
1011				Ok(Some(HeaderId(self.2, self.2)))
1012			} else if at_relay_block.0 == self.1 {
1013				Ok(Some(HeaderId(self.3, self.3)))
1014			} else {
1015				Ok(None)
1016			}
1017		}
1018	}
1019
1020	#[async_std::test]
1021	async fn select_headers_to_prove_returns_err_if_required_para_block_is_missing_at_source() {
1022		assert!(matches!(
1023			select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 300_u32,).await,
1024			Err(SubstrateError::MissingRequiredParachainHead(ParaId(0), 300_u64)),
1025		));
1026	}
1027
1028	#[async_std::test]
1029	async fn select_headers_to_prove_fails_to_use_existing_ancient_relay_block() {
1030		assert_eq!(
1031			select_headers_to_prove((220_u32, 10_u32, 200_u32, 100_u32), 100_u32,)
1032				.await
1033				.map_err(drop),
1034			Ok((true, HeaderId(220, 220), HeaderId(200, 200))),
1035		);
1036	}
1037
1038	#[async_std::test]
1039	async fn select_headers_to_prove_is_able_to_use_existing_recent_relay_block() {
1040		assert_eq!(
1041			select_headers_to_prove((40_u32, 10_u32, 200_u32, 100_u32), 100_u32,)
1042				.await
1043				.map_err(drop),
1044			Ok((false, HeaderId(10, 10), HeaderId(100, 100))),
1045		);
1046	}
1047
1048	#[async_std::test]
1049	async fn select_headers_to_prove_uses_new_relay_block() {
1050		assert_eq!(
1051			select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 200_u32,)
1052				.await
1053				.map_err(drop),
1054			Ok((true, HeaderId(20, 20), HeaderId(200, 200))),
1055		);
1056	}
1057}