zombienet_backchannel/
lib.rs1use codec;
23use futures_util::{SinkExt, StreamExt};
24use serde::{Deserialize, Serialize};
25use std::{env, sync::Mutex};
26use tokio::sync::broadcast;
27use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
28
29mod errors;
30use errors::BackchannelError;
31
32pub static ZOMBIENET_BACKCHANNEL: Mutex<Option<ZombienetBackchannel>> = Mutex::new(None);
33
34#[derive(Debug)]
35pub struct ZombienetBackchannel {
36 broadcast_tx: broadcast::Sender<BackchannelItem>,
37 ws_tx: broadcast::Sender<BackchannelItem>,
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize)]
41pub struct BackchannelItem {
42 key: String,
43 value: String,
44}
45
46pub struct Broadcaster;
47
48pub const ZOMBIENET: &str = "🧟ZOMBIENET🧟";
49
50impl Broadcaster {
51 pub fn subscribe(&self) -> Result<broadcast::Receiver<BackchannelItem>, BackchannelError> {
54 let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
55 let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?;
56 let sender = zombienet_bkc.broadcast_tx.clone();
57 Ok(sender.subscribe())
58 }
59
60 pub async fn send(
62 &mut self,
63 key: &'static str,
64 val: impl codec::Encode,
65 ) -> Result<(), BackchannelError> {
66 let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
67 let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?;
68
69 let encoded = val.encode();
70 let backchannel_item = BackchannelItem {
71 key: key.to_string(),
72 value: String::from_utf8_lossy(&encoded).to_string(),
73 };
74
75 let sender = zombienet_bkc.ws_tx.clone();
76 sender.send(backchannel_item).map_err(|e| {
77 gum::error!(target: ZOMBIENET, "Error sending new item: {}", e);
78 BackchannelError::SendItemFail
79 })?;
80
81 Ok(())
82 }
83}
84
85impl ZombienetBackchannel {
86 pub async fn init() -> Result<(), BackchannelError> {
87 let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
88 if zombienet_bkc.is_none() {
89 let backchannel_host =
90 env::var("BACKCHANNEL_HOST").unwrap_or_else(|_| "backchannel".to_string());
91 let backchannel_port =
92 env::var("BACKCHANNEL_PORT").unwrap_or_else(|_| "3000".to_string());
93
94 backchannel_port.parse::<u16>().map_err(|_| BackchannelError::InvalidPort)?;
96 if backchannel_host.trim().is_empty() {
98 return Err(BackchannelError::InvalidHost)
99 };
100
101 let ws_url = format!("ws://{}:{}/ws", backchannel_host, backchannel_port);
102 gum::debug!(target: ZOMBIENET, "Connecting to : {}", &ws_url);
103 let (ws_stream, _) =
104 connect_async(ws_url).await.map_err(|_| BackchannelError::CantConnectToWS)?;
105 let (mut write, mut read) = ws_stream.split();
106
107 let (tx, _rx) = broadcast::channel(256);
108 let (tx_relay, mut rx_relay) = broadcast::channel::<BackchannelItem>(256);
109
110 let tx1 = tx.clone();
112 tokio::spawn(async move {
113 while let Some(Ok(Message::Text(text))) = read.next().await {
114 match serde_json::from_str::<BackchannelItem>(&text) {
115 Ok(backchannel_item) =>
116 if tx1.send(backchannel_item).is_err() {
117 gum::error!(target: ZOMBIENET, "Error sending through the channel");
118 return
119 },
120 Err(_) => {
121 gum::error!(target: ZOMBIENET, "Invalid payload received");
122 },
123 }
124 }
125 });
126
127 tokio::spawn(async move {
129 while let Ok(item) = rx_relay.recv().await {
130 if write
131 .send(Message::Text(serde_json::to_string(&item).unwrap().into()))
132 .await
133 .is_err()
134 {
135 gum::error!(target: ZOMBIENET, "Error sending through ws");
136 }
137 }
138 });
139
140 *zombienet_bkc = Some(ZombienetBackchannel { broadcast_tx: tx, ws_tx: tx_relay });
141 return Ok(())
142 }
143
144 Err(BackchannelError::AlreadyInitialized)
145 }
146
147 pub fn broadcaster() -> Result<Broadcaster, BackchannelError> {
150 if ZOMBIENET_BACKCHANNEL.lock().unwrap().is_some() {
151 Ok(Broadcaster {})
152 } else {
153 Err(BackchannelError::Uninitialized)
154 }
155 }
156}