referrerpolicy=no-referrer-when-downgrade

zombienet_backchannel/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
//! Provides a means of coordination between malicious actors and the
//! zombienet test-runner, allowing values generated at runtime to be
//! referenced in the test specifications through bidirectional message
//! passing implemented as a `backchannel`.
21
22use codec;
23use futures_util::{SinkExt, StreamExt};
24use serde::{Deserialize, Serialize};
25use std::{env, sync::Mutex};
26use tokio::sync::broadcast;
27use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
28
29mod errors;
30use errors::BackchannelError;
31
32pub static ZOMBIENET_BACKCHANNEL: Mutex<Option<ZombienetBackchannel>> = Mutex::new(None);
33
34#[derive(Debug)]
35pub struct ZombienetBackchannel {
36	broadcast_tx: broadcast::Sender<BackchannelItem>,
37	ws_tx: broadcast::Sender<BackchannelItem>,
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize)]
41pub struct BackchannelItem {
42	key: String,
43	value: String,
44}
45
/// Stateless handle used to subscribe to, or send items through, the global backchannel.
///
/// The handle carries no data of its own; its methods operate on [`ZOMBIENET_BACKCHANNEL`].
/// Obtain one through [`ZombienetBackchannel::broadcaster`].
pub struct Broadcaster;
47
/// Log target passed to the `gum` logging macros by everything in this module.
pub const ZOMBIENET: &str = "🧟ZOMBIENET🧟";
49
50impl Broadcaster {
51	/// Return a subscriber that will receive all message broadcasted by the zombienet backchannel
52	/// websocket server.
53	pub fn subscribe(&self) -> Result<broadcast::Receiver<BackchannelItem>, BackchannelError> {
54		let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
55		let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?;
56		let sender = zombienet_bkc.broadcast_tx.clone();
57		Ok(sender.subscribe())
58	}
59
60	/// Provides a simple API to send a key/value to the zombienet websocket server.
61	pub async fn send(
62		&mut self,
63		key: &'static str,
64		val: impl codec::Encode,
65	) -> Result<(), BackchannelError> {
66		let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
67		let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?;
68
69		let encoded = val.encode();
70		let backchannel_item = BackchannelItem {
71			key: key.to_string(),
72			value: String::from_utf8_lossy(&encoded).to_string(),
73		};
74
75		let sender = zombienet_bkc.ws_tx.clone();
76		sender.send(backchannel_item).map_err(|e| {
77			gum::error!(target: ZOMBIENET, "Error sending new item: {}", e);
78			BackchannelError::SendItemFail
79		})?;
80
81		Ok(())
82	}
83}
84
85impl ZombienetBackchannel {
86	pub async fn init() -> Result<(), BackchannelError> {
87		let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
88		if zombienet_bkc.is_none() {
89			let backchannel_host =
90				env::var("BACKCHANNEL_HOST").unwrap_or_else(|_| "backchannel".to_string());
91			let backchannel_port =
92				env::var("BACKCHANNEL_PORT").unwrap_or_else(|_| "3000".to_string());
93
94			// validate port
95			backchannel_port.parse::<u16>().map_err(|_| BackchannelError::InvalidPort)?;
96			// validate non empty string for host
97			if backchannel_host.trim().is_empty() {
98				return Err(BackchannelError::InvalidHost)
99			};
100
101			let ws_url = format!("ws://{}:{}/ws", backchannel_host, backchannel_port);
102			gum::debug!(target: ZOMBIENET, "Connecting to : {}", &ws_url);
103			let (ws_stream, _) =
104				connect_async(ws_url).await.map_err(|_| BackchannelError::CantConnectToWS)?;
105			let (mut write, mut read) = ws_stream.split();
106
107			let (tx, _rx) = broadcast::channel(256);
108			let (tx_relay, mut rx_relay) = broadcast::channel::<BackchannelItem>(256);
109
110			// receive from the ws and send to all subcribers
111			let tx1 = tx.clone();
112			tokio::spawn(async move {
113				while let Some(Ok(Message::Text(text))) = read.next().await {
114					match serde_json::from_str::<BackchannelItem>(&text) {
115						Ok(backchannel_item) =>
116							if tx1.send(backchannel_item).is_err() {
117								gum::error!(target: ZOMBIENET, "Error sending through the channel");
118								return
119							},
120						Err(_) => {
121							gum::error!(target: ZOMBIENET, "Invalid payload received");
122						},
123					}
124				}
125			});
126
127			// receive from subscribers and relay to ws
128			tokio::spawn(async move {
129				while let Ok(item) = rx_relay.recv().await {
130					if write
131						.send(Message::Text(serde_json::to_string(&item).unwrap().into()))
132						.await
133						.is_err()
134					{
135						gum::error!(target: ZOMBIENET, "Error sending through ws");
136					}
137				}
138			});
139
140			*zombienet_bkc = Some(ZombienetBackchannel { broadcast_tx: tx, ws_tx: tx_relay });
141			return Ok(())
142		}
143
144		Err(BackchannelError::AlreadyInitialized)
145	}
146
147	/// Ensure that the backchannel is initialized and return a broadcaster instance
148	/// allowing to subscribe or send new items.
149	pub fn broadcaster() -> Result<Broadcaster, BackchannelError> {
150		if ZOMBIENET_BACKCHANNEL.lock().unwrap().is_some() {
151			Ok(Broadcaster {})
152		} else {
153			Err(BackchannelError::Uninitialized)
154		}
155	}
156}