cumulus_zombienet_sdk_helpers/
lib.rs1use anyhow::anyhow;
5use codec::{Compact, Decode};
6use cumulus_primitives_core::{relay_chain, rpsr_digest::RPSR_CONSENSUS_ID};
7use futures::stream::StreamExt;
8use polkadot_primitives::{CandidateReceiptV2, Id as ParaId};
9use std::{cmp::max, collections::HashMap, ops::Range};
10use tokio::{
11 join,
12 time::{sleep, Duration},
13};
14use zombienet_sdk::subxt::{
15 self,
16 blocks::Block,
17 config::{polkadot::PolkadotExtrinsicParamsBuilder, substrate::DigestItem},
18 dynamic::Value,
19 events::Events,
20 ext::scale_value::value,
21 tx::{signer::Signer, DynamicPayload, TxStatus},
22 utils::H256,
23 OnlineClient, PolkadotConfig,
24};
25
26const WAIT_MAX_BLOCKS_FOR_SESSION: u32 = 50;
29
30pub fn create_assign_core_call(core_and_para: &[(u32, u32)]) -> DynamicPayload {
32 let mut assign_cores = vec![];
33 for (core, para_id) in core_and_para.iter() {
34 assign_cores.push(value! {
35 Coretime(assign_core { core : *core, begin: 0, assignment: ((Task(*para_id), 57600)), end_hint: None() })
36 });
37 }
38
39 zombienet_sdk::subxt::tx::dynamic(
40 "Sudo",
41 "sudo",
42 vec![value! {
43 Utility(batch { calls: assign_cores })
44 }],
45 )
46}
47
48fn find_event_and_decode_fields<T: Decode>(
50 events: &Events<PolkadotConfig>,
51 pallet: &str,
52 variant: &str,
53) -> Result<Vec<T>, anyhow::Error> {
54 let mut result = vec![];
55 for event in events.iter() {
56 let event = event?;
57 if event.pallet_name() == pallet && event.variant_name() == variant {
58 let field_bytes = event.field_bytes().to_vec();
59 result.push(T::decode(&mut &field_bytes[..])?);
60 }
61 }
62 Ok(result)
63}
64
65pub async fn assert_para_throughput(
70 relay_client: &OnlineClient<PolkadotConfig>,
71 stop_after: u32,
72 expected_candidate_ranges: HashMap<ParaId, Range<u32>>,
73) -> Result<(), anyhow::Error> {
74 let mut blocks_sub = relay_client.blocks().subscribe_finalized().await?;
75 let mut candidate_count: HashMap<ParaId, u32> = HashMap::new();
76 let mut current_block_count = 0;
77
78 let valid_para_ids: Vec<ParaId> = expected_candidate_ranges.keys().cloned().collect();
79
80 wait_for_first_session_change(&mut blocks_sub).await?;
82
83 while let Some(block) = blocks_sub.next().await {
84 let block = block?;
85 log::debug!("Finalized relay chain block {}", block.number());
86 let events = block.events().await?;
87 let is_session_change = events.iter().any(|event| {
88 event.as_ref().is_ok_and(|event| {
89 event.pallet_name() == "Session" && event.variant_name() == "NewSession"
90 })
91 });
92
93 if is_session_change {
95 continue;
96 }
97
98 current_block_count += 1;
99
100 let receipts = find_event_and_decode_fields::<CandidateReceiptV2<H256>>(
101 &events,
102 "ParaInclusion",
103 "CandidateBacked",
104 )?;
105
106 for receipt in receipts {
107 let para_id = receipt.descriptor.para_id();
108 log::debug!("Block backed for para_id {para_id}");
109 if !valid_para_ids.contains(¶_id) {
110 return Err(anyhow!("Invalid ParaId detected: {}", para_id));
111 };
112 *(candidate_count.entry(para_id).or_default()) += 1;
113 }
114
115 if current_block_count == stop_after {
116 break;
117 }
118 }
119
120 log::info!(
121 "Reached {stop_after} finalized relay chain blocks that contain backed candidates. The per-parachain distribution is: {:#?}",
122 candidate_count.iter().map(|(para_id, count)| format!("{para_id} has {count} backed candidates")).collect::<Vec<_>>()
123 );
124
125 for (para_id, expected_candidate_range) in expected_candidate_ranges {
126 let actual = candidate_count
127 .get(¶_id)
128 .ok_or_else(|| anyhow!("ParaId did not have any backed candidates"))?;
129
130 if !expected_candidate_range.contains(actual) {
131 return Err(anyhow!(
132 "Candidate count {actual} not within range {expected_candidate_range:?}"
133 ))
134 }
135 }
136
137 Ok(())
138}
139
140pub async fn wait_for_first_session_change(
144 blocks_sub: &mut zombienet_sdk::subxt::backend::StreamOfResults<
145 Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
146 >,
147) -> Result<(), anyhow::Error> {
148 wait_for_nth_session_change(blocks_sub, 1).await
149}
150
151pub async fn wait_for_nth_session_change(
155 blocks_sub: &mut zombienet_sdk::subxt::backend::StreamOfResults<
156 Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
157 >,
158 mut sessions_to_wait: u32,
159) -> Result<(), anyhow::Error> {
160 let mut waited_block_num = 0;
161 while let Some(block) = blocks_sub.next().await {
162 let block = block?;
163 log::debug!("Finalized relay chain block {}", block.number());
164 let events = block.events().await?;
165 let is_session_change = events.iter().any(|event| {
166 event.as_ref().is_ok_and(|event| {
167 event.pallet_name() == "Session" && event.variant_name() == "NewSession"
168 })
169 });
170
171 if is_session_change {
172 sessions_to_wait -= 1;
173 if sessions_to_wait == 0 {
174 return Ok(());
175 }
176
177 waited_block_num = 0;
178 } else {
179 if waited_block_num >= WAIT_MAX_BLOCKS_FOR_SESSION {
180 return Err(anyhow::format_err!("Waited for {WAIT_MAX_BLOCKS_FOR_SESSION}, a new session should have been arrived by now."));
181 }
182
183 waited_block_num += 1;
184 }
185 }
186 Ok(())
187}
188
189pub async fn assert_finality_lag(
191 client: &OnlineClient<PolkadotConfig>,
192 maximum_lag: u32,
193) -> Result<(), anyhow::Error> {
194 let mut best_stream = client.blocks().subscribe_best().await?;
195 let mut fut_stream = client.blocks().subscribe_finalized().await?;
196 let (Some(Ok(best)), Some(Ok(finalized))) = join!(best_stream.next(), fut_stream.next()) else {
197 return Err(anyhow::format_err!("Unable to fetch best an finalized block!"));
198 };
199 let finality_lag = best.number() - finalized.number();
200
201 log::info!(
202 "Finality lagged by {finality_lag} blocks, maximum expected was {maximum_lag} blocks"
203 );
204
205 assert!(finality_lag <= maximum_lag, "Expected finality to lag by a maximum of {maximum_lag} blocks, but was lagging by {finality_lag} blocks.");
206 Ok(())
207}
208
209pub async fn assert_blocks_are_being_finalized(
211 client: &OnlineClient<PolkadotConfig>,
212) -> Result<(), anyhow::Error> {
213 let sleep_duration = Duration::from_secs(12);
214 let mut finalized_blocks = client.blocks().subscribe_finalized().await?;
215 let first_measurement = finalized_blocks
216 .next()
217 .await
218 .ok_or(anyhow::anyhow!("Can't get finalized block from stream"))??
219 .number();
220 sleep(sleep_duration).await;
221 let second_measurement = finalized_blocks
222 .next()
223 .await
224 .ok_or(anyhow::anyhow!("Can't get finalized block from stream"))??
225 .number();
226
227 log::info!(
228 "Finalized {} blocks within {sleep_duration:?}",
229 second_measurement - first_measurement
230 );
231 assert!(second_measurement > first_measurement);
232
233 Ok(())
234}
235
236pub async fn assert_relay_parent_offset(
245 relay_client: &OnlineClient<PolkadotConfig>,
246 para_client: &OnlineClient<PolkadotConfig>,
247 offset: u32,
248 block_limit: u32,
249) -> Result<(), anyhow::Error> {
250 let mut relay_block_stream = relay_client.blocks().subscribe_all().await?;
251
252 let mut para_block_stream = para_client.blocks().subscribe_all().await?.skip(1);
254 let mut highest_relay_block_seen = 0;
255 let mut num_para_blocks_seen = 0;
256 loop {
257 tokio::select! {
258 Some(Ok(relay_block)) = relay_block_stream.next() => {
259 highest_relay_block_seen = max(relay_block.number(), highest_relay_block_seen);
260 if highest_relay_block_seen > 15 && num_para_blocks_seen == 0 {
261 return Err(anyhow!("No parachain blocks produced!"))
262 }
263 },
264 Some(Ok(para_block)) = para_block_stream.next() => {
265 let logs = ¶_block.header().digest.logs;
266
267 let Some((_, relay_parent_number)): Option<(H256, u32)> = logs.iter().find_map(extract_relay_parent_storage_root) else {
268 return Err(anyhow!("No RPSR digest found in header #{}", para_block.number()));
269 };
270 log::debug!("Parachain block #{} was built on relay parent #{relay_parent_number}, highest seen was {highest_relay_block_seen}", para_block.number());
271 assert!(highest_relay_block_seen < offset || relay_parent_number <= highest_relay_block_seen.saturating_sub(offset), "Relay parent is not at the correct offset! relay_parent: #{relay_parent_number} highest_seen_relay_block: #{highest_relay_block_seen}");
272 num_para_blocks_seen += 1;
273 if num_para_blocks_seen >= block_limit {
274 log::info!("Successfully verified relay parent offset of {offset} for {num_para_blocks_seen} parachain blocks.");
275 break;
276 }
277 }
278 }
279 }
280 Ok(())
281}
282
283fn extract_relay_parent_storage_root(
285 digest: &DigestItem,
286) -> Option<(relay_chain::Hash, relay_chain::BlockNumber)> {
287 match digest {
288 DigestItem::Consensus(id, val) if id == &RPSR_CONSENSUS_ID => {
289 let (h, n): (relay_chain::Hash, Compact<relay_chain::BlockNumber>) =
290 Decode::decode(&mut &val[..]).ok()?;
291
292 Some((h, n.0))
293 },
294 _ => None,
295 }
296}
297
298pub async fn submit_extrinsic_and_wait_for_finalization_success<S: Signer<PolkadotConfig>>(
302 client: &OnlineClient<PolkadotConfig>,
303 call: &DynamicPayload,
304 signer: &S,
305) -> Result<(), anyhow::Error> {
306 let extensions = PolkadotExtrinsicParamsBuilder::new().immortal().build();
307
308 let mut tx = client
309 .tx()
310 .create_signed(call, signer, extensions)
311 .await?
312 .submit_and_watch()
313 .await?;
314
315 while let Some(status) = tx.next().await {
318 let status = status?;
319 match &status {
320 TxStatus::InBestBlock(tx_in_block) | TxStatus::InFinalizedBlock(tx_in_block) => {
321 let _result = tx_in_block.wait_for_success().await?;
322 let block_status =
323 if status.as_finalized().is_some() { "Finalized" } else { "Best" };
324 log::info!("[{}] In block: {:#?}", block_status, tx_in_block.block_hash());
325 },
326 TxStatus::Error { message } |
327 TxStatus::Invalid { message } |
328 TxStatus::Dropped { message } => {
329 return Err(anyhow::format_err!("Error submitting tx: {message}"));
330 },
331 _ => continue,
332 }
333 }
334 Ok(())
335}
336
337pub async fn submit_extrinsic_and_wait_for_finalization_success_with_timeout<
342 S: Signer<PolkadotConfig>,
343>(
344 client: &OnlineClient<PolkadotConfig>,
345 call: &DynamicPayload,
346 signer: &S,
347 timeout_secs: impl Into<u64>,
348) -> Result<(), anyhow::Error> {
349 let secs = timeout_secs.into();
350 let res = tokio::time::timeout(
351 Duration::from_secs(secs),
352 submit_extrinsic_and_wait_for_finalization_success(client, call, signer),
353 )
354 .await;
355
356 match res {
357 Ok(Ok(_)) => Ok(()),
358 Ok(Err(e)) => Err(anyhow!("Error waiting for metric: {}", e)),
359 Err(_) => Err(anyhow!("Timeout ({secs}), waiting for extrinsic finalization")),
361 }
362}
363
364pub async fn assert_para_is_registered(
366 relay_client: &OnlineClient<PolkadotConfig>,
367 para_id: ParaId,
368 blocks_to_wait: u32,
369) -> Result<(), anyhow::Error> {
370 let mut blocks_sub = relay_client.blocks().subscribe_all().await?;
371 let para_id: u32 = para_id.into();
372
373 let keys: Vec<Value> = vec![];
374 let query = subxt::dynamic::storage("Paras", "Parachains", keys);
375
376 let mut blocks_cnt = 0;
377 while let Some(block) = blocks_sub.next().await {
378 let block = block?;
379 log::debug!("Relay block #{}, checking if para_id {para_id} is registered", block.number(),);
380 let parachains = block.storage().fetch(&query).await?;
381
382 let parachains: Vec<u32> = match parachains {
383 Some(parachains) => parachains.as_type()?,
384 None => vec![],
385 };
386
387 log::debug!("Registered para_ids: {:?}", parachains);
388
389 if parachains.iter().any(|p| para_id.eq(p)) {
390 log::debug!("para_id {para_id} registered");
391 return Ok(());
392 }
393 if blocks_cnt >= blocks_to_wait {
394 return Err(anyhow!(
395 "Parachain {para_id} not registered within {blocks_to_wait} blocks"
396 ));
397 }
398 blocks_cnt += 1;
399 }
400
401 Err(anyhow!("No more blocks to check"))
402}