1use crate::{
20 messages::source::best_finalized_peer_header_at_self,
21 on_demand::OnDemandRelay,
22 parachains::{
23 source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter,
24 SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline,
25 },
26 TransactionParams,
27};
28
29use async_std::{
30 channel::{unbounded, Receiver, Sender},
31 sync::{Arc, Mutex},
32};
33use async_trait::async_trait;
34use bp_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
35use bp_polkadot_core::parachains::{ParaHash, ParaId};
36use bp_runtime::HeaderIdProvider;
37use futures::{select, FutureExt};
38use num_traits::Zero;
39use parachains_relay::parachains_loop::{AvailableHeader, SourceClient, TargetClient};
40use relay_substrate_client::{
41 is_ancient_block, AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client,
42 Error as SubstrateError, HashOf, HeaderIdOf, ParachainBase,
43};
44use relay_utils::{
45 metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient,
46 HeaderId, UniqueSaturatedInto,
47};
48use std::fmt::Debug;
49
50#[derive(Clone)]
56pub struct OnDemandParachainsRelay<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt> {
57 relay_task_name: String,
59 required_header_number_sender: Sender<BlockNumberOf<P::SourceParachain>>,
61 source_relay_client: SourceRelayClnt,
63 target_client: TargetClnt,
65 on_demand_source_relay_to_target_headers:
67 Arc<dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>>,
68}
69
70impl<
71 P: SubstrateParachainsPipeline,
72 SourceRelayClnt: Client<P::SourceRelayChain>,
73 TargetClnt: Client<P::TargetChain>,
74 > OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>
75{
76 pub fn new(
82 source_relay_client: SourceRelayClnt,
83 target_client: TargetClnt,
84 target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
85 on_demand_source_relay_to_target_headers: Arc<
86 dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
87 >,
88 ) -> Self
89 where
90 P::SourceParachain: Chain<Hash = ParaHash>,
91 P::SourceRelayChain:
92 Chain<BlockNumber = RelayBlockNumber, Hash = RelayBlockHash, Hasher = RelayBlockHasher>,
93 AccountIdOf<P::TargetChain>:
94 From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
95 {
96 let (required_header_number_sender, required_header_number_receiver) = unbounded();
97 let this = OnDemandParachainsRelay {
98 relay_task_name: on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(
99 ),
100 required_header_number_sender,
101 source_relay_client: source_relay_client.clone(),
102 target_client: target_client.clone(),
103 on_demand_source_relay_to_target_headers: on_demand_source_relay_to_target_headers
104 .clone(),
105 };
106 async_std::task::spawn(async move {
107 background_task::<P>(
108 source_relay_client,
109 target_client,
110 target_transaction_params,
111 on_demand_source_relay_to_target_headers,
112 required_header_number_receiver,
113 )
114 .await;
115 });
116
117 this
118 }
119}
120
121#[async_trait]
122impl<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>
123 OnDemandRelay<P::SourceParachain, P::TargetChain>
124 for OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>
125where
126 P::SourceParachain: Chain<Hash = ParaHash>,
127 SourceRelayClnt: Client<P::SourceRelayChain>,
128 TargetClnt: Client<P::TargetChain>,
129{
130 async fn reconnect(&self) -> Result<(), SubstrateError> {
131 self.source_relay_client.clone().reconnect().await?;
134 self.target_client.clone().reconnect().await?;
135 self.on_demand_source_relay_to_target_headers.reconnect().await
137 }
138
139 async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceParachain>) {
140 if let Err(e) = self.required_header_number_sender.send(required_header).await {
141 log::trace!(
142 target: "bridge",
143 "[{}] Failed to request {} header {:?}: {:?}",
144 self.relay_task_name,
145 P::SourceParachain::NAME,
146 required_header,
147 e,
148 );
149 }
150 }
151
152 async fn prove_header(
154 &self,
155 required_parachain_header: BlockNumberOf<P::SourceParachain>,
156 ) -> Result<(HeaderIdOf<P::SourceParachain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
157 let parachains_source = ParachainsSource::<P, _>::new(
159 self.source_relay_client.clone(),
160 Arc::new(Mutex::new(AvailableHeader::Missing)),
161 );
162 let env = (self, ¶chains_source);
163 let (need_to_prove_relay_block, selected_relay_block, selected_parachain_block) =
164 select_headers_to_prove(env, required_parachain_header).await?;
165
166 log::debug!(
167 target: "bridge",
168 "[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} and {} head {:?}",
169 self.relay_task_name,
170 P::SourceParachain::NAME,
171 required_parachain_header,
172 P::SourceParachain::NAME,
173 selected_parachain_block,
174 P::SourceRelayChain::NAME,
175 if need_to_prove_relay_block {
176 Some(selected_relay_block)
177 } else {
178 None
179 },
180 );
181
182 let mut calls = Vec::new();
184 let mut proved_relay_block = selected_relay_block;
185 if need_to_prove_relay_block {
186 let (relay_block, relay_prove_call) = self
187 .on_demand_source_relay_to_target_headers
188 .prove_header(selected_relay_block.number())
189 .await?;
190 proved_relay_block = relay_block;
191 calls.extend(relay_prove_call);
192 }
193
194 let para_id = ParaId(P::SourceParachain::PARACHAIN_ID);
198 let mut proved_parachain_block = selected_parachain_block;
199 if proved_relay_block != selected_relay_block {
200 proved_parachain_block = parachains_source
201 .on_chain_para_head_id(proved_relay_block)
202 .await?
203 .ok_or_else(|| {
205 SubstrateError::MissingRequiredParachainHead(
206 para_id,
207 proved_relay_block.number().unique_saturated_into(),
208 )
209 })?;
210
211 log::debug!(
212 target: "bridge",
213 "[{}] Selected to prove {} head {:?} and {} head {:?}. Instead proved {} head {:?} and {} head {:?}",
214 self.relay_task_name,
215 P::SourceParachain::NAME,
216 selected_parachain_block,
217 P::SourceRelayChain::NAME,
218 selected_relay_block,
219 P::SourceParachain::NAME,
220 proved_parachain_block,
221 P::SourceRelayChain::NAME,
222 proved_relay_block,
223 );
224 }
225
226 let (para_proof, para_hash) =
228 parachains_source.prove_parachain_head(proved_relay_block).await?;
229 calls.push(P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call(
230 proved_relay_block,
231 vec![(para_id, para_hash)],
232 para_proof,
233 false,
234 ));
235
236 Ok((proved_parachain_block, calls))
237 }
238}
239
240async fn background_task<P: SubstrateParachainsPipeline>(
242 source_relay_client: impl Client<P::SourceRelayChain>,
243 target_client: impl Client<P::TargetChain>,
244 target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
245 on_demand_source_relay_to_target_headers: Arc<
246 dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
247 >,
248 required_parachain_header_number_receiver: Receiver<BlockNumberOf<P::SourceParachain>>,
249) where
250 P::SourceParachain: Chain<Hash = ParaHash>,
251 P::SourceRelayChain:
252 Chain<BlockNumber = RelayBlockNumber, Hash = RelayBlockHash, Hasher = RelayBlockHasher>,
253 AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
254{
255 let relay_task_name = on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>();
256 let target_transactions_mortality = target_transaction_params.mortality;
257
258 let mut relay_state = RelayState::Idle;
259 let mut required_parachain_header_number = Zero::zero();
260 let required_para_header_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));
261
262 let mut restart_relay = true;
263 let parachains_relay_task = futures::future::Fuse::terminated();
264 futures::pin_mut!(parachains_relay_task);
265
266 let mut parachains_source = ParachainsSource::<P, _>::new(
267 source_relay_client.clone(),
268 required_para_header_ref.clone(),
269 );
270 let mut parachains_target = ParachainsTarget::<P, _, _>::new(
271 source_relay_client.clone(),
272 target_client.clone(),
273 target_transaction_params.clone(),
274 );
275
276 loop {
277 select! {
278 new_required_parachain_header_number = required_parachain_header_number_receiver.recv().fuse() => {
279 let new_required_parachain_header_number = match new_required_parachain_header_number {
280 Ok(new_required_parachain_header_number) => new_required_parachain_header_number,
281 Err(e) => {
282 log::error!(
283 target: "bridge",
284 "[{}] Background task has exited with error: {:?}",
285 relay_task_name,
286 e,
287 );
288
289 return;
290 },
291 };
292
293 if new_required_parachain_header_number > required_parachain_header_number {
297 log::trace!(
298 target: "bridge",
299 "[{}] More {} headers required. Going to sync up to the {}",
300 relay_task_name,
301 P::SourceParachain::NAME,
302 new_required_parachain_header_number,
303 );
304
305 required_parachain_header_number = new_required_parachain_header_number;
306 }
307 },
308 _ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
309 _ = parachains_relay_task => {
310 restart_relay = true;
312 },
313 }
314
315 let relay_data = read_relay_data(
340 ¶chains_source,
341 ¶chains_target,
342 required_parachain_header_number,
343 )
344 .await;
345 match relay_data {
346 Ok(relay_data) => {
347 let prev_relay_state = relay_state;
348 relay_state = select_headers_to_relay(&relay_data, relay_state);
349 log::trace!(
350 target: "bridge",
351 "[{}] Selected new relay state: {:?} using old state {:?} and data {:?}",
352 relay_task_name,
353 relay_state,
354 prev_relay_state,
355 relay_data,
356 );
357 },
358 Err(failed_client) => {
359 relay_utils::relay_loop::reconnect_failed_client(
360 failed_client,
361 relay_utils::relay_loop::RECONNECT_DELAY,
362 &mut parachains_source,
363 &mut parachains_target,
364 )
365 .await;
366 continue
367 },
368 }
369
370 match relay_state {
373 RelayState::Idle => (),
374 RelayState::RelayingRelayHeader(required_relay_header) => {
375 on_demand_source_relay_to_target_headers
376 .require_more_headers(required_relay_header)
377 .await;
378 },
379 RelayState::RelayingParaHeader(required_para_header) => {
380 *required_para_header_ref.lock().await =
381 AvailableHeader::Available(required_para_header);
382 },
383 }
384
385 if restart_relay {
387 let stall_timeout = relay_substrate_client::transaction_stall_timeout(
388 target_transactions_mortality,
389 P::TargetChain::AVERAGE_BLOCK_INTERVAL,
390 relay_utils::STALL_TIMEOUT,
391 );
392
393 log::info!(
394 target: "bridge",
395 "[{}] Starting on-demand-parachains relay task\n\t\
396 Tx mortality: {:?} (~{}m)\n\t\
397 Stall timeout: {:?}",
398 relay_task_name,
399 target_transactions_mortality,
400 stall_timeout.as_secs_f64() / 60.0f64,
401 stall_timeout,
402 );
403
404 parachains_relay_task.set(
405 parachains_relay::parachains_loop::run(
406 parachains_source.clone(),
407 parachains_target.clone(),
408 MetricsParams::disabled(),
409 false,
411 futures::future::pending(),
412 )
413 .fuse(),
414 );
415
416 restart_relay = false;
417 }
418 }
419}
420
421fn on_demand_parachains_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
423 format!("{}-to-{}-on-demand-parachain", SourceChain::NAME, TargetChain::NAME)
424}
425
426#[derive(Clone, Copy, Debug, PartialEq)]
428enum RelayState<ParaHash, ParaNumber, RelayNumber> {
429 Idle,
431 RelayingRelayHeader(RelayNumber),
433 RelayingParaHeader(HeaderId<ParaHash, ParaNumber>),
435}
436
437#[derive(Debug)]
439struct RelayData<ParaHash, ParaNumber, RelayNumber> {
440 pub required_para_header: ParaNumber,
442 pub para_header_at_target: Option<ParaNumber>,
444 pub para_header_at_source: Option<HeaderId<ParaHash, ParaNumber>>,
446 pub para_header_at_relay_header_at_target: Option<HeaderId<ParaHash, ParaNumber>>,
452 pub relay_header_at_source: RelayNumber,
454 pub relay_header_at_target: Option<RelayNumber>,
456}
457
458async fn read_relay_data<P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>(
460 source: &ParachainsSource<P, SourceRelayClnt>,
461 target: &ParachainsTarget<P, SourceRelayClnt, TargetClnt>,
462 required_header_number: BlockNumberOf<P::SourceParachain>,
463) -> Result<
464 RelayData<
465 HashOf<P::SourceParachain>,
466 BlockNumberOf<P::SourceParachain>,
467 BlockNumberOf<P::SourceRelayChain>,
468 >,
469 FailedClient,
470>
471where
472 SourceRelayClnt: Client<P::SourceRelayChain>,
473 TargetClnt: Client<P::TargetChain>,
474 ParachainsTarget<P, SourceRelayClnt, TargetClnt>:
475 TargetClient<ParachainsPipelineAdapter<P>> + RelayClient<Error = SubstrateError>,
476{
477 let map_target_err = |e| {
478 log::error!(
479 target: "bridge",
480 "[{}] Failed to read relay data from {} client: {:?}",
481 on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(),
482 P::TargetChain::NAME,
483 e,
484 );
485 FailedClient::Target
486 };
487 let map_source_err = |e| {
488 log::error!(
489 target: "bridge",
490 "[{}] Failed to read relay data from {} client: {:?}",
491 on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(),
492 P::SourceRelayChain::NAME,
493 e,
494 );
495 FailedClient::Source
496 };
497
498 let best_target_block_hash = target.best_block().await.map_err(map_target_err)?.1;
499 let para_header_at_target = best_finalized_peer_header_at_self::<
500 P::TargetChain,
501 P::SourceParachain,
502 >(target.target_client(), best_target_block_hash)
503 .await;
504 let para_header_at_target = match para_header_at_target {
508 Ok(Some(para_header_at_target)) => Some(para_header_at_target.0),
509 Ok(None) => None,
510 Err(e) => return Err(map_target_err(e)),
511 };
512
513 let best_finalized_relay_header =
514 source.client().best_finalized_header().await.map_err(map_source_err)?;
515 let best_finalized_relay_block_id = best_finalized_relay_header.id();
516 let para_header_at_source = source
517 .on_chain_para_head_id(best_finalized_relay_block_id)
518 .await
519 .map_err(map_source_err)?;
520
521 let relay_header_at_source = best_finalized_relay_block_id.0;
522 let relay_header_at_target = best_finalized_peer_header_at_self::<
523 P::TargetChain,
524 P::SourceRelayChain,
525 >(target.target_client(), best_target_block_hash)
526 .await
527 .map_err(map_target_err)?;
528
529 let available_relay_header_at_target =
534 relay_header_at_target.filter(|relay_header_at_target| {
535 !is_ancient_block(relay_header_at_target.number(), relay_header_at_source)
536 });
537 let para_header_at_relay_header_at_target =
538 if let Some(available_relay_header_at_target) = available_relay_header_at_target {
539 source
540 .on_chain_para_head_id(available_relay_header_at_target)
541 .await
542 .map_err(map_source_err)?
543 } else {
544 None
545 };
546
547 Ok(RelayData {
548 required_para_header: required_header_number,
549 para_header_at_target,
550 para_header_at_source,
551 relay_header_at_source,
552 relay_header_at_target: relay_header_at_target
553 .map(|relay_header_at_target| relay_header_at_target.0),
554 para_header_at_relay_header_at_target,
555 })
556}
557
558fn select_headers_to_relay<ParaHash, ParaNumber, RelayNumber>(
560 data: &RelayData<ParaHash, ParaNumber, RelayNumber>,
561 state: RelayState<ParaHash, ParaNumber, RelayNumber>,
562) -> RelayState<ParaHash, ParaNumber, RelayNumber>
563where
564 ParaHash: Clone,
565 ParaNumber: Copy + PartialOrd + Zero,
566 RelayNumber: Copy + Debug + Ord,
567{
568 let relay_header_at_target = match data.relay_header_at_target {
571 Some(relay_header_at_target) => relay_header_at_target,
572 None => return RelayState::Idle,
573 };
574
575 if let &RelayState::RelayingRelayHeader(relay_header_number) = &state {
577 if relay_header_at_target < relay_header_number {
578 return state
580 }
581
582 if let Some(para_header_at_relay_header_at_target) =
584 data.para_header_at_relay_header_at_target.as_ref()
585 {
586 return RelayState::RelayingParaHeader(para_header_at_relay_header_at_target.clone())
587 }
588
589 }
591
592 if let RelayState::RelayingParaHeader(para_header_id) = &state {
594 let para_header_at_target_or_zero = data.para_header_at_target.unwrap_or_else(Zero::zero);
595 if para_header_at_target_or_zero < para_header_id.0 {
596 return state
598 }
599 }
600
601 let para_header_at_source = match data.para_header_at_source {
603 Some(ref para_header_at_source) => para_header_at_source.clone(),
604 None => return RelayState::Idle,
605 };
606
607 let (required_para_header, para_header_at_target) = match data.para_header_at_target {
610 Some(para_header_at_target) => (data.required_para_header, para_header_at_target),
611 None => (para_header_at_source.0, Zero::zero()),
612 };
613
614 if required_para_header <= para_header_at_target {
616 return RelayState::Idle
617 }
618
619 if required_para_header > para_header_at_source.0 {
621 return RelayState::Idle
622 }
623
624 if relay_header_at_target < data.relay_header_at_source {
629 return RelayState::RelayingRelayHeader(data.relay_header_at_source)
630 }
631
632 RelayState::RelayingParaHeader(para_header_at_source)
634}
635
636#[async_trait]
638trait SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH> {
639 fn parachain_id(&self) -> ParaId;
641 async fn best_finalized_relay_block_at_source(
643 &self,
644 ) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
645 async fn best_finalized_relay_block_at_target(
647 &self,
648 ) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
649 async fn best_finalized_para_block_at_source(
651 &self,
652 at_relay_block: HeaderId<RBH, RBN>,
653 ) -> Result<Option<HeaderId<PBH, PBN>>, SubstrateError>;
654}
655
656#[async_trait]
657impl<'a, P: SubstrateParachainsPipeline, SourceRelayClnt, TargetClnt>
658 SelectHeadersToProveEnvironment<
659 BlockNumberOf<P::SourceRelayChain>,
660 HashOf<P::SourceRelayChain>,
661 BlockNumberOf<P::SourceParachain>,
662 HashOf<P::SourceParachain>,
663 >
664 for (
665 &'a OnDemandParachainsRelay<P, SourceRelayClnt, TargetClnt>,
666 &'a ParachainsSource<P, SourceRelayClnt>,
667 )
668where
669 SourceRelayClnt: Client<P::SourceRelayChain>,
670 TargetClnt: Client<P::TargetChain>,
671{
672 fn parachain_id(&self) -> ParaId {
673 ParaId(P::SourceParachain::PARACHAIN_ID)
674 }
675
676 async fn best_finalized_relay_block_at_source(
677 &self,
678 ) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
679 Ok(self.0.source_relay_client.best_finalized_header().await?.id())
680 }
681
682 async fn best_finalized_relay_block_at_target(
683 &self,
684 ) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
685 Ok(crate::messages::source::read_client_state::<P::TargetChain, P::SourceRelayChain>(
686 &self.0.target_client,
687 )
688 .await?
689 .best_finalized_peer_at_best_self
690 .ok_or(SubstrateError::BridgePalletIsNotInitialized)?)
691 }
692
693 async fn best_finalized_para_block_at_source(
694 &self,
695 at_relay_block: HeaderIdOf<P::SourceRelayChain>,
696 ) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
697 self.1.on_chain_para_head_id(at_relay_block).await
698 }
699}
700
701async fn select_headers_to_prove<RBN, RBH, PBN, PBH>(
704 env: impl SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH>,
705 required_parachain_header: PBN,
706) -> Result<(bool, HeaderId<RBH, RBN>, HeaderId<PBH, PBN>), SubstrateError>
707where
708 RBH: Copy,
709 RBN: BlockNumberBase,
710 PBH: Copy,
711 PBN: BlockNumberBase,
712{
713 let best_finalized_relay_block_at_source = env.best_finalized_relay_block_at_source().await?;
716 let best_finalized_relay_block_at_target = env.best_finalized_relay_block_at_target().await?;
717
718 let best_possible_parachain_block = env
723 .best_finalized_para_block_at_source(best_finalized_relay_block_at_source)
724 .await?
725 .filter(|best_possible_parachain_block| {
726 best_possible_parachain_block.number() >= required_parachain_header
727 })
728 .ok_or(SubstrateError::MissingRequiredParachainHead(
729 env.parachain_id(),
730 required_parachain_header.unique_saturated_into(),
731 ))?;
732
733 let may_use_state_at_best_finalized_relay_block_at_target = !is_ancient_block(
737 best_finalized_relay_block_at_target.number(),
738 best_finalized_relay_block_at_source.number(),
739 );
740
741 let selection = if may_use_state_at_best_finalized_relay_block_at_target {
744 env.best_finalized_para_block_at_source(best_finalized_relay_block_at_target)
745 .await?
746 .filter(|best_finalized_para_block_at_target| {
747 best_finalized_para_block_at_target.number() >= required_parachain_header
748 })
749 .map(|best_finalized_para_block_at_target| {
750 (false, best_finalized_relay_block_at_target, best_finalized_para_block_at_target)
751 })
752 } else {
753 None
754 };
755
756 Ok(selection.unwrap_or((
757 true,
758 best_finalized_relay_block_at_source,
759 best_possible_parachain_block,
760 )))
761}
762
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn relay_waits_for_relay_header_to_be_delivered() {
        let data = RelayData {
            required_para_header: 90,
            para_header_at_target: Some(50),
            para_header_at_source: Some(HeaderId(110, 110)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(700),
            para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::RelayingRelayHeader(750));
        assert_eq!(new_state, RelayState::RelayingRelayHeader(750));
    }

    #[test]
    fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() {
        let data = RelayData {
            required_para_header: 90,
            para_header_at_target: Some(50),
            para_header_at_source: Some(HeaderId(110, 110)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(750),
            para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::RelayingRelayHeader(750));
        assert_eq!(new_state, RelayState::RelayingParaHeader(HeaderId(100, 100)));
    }

    #[test]
    fn relay_selects_better_para_header_after_better_relay_header_is_delivered() {
        let data = RelayData {
            required_para_header: 90,
            para_header_at_target: Some(50),
            para_header_at_source: Some(HeaderId(110, 110)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(780),
            para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::RelayingRelayHeader(750));
        assert_eq!(new_state, RelayState::RelayingParaHeader(HeaderId(105, 105)));
    }

    #[test]
    fn relay_waits_for_para_header_to_be_delivered() {
        let data = RelayData {
            required_para_header: 90,
            para_header_at_target: Some(50),
            para_header_at_source: Some(HeaderId(110, 110)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(780),
            para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
        };

        let new_state =
            select_headers_to_relay(&data, RelayState::RelayingParaHeader(HeaderId(105, 105)));
        assert_eq!(new_state, RelayState::RelayingParaHeader(HeaderId(105, 105)));
    }

    #[test]
    fn relay_stays_idle_if_required_para_header_is_already_delivered() {
        let data = RelayData {
            required_para_header: 90,
            para_header_at_target: Some(105),
            para_header_at_source: Some(HeaderId(110, 110)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(780),
            para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::Idle);
        assert_eq!(new_state, RelayState::Idle);
    }

    #[test]
    fn relay_waits_for_required_para_header_to_appear_at_source_1() {
        let data = RelayData {
            required_para_header: 120,
            para_header_at_target: Some(105),
            para_header_at_source: None,
            relay_header_at_source: 800,
            relay_header_at_target: Some(780),
            para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::Idle);
        assert_eq!(new_state, RelayState::Idle);
    }

    #[test]
    fn relay_waits_for_required_para_header_to_appear_at_source_2() {
        let data = RelayData {
            required_para_header: 120,
            para_header_at_target: Some(105),
            para_header_at_source: Some(HeaderId(110, 110)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(780),
            para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::Idle);
        assert_eq!(new_state, RelayState::Idle);
    }

    #[test]
    fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() {
        let data = RelayData {
            required_para_header: 120,
            para_header_at_target: Some(105),
            para_header_at_source: Some(HeaderId(125, 125)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(780),
            para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::Idle);
        assert_eq!(new_state, RelayState::RelayingRelayHeader(800));
    }

    #[test]
    fn relay_starts_relaying_para_header_when_new_para_header_is_requested() {
        let data = RelayData {
            required_para_header: 120,
            para_header_at_target: Some(105),
            para_header_at_source: Some(HeaderId(125, 125)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(800),
            para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
        };

        let new_state = select_headers_to_relay(&data, RelayState::Idle);
        assert_eq!(new_state, RelayState::RelayingParaHeader(HeaderId(125, 125)));
    }

    #[test]
    fn relay_goes_idle_when_parachain_is_deregistered() {
        let data = RelayData {
            required_para_header: 120,
            para_header_at_target: Some(105),
            para_header_at_source: None,
            relay_header_at_source: 800,
            relay_header_at_target: Some(800),
            para_header_at_relay_header_at_target: None,
        };

        // there are no header ids in the data, so the hash type has to be given explicitly
        let new_state =
            select_headers_to_relay::<i32, _, _>(&data, RelayState::RelayingRelayHeader(800));
        assert_eq!(new_state, RelayState::Idle);
    }

    #[test]
    fn relay_starts_relaying_first_parachain_header() {
        let data = RelayData {
            required_para_header: 0,
            para_header_at_target: None,
            para_header_at_source: Some(HeaderId(125, 125)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(800),
            para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
        };

        let new_state = select_headers_to_relay::<i32, _, _>(&data, RelayState::Idle);
        assert_eq!(new_state, RelayState::RelayingParaHeader(HeaderId(125, 125)));
    }

    #[test]
    fn relay_starts_relaying_relay_header_to_relay_first_parachain_header() {
        let data = RelayData {
            required_para_header: 0,
            para_header_at_target: None,
            para_header_at_source: Some(HeaderId(125, 125)),
            relay_header_at_source: 800,
            relay_header_at_target: Some(700),
            para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
        };

        let new_state = select_headers_to_relay::<i32, _, _>(&data, RelayState::Idle);
        assert_eq!(new_state, RelayState::RelayingRelayHeader(800));
    }

    /// Test environment. Fields of the tuple are:
    ///
    /// - `.0`: best finalized relay block at the source;
    /// - `.1`: best finalized relay block at the target;
    /// - `.2`: parachain block available at relay block `.0`;
    /// - `.3`: parachain block available at relay block `.1`.
    ///
    /// The same number is used as both the number and the hash of every block.
    #[async_trait]
    impl SelectHeadersToProveEnvironment<u32, u32, u32, u32> for (u32, u32, u32, u32) {
        fn parachain_id(&self) -> ParaId {
            ParaId(0)
        }

        async fn best_finalized_relay_block_at_source(
            &self,
        ) -> Result<HeaderId<u32, u32>, SubstrateError> {
            Ok(HeaderId(self.0, self.0))
        }

        async fn best_finalized_relay_block_at_target(
            &self,
        ) -> Result<HeaderId<u32, u32>, SubstrateError> {
            Ok(HeaderId(self.1, self.1))
        }

        async fn best_finalized_para_block_at_source(
            &self,
            at_relay_block: HeaderId<u32, u32>,
        ) -> Result<Option<HeaderId<u32, u32>>, SubstrateError> {
            // the source relay block takes precedence if both relay blocks are the same
            let para_block = match at_relay_block.0 {
                relay_block if relay_block == self.0 => Some(self.2),
                relay_block if relay_block == self.1 => Some(self.3),
                _ => None,
            };
            Ok(para_block.map(|para_block| HeaderId(para_block, para_block)))
        }
    }

    #[async_std::test]
    async fn select_headers_to_prove_returns_err_if_required_para_block_is_missing_at_source() {
        let env = (20_u32, 10_u32, 200_u32, 100_u32);
        let selection = select_headers_to_prove(env, 300_u32).await;
        assert!(matches!(
            selection,
            Err(SubstrateError::MissingRequiredParachainHead(ParaId(0), 300_u64)),
        ));
    }

    #[async_std::test]
    async fn select_headers_to_prove_fails_to_use_existing_ancient_relay_block() {
        let env = (220_u32, 10_u32, 200_u32, 100_u32);
        let selection = select_headers_to_prove(env, 100_u32).await.map_err(drop);
        assert_eq!(selection, Ok((true, HeaderId(220, 220), HeaderId(200, 200))));
    }

    #[async_std::test]
    async fn select_headers_to_prove_is_able_to_use_existing_recent_relay_block() {
        let env = (40_u32, 10_u32, 200_u32, 100_u32);
        let selection = select_headers_to_prove(env, 100_u32).await.map_err(drop);
        assert_eq!(selection, Ok((false, HeaderId(10, 10), HeaderId(100, 100))));
    }

    #[async_std::test]
    async fn select_headers_to_prove_uses_new_relay_block() {
        let env = (20_u32, 10_u32, 200_u32, 100_u32);
        let selection = select_headers_to_prove(env, 200_u32).await.map_err(drop);
        assert_eq!(selection, Ok((true, HeaderId(20, 20), HeaderId(200, 200))));
    }
}