substrate_cli_test_utils/
lib.rs1#![cfg(unix)]
19
20use assert_cmd::cargo::cargo_bin;
21use nix::{
22 sys::signal::{kill, Signal, Signal::SIGINT},
23 unistd::Pid,
24};
25use node_primitives::{Hash, Header};
26use regex::Regex;
27use sp_rpc::{list::ListOrValue, number::NumberOrHex};
28use std::{
29 io::{BufRead, BufReader, Read},
30 ops::{Deref, DerefMut},
31 path::{Path, PathBuf},
32 process::{self, Child, Command},
33 time::Duration,
34};
35use tokio::io::{AsyncBufReadExt, AsyncRead};
36
37pub fn start_node_inline(args: Vec<&str>) -> Result<(), sc_service::error::Error> {
60 use sc_cli::SubstrateCli;
61
62 let cli_call = std::iter::once("node-template").chain(args);
64 let cli = node_cli::Cli::from_iter(cli_call);
65 let runner = cli.create_runner(&cli.run).unwrap();
66 runner.run_node_until_exit(|config| async move { node_cli::service::new_full(config, cli) })
67}
68
69pub fn start_node() -> Child {
95 Command::new(cargo_bin("substrate-node"))
96 .stdout(process::Stdio::piped())
97 .stderr(process::Stdio::piped())
98 .args(&["--dev", "--tmp", "--rpc-port=45789", "--no-hardware-benchmarks"])
99 .spawn()
100 .unwrap()
101}
102
/// Builds the `staging-node-cli` package by invoking `cargo build`.
///
/// `args` are appended to the cargo invocation after the package selector and the
/// optional `--release` flag.
///
/// `--release` is passed unless the custom `build_profile = "debug"` cfg is set
/// (presumably emitted by this crate's build script — confirm).
///
/// # Panics
///
/// Panics if `cargo` cannot be executed, or if the build exits with a non-zero status;
/// in the latter case the captured stderr of cargo is included in the panic message.
pub fn build_substrate(args: &[&str]) {
    let is_release_build = !cfg!(build_profile = "debug");

    let mut cmd = Command::new("cargo");
    cmd.arg("build").arg("-p=staging-node-cli");
    if is_release_build {
        cmd.arg("--release");
    }

    // Build the panic message lazily: the happy path should not allocate it.
    let output = cmd.args(args).output().unwrap_or_else(|err| {
        panic!("Failed to execute 'cargo b' with args {:?}: {}", args, err)
    });

    if !output.status.success() {
        panic!(
            "Failed to execute 'cargo b' with args {:?}: \n{}",
            args,
            String::from_utf8_lossy(&output.stderr)
        );
    }
}
156
157pub async fn wait_for_stream_pattern_match<R>(stream: R, re: Regex) -> Result<(), String>
192where
193 R: AsyncRead + Unpin,
194{
195 let mut stdio_reader = tokio::io::BufReader::new(stream).lines();
196 while let Ok(Some(line)) = stdio_reader.next_line().await {
197 match re.find(line.as_str()) {
198 Some(_) => return Ok(()),
199 None => (),
200 }
201 }
202 Err(String::from("Stream closed without any lines matching the regex."))
203}
204
205pub async fn run_with_timeout(timeout: Duration, future: impl futures::Future<Output = ()>) {
207 tokio::time::timeout(timeout, future).await.expect("Hit timeout");
208}
209
210pub async fn wait_n_finalized_blocks(n: usize, url: &str) {
212 use substrate_rpc_client::{ws_client, ChainApi};
213
214 let mut built_blocks = std::collections::HashSet::new();
215 let block_duration = Duration::from_secs(2);
216 let mut interval = tokio::time::interval(block_duration);
217 let rpc = ws_client(url).await.unwrap();
218
219 loop {
220 if let Ok(block) = ChainApi::<(), Hash, Header, ()>::finalized_head(&rpc).await {
221 built_blocks.insert(block);
222 if built_blocks.len() > n {
223 break
224 }
225 };
226 interval.tick().await;
227 }
228}
229
230pub async fn run_node_for_a_while(base_path: &Path, args: &[&str]) {
232 run_with_timeout(Duration::from_secs(60 * 10), async move {
233 let mut cmd = Command::new(cargo_bin("substrate-node"))
234 .stdout(process::Stdio::piped())
235 .stderr(process::Stdio::piped())
236 .args(args)
237 .arg("-d")
238 .arg(base_path)
239 .spawn()
240 .unwrap();
241
242 let stderr = cmd.stderr.take().unwrap();
243
244 let mut child = KillChildOnDrop(cmd);
245
246 let ws_url = extract_info_from_output(stderr).0.ws_url;
247
248 wait_n_finalized_blocks(3, &ws_url).await;
250
251 child.assert_still_running();
252
253 child.stop();
255 })
256 .await
257}
258
259pub async fn block_hash(block_number: u64, url: &str) -> Result<Hash, String> {
260 use substrate_rpc_client::{ws_client, ChainApi};
261
262 let rpc = ws_client(url).await.unwrap();
263
264 let result = ChainApi::<(), Hash, Header, ()>::block_hash(
265 &rpc,
266 Some(ListOrValue::Value(NumberOrHex::Number(block_number))),
267 )
268 .await
269 .map_err(|_| "Couldn't get block hash".to_string())?;
270
271 match result {
272 ListOrValue::Value(maybe_block_hash) if maybe_block_hash.is_some() =>
273 Ok(maybe_block_hash.unwrap()),
274 _ => Err("Couldn't get block hash".to_string()),
275 }
276}
277
278pub struct KillChildOnDrop(pub Child);
279
280impl KillChildOnDrop {
281 pub fn stop(&mut self) {
285 self.stop_with_signal(SIGINT);
286 }
287
288 pub fn stop_with_signal(&mut self, signal: Signal) {
290 kill(Pid::from_raw(self.id().try_into().unwrap()), signal).unwrap();
291 assert!(self.wait().unwrap().success());
292 }
293
294 pub fn assert_still_running(&mut self) {
296 assert!(self.try_wait().unwrap().is_none(), "the process should still be running");
297 }
298}
299
300impl Drop for KillChildOnDrop {
301 fn drop(&mut self) {
302 let _ = self.0.kill();
303 }
304}
305
306impl Deref for KillChildOnDrop {
307 type Target = Child;
308
309 fn deref(&self) -> &Self::Target {
310 &self.0
311 }
312}
313
314impl DerefMut for KillChildOnDrop {
315 fn deref_mut(&mut self) -> &mut Self::Target {
316 &mut self.0
317 }
318}
319
/// Information about a running node, extracted from its log output.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that values can be printed and
/// compared in test assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeInfo {
    /// WebSocket URL (`ws://<addr>`) of the node's JSON-RPC server.
    pub ws_url: String,
    /// Path of the node's database as reported in its `Database: ... at <path>` log line.
    pub db_path: PathBuf,
}
325
326pub fn extract_info_from_output(read: impl Read + Send) -> (NodeInfo, String) {
330 let mut data = String::new();
331
332 let ws_url = BufReader::new(read)
333 .lines()
334 .find_map(|line| {
335 let line = line.expect("failed to obtain next line while extracting node info");
336 data.push_str(&line);
337 data.push_str("\n");
338
339 let sock_addr = match line.split_once("Running JSON-RPC server: addr=") {
341 None => return None,
342 Some((_, after)) => after.split_once(",").unwrap().0,
343 };
344
345 Some(format!("ws://{}", sock_addr))
346 })
347 .unwrap_or_else(|| {
348 eprintln!("Observed node output:\n{}", data);
349 panic!("We should get a WebSocket address")
350 });
351
352 let re = Regex::new(r"Database: .+ at (\S+)").unwrap();
354 let db_path = PathBuf::from(re.captures(data.as_str()).unwrap().get(1).unwrap().as_str());
355
356 (NodeInfo { ws_url, db_path }, data)
357}