1use crate::finality::SubmitFinalityProofCallBuilder;
20
21use async_std::sync::{Arc, Mutex};
22use async_trait::async_trait;
23use bp_header_chain::ConsensusLogReader;
24use bp_runtime::HeaderIdProvider;
25use futures::{select, FutureExt};
26use num_traits::{One, Saturating, Zero};
27use sp_runtime::traits::Header;
28
29use finality_relay::{FinalitySyncParams, HeadersToRelay, TargetClient as FinalityTargetClient};
30use relay_substrate_client::{
31 AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError,
32 HeaderIdOf,
33};
34use relay_utils::{
35 metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError,
36 STALL_TIMEOUT,
37};
38
39use crate::{
40 finality::{
41 source::{RequiredHeaderNumberRef, SubstrateFinalitySource},
42 target::SubstrateFinalityTarget,
43 SubstrateFinalitySyncPipeline, RECENT_FINALITY_PROOFS_LIMIT,
44 },
45 finality_base::engine::Engine,
46 on_demand::OnDemandRelay,
47 TransactionParams,
48};
49
50#[derive(Clone)]
56pub struct OnDemandHeadersRelay<P: SubstrateFinalitySyncPipeline, SourceClnt, TargetClnt> {
57 relay_task_name: String,
59 required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
61 source_client: SourceClnt,
63 target_client: TargetClnt,
65}
66
67impl<
68 P: SubstrateFinalitySyncPipeline,
69 SourceClnt: Client<P::SourceChain>,
70 TargetClnt: Client<P::TargetChain>,
71 > OnDemandHeadersRelay<P, SourceClnt, TargetClnt>
72{
73 pub fn new(
78 source_client: SourceClnt,
79 target_client: TargetClnt,
80 target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
81 headers_to_relay: HeadersToRelay,
82 metrics_params: Option<MetricsParams>,
83 ) -> Self
84 where
85 AccountIdOf<P::TargetChain>:
86 From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
87 {
88 let required_header_number = Arc::new(Mutex::new(Zero::zero()));
89 let this = OnDemandHeadersRelay {
90 relay_task_name: on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>(),
91 required_header_number: required_header_number.clone(),
92 source_client: source_client.clone(),
93 target_client: target_client.clone(),
94 };
95 async_std::task::spawn(async move {
96 background_task::<P>(
97 source_client,
98 target_client,
99 target_transaction_params,
100 headers_to_relay,
101 required_header_number,
102 metrics_params,
103 )
104 .await;
105 });
106
107 this
108 }
109}
110
111#[async_trait]
112impl<
113 P: SubstrateFinalitySyncPipeline,
114 SourceClnt: Client<P::SourceChain>,
115 TargetClnt: Client<P::TargetChain>,
116 > OnDemandRelay<P::SourceChain, P::TargetChain>
117 for OnDemandHeadersRelay<P, SourceClnt, TargetClnt>
118{
119 async fn reconnect(&self) -> Result<(), SubstrateError> {
120 self.source_client.clone().reconnect().await?;
123 self.target_client.clone().reconnect().await
124 }
125
126 async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceChain>) {
127 let mut required_header_number = self.required_header_number.lock().await;
128 if required_header > *required_header_number {
129 log::trace!(
130 target: "bridge",
131 "[{}] More {} headers required. Going to sync up to the {}",
132 self.relay_task_name,
133 P::SourceChain::NAME,
134 required_header,
135 );
136
137 *required_header_number = required_header;
138 }
139 }
140
141 async fn prove_header(
142 &self,
143 required_header: BlockNumberOf<P::SourceChain>,
144 ) -> Result<(HeaderIdOf<P::SourceChain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
145 const MAX_ITERATIONS: u32 = 4;
146 let mut iterations = 0;
147 let mut current_required_header = required_header;
148 loop {
149 let finality_source =
151 SubstrateFinalitySource::<P, _>::new(self.source_client.clone(), None);
152 let (header, mut proof) =
153 finality_source.prove_block_finality(current_required_header).await?;
154 let header_id = header.id();
155
156 let context = P::FinalityEngine::verify_and_optimize_proof(
158 &self.target_client,
159 &header,
160 &mut proof,
161 )
162 .await?;
163
164 let check_result = P::FinalityEngine::check_max_expected_call_limits(&header, &proof);
167 if check_result.is_weight_limit_exceeded || check_result.extra_size != 0 {
168 iterations += 1;
169 current_required_header = header_id.number().saturating_add(One::one());
170 if iterations < MAX_ITERATIONS {
171 log::debug!(
172 target: "bridge",
173 "[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?}. But it exceeds limits: {:?}. \
174 Going to select next header",
175 self.relay_task_name,
176 P::SourceChain::NAME,
177 required_header,
178 P::SourceChain::NAME,
179 header_id,
180 check_result,
181 );
182
183 continue;
184 }
185 }
186
187 log::debug!(
188 target: "bridge",
189 "[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} (after {} iterations)",
190 self.relay_task_name,
191 P::SourceChain::NAME,
192 required_header,
193 P::SourceChain::NAME,
194 header_id,
195 iterations,
196 );
197
198 let call = P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(
200 header, proof, false, context,
201 );
202
203 return Ok((header_id, vec![call]));
204 }
205 }
206}
207
208async fn background_task<P: SubstrateFinalitySyncPipeline>(
210 source_client: impl Client<P::SourceChain>,
211 target_client: impl Client<P::TargetChain>,
212 target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
213 headers_to_relay: HeadersToRelay,
214 required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
215 metrics_params: Option<MetricsParams>,
216) where
217 AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
218{
219 let relay_task_name = on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>();
220 let target_transactions_mortality = target_transaction_params.mortality;
221 let mut finality_source = SubstrateFinalitySource::<P, _>::new(
222 source_client.clone(),
223 Some(required_header_number.clone()),
224 );
225 let mut finality_target =
226 SubstrateFinalityTarget::new(target_client.clone(), target_transaction_params);
227 let mut latest_non_mandatory_at_source = Zero::zero();
228
229 let mut restart_relay = true;
230 let finality_relay_task = futures::future::Fuse::terminated();
231 futures::pin_mut!(finality_relay_task);
232
233 loop {
234 select! {
235 _ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
236 _ = finality_relay_task => {
237 restart_relay = true;
239 },
240 }
241
242 let best_finalized_source_header_at_source =
244 best_finalized_source_header_at_source(&finality_source, &relay_task_name).await;
245 if matches!(best_finalized_source_header_at_source, Err(ref e) if e.is_connection_error()) {
246 relay_utils::relay_loop::reconnect_failed_client(
247 FailedClient::Source,
248 relay_utils::relay_loop::RECONNECT_DELAY,
249 &mut finality_source,
250 &mut finality_target,
251 )
252 .await;
253 continue
254 }
255
256 let best_finalized_source_header_at_target =
258 best_finalized_source_header_at_target::<P, _>(&finality_target, &relay_task_name)
259 .await;
260 if matches!(best_finalized_source_header_at_target, Err(ref e) if e.is_connection_error()) {
261 relay_utils::relay_loop::reconnect_failed_client(
262 FailedClient::Target,
263 relay_utils::relay_loop::RECONNECT_DELAY,
264 &mut finality_source,
265 &mut finality_target,
266 )
267 .await;
268 continue
269 }
270
271 let best_finalized_source_header_at_source_fmt =
273 format!("{best_finalized_source_header_at_source:?}");
274 let best_finalized_source_header_at_target_fmt =
275 format!("{best_finalized_source_header_at_target:?}");
276 let required_header_number_value = *required_header_number.lock().await;
277 let mandatory_scan_range = mandatory_headers_scan_range::<P::SourceChain>(
278 best_finalized_source_header_at_source.ok(),
279 best_finalized_source_header_at_target.ok(),
280 required_header_number_value,
281 )
282 .await;
283
284 log::trace!(
285 target: "bridge",
286 "[{}] Mandatory headers scan range: ({:?}, {:?}, {:?}) -> {:?}",
287 relay_task_name,
288 required_header_number_value,
289 best_finalized_source_header_at_source_fmt,
290 best_finalized_source_header_at_target_fmt,
291 mandatory_scan_range,
292 );
293
294 if let Some(mandatory_scan_range) = mandatory_scan_range {
295 let relay_mandatory_header_result = relay_mandatory_header_from_range(
296 &finality_source,
297 &required_header_number,
298 best_finalized_source_header_at_target_fmt,
299 (
300 std::cmp::max(mandatory_scan_range.0, latest_non_mandatory_at_source),
301 mandatory_scan_range.1,
302 ),
303 &relay_task_name,
304 )
305 .await;
306 match relay_mandatory_header_result {
307 Ok(true) => (),
308 Ok(false) => {
309 latest_non_mandatory_at_source = mandatory_scan_range.1;
312
313 log::trace!(
314 target: "bridge",
315 "[{}] No mandatory {} headers in the range {:?}",
316 relay_task_name,
317 P::SourceChain::NAME,
318 mandatory_scan_range,
319 );
320 },
321 Err(e) => {
322 log::warn!(
323 target: "bridge",
324 "[{}] Failed to scan mandatory {} headers range ({:?}): {:?}",
325 relay_task_name,
326 P::SourceChain::NAME,
327 mandatory_scan_range,
328 e,
329 );
330
331 if e.is_connection_error() {
332 relay_utils::relay_loop::reconnect_failed_client(
333 FailedClient::Source,
334 relay_utils::relay_loop::RECONNECT_DELAY,
335 &mut finality_source,
336 &mut finality_target,
337 )
338 .await;
339 continue
340 }
341 },
342 }
343 }
344
345 if restart_relay {
347 let stall_timeout = relay_substrate_client::transaction_stall_timeout(
348 target_transactions_mortality,
349 P::TargetChain::AVERAGE_BLOCK_INTERVAL,
350 STALL_TIMEOUT,
351 );
352
353 log::info!(
354 target: "bridge",
355 "[{}] Starting on-demand headers relay task\n\t\
356 Headers to relay: {:?}\n\t\
357 Tx mortality: {:?} (~{}m)\n\t\
358 Stall timeout: {:?}",
359 relay_task_name,
360 headers_to_relay,
361 target_transactions_mortality,
362 stall_timeout.as_secs_f64() / 60.0f64,
363 stall_timeout,
364 );
365
366 finality_relay_task.set(
367 finality_relay::run(
368 finality_source.clone(),
369 finality_target.clone(),
370 FinalitySyncParams {
371 tick: std::cmp::max(
372 P::SourceChain::AVERAGE_BLOCK_INTERVAL,
373 P::TargetChain::AVERAGE_BLOCK_INTERVAL,
374 ),
375 recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
376 stall_timeout,
377 headers_to_relay,
378 },
379 metrics_params.clone().unwrap_or_else(MetricsParams::disabled),
380 futures::future::pending(),
381 )
382 .fuse(),
383 );
384
385 restart_relay = false;
386 }
387 }
388}
389
390async fn mandatory_headers_scan_range<C: Chain>(
393 best_finalized_source_header_at_source: Option<C::BlockNumber>,
394 best_finalized_source_header_at_target: Option<C::BlockNumber>,
395 required_header_number: BlockNumberOf<C>,
396) -> Option<(C::BlockNumber, C::BlockNumber)> {
397 let best_finalized_source_header_at_target =
401 best_finalized_source_header_at_target.unwrap_or(required_header_number);
402
403 let best_finalized_source_header_at_source =
406 best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
407
408 if required_header_number >= best_finalized_source_header_at_source {
410 return None
411 }
412
413 Some((
414 best_finalized_source_header_at_target + One::one(),
415 best_finalized_source_header_at_source,
416 ))
417}
418
419async fn relay_mandatory_header_from_range<P, SourceClnt>(
424 finality_source: &SubstrateFinalitySource<P, SourceClnt>,
425 required_header_number: &RequiredHeaderNumberRef<P::SourceChain>,
426 best_finalized_source_header_at_target: String,
427 range: (BlockNumberOf<P::SourceChain>, BlockNumberOf<P::SourceChain>),
428 relay_task_name: &str,
429) -> Result<bool, relay_substrate_client::Error>
430where
431 P: SubstrateFinalitySyncPipeline,
432 SourceClnt: Client<P::SourceChain>,
433{
434 let mandatory_source_header_number =
436 find_mandatory_header_in_range(finality_source, range).await?;
437
438 let mandatory_source_header_number = match mandatory_source_header_number {
440 Some(mandatory_source_header_number) => mandatory_source_header_number,
441 None => return Ok(false),
442 };
443
444 let mut required_header_number = required_header_number.lock().await;
447 if *required_header_number >= mandatory_source_header_number {
448 return Ok(false)
449 }
450
451 log::trace!(
452 target: "bridge",
453 "[{}] Too many {} headers missing at target ({} vs {}). Going to sync up to the mandatory {}",
454 relay_task_name,
455 P::SourceChain::NAME,
456 best_finalized_source_header_at_target,
457 range.1,
458 mandatory_source_header_number,
459 );
460
461 *required_header_number = mandatory_source_header_number;
462 Ok(true)
463}
464
465async fn best_finalized_source_header_at_source<P, SourceClnt>(
469 finality_source: &SubstrateFinalitySource<P, SourceClnt>,
470 relay_task_name: &str,
471) -> Result<BlockNumberOf<P::SourceChain>, relay_substrate_client::Error>
472where
473 P: SubstrateFinalitySyncPipeline,
474 SourceClnt: Client<P::SourceChain>,
475{
476 finality_source.on_chain_best_finalized_block_number().await.map_err(|error| {
477 log::error!(
478 target: "bridge",
479 "[{}] Failed to read best finalized source header from source: {:?}",
480 relay_task_name,
481 error,
482 );
483
484 error
485 })
486}
487
488async fn best_finalized_source_header_at_target<P, TargetClnt>(
492 finality_target: &SubstrateFinalityTarget<P, TargetClnt>,
493 relay_task_name: &str,
494) -> Result<
495 BlockNumberOf<P::SourceChain>,
496 <SubstrateFinalityTarget<P, TargetClnt> as RelayClient>::Error,
497>
498where
499 P: SubstrateFinalitySyncPipeline,
500 TargetClnt: Client<P::TargetChain>,
501 AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as sp_core::Pair>::Public>,
502{
503 finality_target
504 .best_finalized_source_block_id()
505 .await
506 .map_err(|error| {
507 log::error!(
508 target: "bridge",
509 "[{}] Failed to read best finalized source header from target: {:?}",
510 relay_task_name,
511 error,
512 );
513
514 error
515 })
516 .map(|id| id.0)
517}
518
519async fn find_mandatory_header_in_range<P, SourceClnt>(
523 finality_source: &SubstrateFinalitySource<P, SourceClnt>,
524 range: (BlockNumberOf<P::SourceChain>, BlockNumberOf<P::SourceChain>),
525) -> Result<Option<BlockNumberOf<P::SourceChain>>, relay_substrate_client::Error>
526where
527 P: SubstrateFinalitySyncPipeline,
528 SourceClnt: Client<P::SourceChain>,
529{
530 let mut current = range.0;
531 while current <= range.1 {
532 let header = finality_source.client().header_by_number(current).await?;
533 if <P::FinalityEngine as Engine<P::SourceChain>>::ConsensusLogReader::schedules_authorities_change(
534 header.digest(),
535 ) {
536 return Ok(Some(current))
537 }
538
539 current += One::one();
540 }
541
542 Ok(None)
543}
544
545fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
547 format!("{}-to-{}-on-demand-headers", SourceChain::NAME, TargetChain::NAME)
548}
549
550#[cfg(test)]
551mod tests {
552 use super::*;
553 use relay_substrate_client::test_chain::TestChain;
554
555 const AT_SOURCE: Option<BlockNumberOf<TestChain>> = Some(10);
556 const AT_TARGET: Option<BlockNumberOf<TestChain>> = Some(1);
557
558 #[async_std::test]
559 async fn mandatory_headers_scan_range_selects_range_if_some_headers_are_missing() {
560 assert_eq!(
561 mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, 0,).await,
562 Some((AT_TARGET.unwrap() + 1, AT_SOURCE.unwrap())),
563 );
564 }
565
566 #[async_std::test]
567 async fn mandatory_headers_scan_range_selects_nothing_if_already_queued() {
568 assert_eq!(
569 mandatory_headers_scan_range::<TestChain>(AT_SOURCE, AT_TARGET, AT_SOURCE.unwrap(),)
570 .await,
571 None,
572 );
573 }
574}