relay_substrate_client/
guard.rs1use crate::{error::Error, Chain, Client};
21
22use async_trait::async_trait;
23use sp_version::RuntimeVersion;
24use std::{
25 fmt::Display,
26 time::{Duration, Instant},
27};
28
29#[async_trait]
31pub trait Environment<C>: Send + Sync + 'static {
32 type Error: Display + Send + Sync + 'static;
34
35 async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error>;
37
38 fn now(&self) -> Instant {
40 Instant::now()
41 }
42
43 async fn sleep(&mut self, duration: Duration) {
45 async_std::task::sleep(duration).await
46 }
47
48 async fn abort(&mut self) {
50 std::process::abort();
51 }
52}
53
54pub fn abort_on_spec_version_change<C: Chain>(
56 mut env: impl Environment<C>,
57 expected_spec_version: u32,
58) {
59 async_std::task::spawn(async move {
60 log::info!(
61 target: "bridge-guard",
62 "Starting spec_version guard for {}. Expected spec_version: {}",
63 C::NAME,
64 expected_spec_version,
65 );
66
67 loop {
68 let actual_spec_version = env.runtime_version().await;
69 match actual_spec_version {
70 Ok(version) if version.spec_version == expected_spec_version => (),
71 Ok(version) => {
72 log::error!(
73 target: "bridge-guard",
74 "{} runtime spec version has changed from {} to {}. Aborting relay",
75 C::NAME,
76 expected_spec_version,
77 version.spec_version,
78 );
79
80 env.abort().await;
81 },
82 Err(error) => log::warn!(
83 target: "bridge-guard",
84 "Failed to read {} runtime version: {}. Relay may need to be stopped manually",
85 C::NAME,
86 error,
87 ),
88 }
89
90 env.sleep(conditions_check_delay::<C>()).await;
91 }
92 });
93}
94
95fn conditions_check_delay<C: Chain>() -> Duration {
97 C::AVERAGE_BLOCK_INTERVAL * (10 + rand::random::<u32>() % 10)
98}
99
100#[async_trait]
101impl<C: Chain, Clnt: Client<C>> Environment<C> for Clnt {
102 type Error = Error;
103
104 async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error> {
105 Client::<C>::runtime_version(self).await
106 }
107}
108
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::test_chain::TestChain;
    use futures::{
        channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
        future::FutureExt,
        stream::StreamExt,
        SinkExt,
    };

    /// Channel-backed [`Environment`] that lets a test feed runtime versions to the
    /// guard and observe its `sleep` / `abort` calls.
    pub struct TestEnvironment {
        /// Source of the runtime versions reported to the guard.
        pub runtime_version_rx: UnboundedReceiver<RuntimeVersion>,
        /// Gets a `()` every time the guard calls `sleep`.
        pub slept_tx: UnboundedSender<()>,
        /// Gets a `()` every time the guard calls `abort`.
        pub aborted_tx: UnboundedSender<()>,
    }

    #[async_trait]
    impl Environment<TestChain> for TestEnvironment {
        type Error = Error;

        async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error> {
            // A closed channel is reported as the default runtime version.
            let next_version = self.runtime_version_rx.next().await;
            Ok(next_version.unwrap_or_default())
        }

        async fn sleep(&mut self, _duration: Duration) {
            // No real waiting: just report the call (a dropped receiver is ignored).
            let _ = self.slept_tx.send(()).await;
        }

        async fn abort(&mut self) {
            let _ = self.aborted_tx.send(()).await;
            // Stand-in for the process abort: park the guard task so that it does not
            // proceed to the next step of its loop while the test is running.
            async_std::task::sleep(Duration::from_secs(60)).await;
        }
    }

    /// Test-side channel ends: runtime version sender, `sleep` receiver, `abort` receiver.
    type GuardHandles =
        (UnboundedSender<RuntimeVersion>, UnboundedReceiver<()>, UnboundedReceiver<()>);

    /// Start the spec version guard over a fresh [`TestEnvironment`] and return the
    /// channel ends used to drive and observe it.
    fn start_guard(expected_spec_version: u32) -> GuardHandles {
        let (runtime_version_tx, runtime_version_rx) = unbounded();
        let (slept_tx, slept_rx) = unbounded();
        let (aborted_tx, aborted_rx) = unbounded();

        abort_on_spec_version_change(
            TestEnvironment { runtime_version_rx, slept_tx, aborted_tx },
            expected_spec_version,
        );

        (runtime_version_tx, slept_rx, aborted_rx)
    }

    #[test]
    fn aborts_when_spec_version_is_changed() {
        async_std::task::block_on(async {
            let (mut version_tx, mut slept_rx, mut aborted_rx) = start_guard(0);

            // The chain reports a spec version other than the expected one...
            version_tx
                .send(RuntimeVersion { spec_version: 42, ..Default::default() })
                .await
                .unwrap();

            // ...so the guard calls `abort`...
            aborted_rx.next().await;
            // ...and has not reached `sleep`.
            assert!(slept_rx.next().now_or_never().is_none());
        });
    }

    #[test]
    fn does_not_aborts_when_spec_version_is_unchanged() {
        async_std::task::block_on(async {
            let (mut version_tx, mut slept_rx, mut aborted_rx) = start_guard(42);

            // The chain reports exactly the expected spec version...
            version_tx
                .send(RuntimeVersion { spec_version: 42, ..Default::default() })
                .await
                .unwrap();

            // ...so the guard goes to `sleep`...
            slept_rx.next().await;
            // ...and `abort` has not been called.
            assert!(aborted_rx.next().now_or_never().is_none());
        });
    }
}