referrerpolicy=no-referrer-when-downgrade

relay_substrate_client/
guard.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
//! This module provides a set of guard functions that run as background tasks
//! and abort the process if some condition fails.
19
20use crate::{error::Error, Chain, Client};
21
22use async_trait::async_trait;
23use sp_version::RuntimeVersion;
24use std::{
25	fmt::Display,
26	time::{Duration, Instant},
27};
28
29/// Guards environment.
30#[async_trait]
31pub trait Environment<C>: Send + Sync + 'static {
32	/// Error type.
33	type Error: Display + Send + Sync + 'static;
34
35	/// Return current runtime version.
36	async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error>;
37
38	/// Return current time.
39	fn now(&self) -> Instant {
40		Instant::now()
41	}
42
43	/// Sleep given amount of time.
44	async fn sleep(&mut self, duration: Duration) {
45		async_std::task::sleep(duration).await
46	}
47
48	/// Abort current process. Called when guard condition check fails.
49	async fn abort(&mut self) {
50		std::process::abort();
51	}
52}
53
54/// Abort when runtime spec version is different from specified.
55pub fn abort_on_spec_version_change<C: Chain>(
56	mut env: impl Environment<C>,
57	expected_spec_version: u32,
58) {
59	async_std::task::spawn(async move {
60		log::info!(
61			target: "bridge-guard",
62			"Starting spec_version guard for {}. Expected spec_version: {}",
63			C::NAME,
64			expected_spec_version,
65		);
66
67		loop {
68			let actual_spec_version = env.runtime_version().await;
69			match actual_spec_version {
70				Ok(version) if version.spec_version == expected_spec_version => (),
71				Ok(version) => {
72					log::error!(
73						target: "bridge-guard",
74						"{} runtime spec version has changed from {} to {}. Aborting relay",
75						C::NAME,
76						expected_spec_version,
77						version.spec_version,
78					);
79
80					env.abort().await;
81				},
82				Err(error) => log::warn!(
83					target: "bridge-guard",
84					"Failed to read {} runtime version: {}. Relay may need to be stopped manually",
85					C::NAME,
86					error,
87				),
88			}
89
90			env.sleep(conditions_check_delay::<C>()).await;
91		}
92	});
93}
94
95/// Delay between conditions check.
96fn conditions_check_delay<C: Chain>() -> Duration {
97	C::AVERAGE_BLOCK_INTERVAL * (10 + rand::random::<u32>() % 10)
98}
99
100#[async_trait]
101impl<C: Chain, Clnt: Client<C>> Environment<C> for Clnt {
102	type Error = Error;
103
104	async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error> {
105		Client::<C>::runtime_version(self).await
106	}
107}
108
#[cfg(test)]
pub(crate) mod tests {
	use super::*;
	use crate::test_chain::TestChain;
	use futures::{
		channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
		future::FutureExt,
		stream::StreamExt,
		SinkExt,
	};

	/// Guards environment that is driven by (and reports to) the test through channels.
	pub struct TestEnvironment {
		/// Runtime versions that are returned from the `runtime_version` calls.
		pub runtime_version_rx: UnboundedReceiver<RuntimeVersion>,
		/// Gets a message every time the guard calls `sleep`.
		pub slept_tx: UnboundedSender<()>,
		/// Gets a message every time the guard calls `abort`.
		pub aborted_tx: UnboundedSender<()>,
	}

	#[async_trait]
	impl Environment<TestChain> for TestEnvironment {
		type Error = Error;

		async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error> {
			// default version is returned when the channel has nothing more to yield
			let received = self.runtime_version_rx.next().await;
			Ok(received.unwrap_or_default())
		}

		async fn sleep(&mut self, _duration: Duration) {
			// no actual sleeping - only let the test know that we've been called
			let _ = self.slept_tx.send(()).await;
		}

		async fn abort(&mut self) {
			let _ = self.aborted_tx.send(()).await;
			// simulate process abort :)
			async_std::task::sleep(Duration::from_secs(60)).await;
		}
	}

	/// Start the spec version guard with a fresh `TestEnvironment`.
	///
	/// Returns the test-side ends of the environment channels: the sender of runtime
	/// versions, the receiver of `sleep` notifications and the receiver of `abort`
	/// notifications.
	fn start_spec_version_guard(
		expected_spec_version: u32,
	) -> (UnboundedSender<RuntimeVersion>, UnboundedReceiver<()>, UnboundedReceiver<()>) {
		let (runtime_version_tx, runtime_version_rx) = unbounded();
		let (slept_tx, slept_rx) = unbounded();
		let (aborted_tx, aborted_rx) = unbounded();

		abort_on_spec_version_change(
			TestEnvironment { runtime_version_rx, slept_tx, aborted_tx },
			expected_spec_version,
		);

		(runtime_version_tx, slept_rx, aborted_rx)
	}

	#[test]
	fn aborts_when_spec_version_is_changed() {
		async_std::task::block_on(async {
			let (mut runtime_version_tx, mut slept_rx, mut aborted_rx) =
				start_spec_version_guard(0);

			// client responds with wrong version
			let changed_version = RuntimeVersion { spec_version: 42, ..Default::default() };
			runtime_version_tx.send(changed_version).await.unwrap();

			// then the `abort` function is called
			aborted_rx.next().await;
			// and we do not reach the `sleep` function call
			let slept = slept_rx.next().now_or_never();
			assert!(slept.is_none());
		});
	}

	#[test]
	fn does_not_aborts_when_spec_version_is_unchanged() {
		async_std::task::block_on(async {
			let (mut runtime_version_tx, mut slept_rx, mut aborted_rx) =
				start_spec_version_guard(42);

			// client responds with the same version
			let same_version = RuntimeVersion { spec_version: 42, ..Default::default() };
			runtime_version_tx.send(same_version).await.unwrap();

			// then the `sleep` function is called
			slept_rx.next().await;
			// and the `abort` function is not called
			let aborted = aborted_rx.next().now_or_never();
			assert!(aborted.is_none());
		});
	}
}