referrerpolicy=no-referrer-when-downgrade

cumulus_zombienet_sdk_helpers/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::anyhow;
5use codec::{Compact, Decode};
6use cumulus_primitives_core::{relay_chain, rpsr_digest::RPSR_CONSENSUS_ID};
7use futures::stream::StreamExt;
8use polkadot_primitives::{CandidateReceiptV2, Id as ParaId};
9use std::{cmp::max, collections::HashMap, ops::Range};
10use tokio::{
11	join,
12	time::{sleep, Duration},
13};
14use zombienet_sdk::subxt::{
15	self,
16	blocks::Block,
17	config::{polkadot::PolkadotExtrinsicParamsBuilder, substrate::DigestItem},
18	dynamic::Value,
19	events::Events,
20	ext::scale_value::value,
21	tx::{signer::Signer, DynamicPayload, TxStatus},
22	utils::H256,
23	OnlineClient, PolkadotConfig,
24};
25
26// Maximum number of blocks to wait for a session change.
27// If it does not arrive for whatever reason, we should not wait forever.
28const WAIT_MAX_BLOCKS_FOR_SESSION: u32 = 50;
29
30/// Create a batch call to assign cores to a parachain.
31pub fn create_assign_core_call(core_and_para: &[(u32, u32)]) -> DynamicPayload {
32	let mut assign_cores = vec![];
33	for (core, para_id) in core_and_para.iter() {
34		assign_cores.push(value! {
35			Coretime(assign_core { core : *core, begin: 0, assignment: ((Task(*para_id), 57600)), end_hint: None() })
36		});
37	}
38
39	zombienet_sdk::subxt::tx::dynamic(
40		"Sudo",
41		"sudo",
42		vec![value! {
43			Utility(batch { calls: assign_cores })
44		}],
45	)
46}
47
48/// Find an event in subxt `Events` and attempt to decode the fields fo the event.
49fn find_event_and_decode_fields<T: Decode>(
50	events: &Events<PolkadotConfig>,
51	pallet: &str,
52	variant: &str,
53) -> Result<Vec<T>, anyhow::Error> {
54	let mut result = vec![];
55	for event in events.iter() {
56		let event = event?;
57		if event.pallet_name() == pallet && event.variant_name() == variant {
58			let field_bytes = event.field_bytes().to_vec();
59			result.push(T::decode(&mut &field_bytes[..])?);
60		}
61	}
62	Ok(result)
63}
64
65// Helper function for asserting the throughput of parachains, after the first session change.
66//
67// The throughput is measured as total number of backed candidates in a window of relay chain
68// blocks. Relay chain blocks with session changes are generally ignores.
69pub async fn assert_para_throughput(
70	relay_client: &OnlineClient<PolkadotConfig>,
71	stop_after: u32,
72	expected_candidate_ranges: HashMap<ParaId, Range<u32>>,
73) -> Result<(), anyhow::Error> {
74	let mut blocks_sub = relay_client.blocks().subscribe_finalized().await?;
75	let mut candidate_count: HashMap<ParaId, u32> = HashMap::new();
76	let mut current_block_count = 0;
77
78	let valid_para_ids: Vec<ParaId> = expected_candidate_ranges.keys().cloned().collect();
79
80	// Wait for the first session, block production on the parachain will start after that.
81	wait_for_first_session_change(&mut blocks_sub).await?;
82
83	while let Some(block) = blocks_sub.next().await {
84		let block = block?;
85		log::debug!("Finalized relay chain block {}", block.number());
86		let events = block.events().await?;
87		let is_session_change = events.iter().any(|event| {
88			event.as_ref().is_ok_and(|event| {
89				event.pallet_name() == "Session" && event.variant_name() == "NewSession"
90			})
91		});
92
93		// Do not count blocks with session changes, no backed blocks there.
94		if is_session_change {
95			continue;
96		}
97
98		current_block_count += 1;
99
100		let receipts = find_event_and_decode_fields::<CandidateReceiptV2<H256>>(
101			&events,
102			"ParaInclusion",
103			"CandidateBacked",
104		)?;
105
106		for receipt in receipts {
107			let para_id = receipt.descriptor.para_id();
108			log::debug!("Block backed for para_id {para_id}");
109			if !valid_para_ids.contains(&para_id) {
110				return Err(anyhow!("Invalid ParaId detected: {}", para_id));
111			};
112			*(candidate_count.entry(para_id).or_default()) += 1;
113		}
114
115		if current_block_count == stop_after {
116			break;
117		}
118	}
119
120	log::info!(
121		"Reached {stop_after} finalized relay chain blocks that contain backed candidates. The per-parachain distribution is: {:#?}",
122		candidate_count.iter().map(|(para_id, count)| format!("{para_id} has {count} backed candidates")).collect::<Vec<_>>()
123	);
124
125	for (para_id, expected_candidate_range) in expected_candidate_ranges {
126		let actual = candidate_count
127			.get(&para_id)
128			.ok_or_else(|| anyhow!("ParaId did not have any backed candidates"))?;
129
130		if !expected_candidate_range.contains(actual) {
131			return Err(anyhow!(
132				"Candidate count {actual} not within range {expected_candidate_range:?}"
133			))
134		}
135	}
136
137	Ok(())
138}
139
140/// Wait for the first block with a session change.
141///
142/// The session change is detected by inspecting the events in the block.
143pub async fn wait_for_first_session_change(
144	blocks_sub: &mut zombienet_sdk::subxt::backend::StreamOfResults<
145		Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
146	>,
147) -> Result<(), anyhow::Error> {
148	wait_for_nth_session_change(blocks_sub, 1).await
149}
150
151/// Wait for the first block with the Nth session change.
152///
153/// The session change is detected by inspecting the events in the block.
154pub async fn wait_for_nth_session_change(
155	blocks_sub: &mut zombienet_sdk::subxt::backend::StreamOfResults<
156		Block<PolkadotConfig, OnlineClient<PolkadotConfig>>,
157	>,
158	mut sessions_to_wait: u32,
159) -> Result<(), anyhow::Error> {
160	let mut waited_block_num = 0;
161	while let Some(block) = blocks_sub.next().await {
162		let block = block?;
163		log::debug!("Finalized relay chain block {}", block.number());
164		let events = block.events().await?;
165		let is_session_change = events.iter().any(|event| {
166			event.as_ref().is_ok_and(|event| {
167				event.pallet_name() == "Session" && event.variant_name() == "NewSession"
168			})
169		});
170
171		if is_session_change {
172			sessions_to_wait -= 1;
173			if sessions_to_wait == 0 {
174				return Ok(());
175			}
176
177			waited_block_num = 0;
178		} else {
179			if waited_block_num >= WAIT_MAX_BLOCKS_FOR_SESSION {
180				return Err(anyhow::format_err!("Waited for {WAIT_MAX_BLOCKS_FOR_SESSION}, a new session should have been arrived by now."));
181			}
182
183			waited_block_num += 1;
184		}
185	}
186	Ok(())
187}
188
189// Helper function that asserts the maximum finality lag.
190pub async fn assert_finality_lag(
191	client: &OnlineClient<PolkadotConfig>,
192	maximum_lag: u32,
193) -> Result<(), anyhow::Error> {
194	let mut best_stream = client.blocks().subscribe_best().await?;
195	let mut fut_stream = client.blocks().subscribe_finalized().await?;
196	let (Some(Ok(best)), Some(Ok(finalized))) = join!(best_stream.next(), fut_stream.next()) else {
197		return Err(anyhow::format_err!("Unable to fetch best an finalized block!"));
198	};
199	let finality_lag = best.number() - finalized.number();
200
201	log::info!(
202		"Finality lagged by {finality_lag} blocks, maximum expected was {maximum_lag} blocks"
203	);
204
205	assert!(finality_lag <= maximum_lag, "Expected finality to lag by a maximum of {maximum_lag} blocks, but was lagging by {finality_lag} blocks.");
206	Ok(())
207}
208
209/// Assert that finality has not stalled.
210pub async fn assert_blocks_are_being_finalized(
211	client: &OnlineClient<PolkadotConfig>,
212) -> Result<(), anyhow::Error> {
213	let sleep_duration = Duration::from_secs(12);
214	let mut finalized_blocks = client.blocks().subscribe_finalized().await?;
215	let first_measurement = finalized_blocks
216		.next()
217		.await
218		.ok_or(anyhow::anyhow!("Can't get finalized block from stream"))??
219		.number();
220	sleep(sleep_duration).await;
221	let second_measurement = finalized_blocks
222		.next()
223		.await
224		.ok_or(anyhow::anyhow!("Can't get finalized block from stream"))??
225		.number();
226
227	log::info!(
228		"Finalized {} blocks within {sleep_duration:?}",
229		second_measurement - first_measurement
230	);
231	assert!(second_measurement > first_measurement);
232
233	Ok(())
234}
235
236/// Asserts that parachain blocks have the correct relay parent offset.
237///
238/// # Arguments
239///
240/// * `relay_client` - Client connected to a relay chain node
241/// * `para_client` - Client connected to a parachain node
242/// * `offset` - Expected minimum offset between relay parent and highest seen relay block
243/// * `block_limit` - Number of parachain blocks to verify before completing
244pub async fn assert_relay_parent_offset(
245	relay_client: &OnlineClient<PolkadotConfig>,
246	para_client: &OnlineClient<PolkadotConfig>,
247	offset: u32,
248	block_limit: u32,
249) -> Result<(), anyhow::Error> {
250	let mut relay_block_stream = relay_client.blocks().subscribe_all().await?;
251
252	// First parachain header #0 does not contains RSPR digest item.
253	let mut para_block_stream = para_client.blocks().subscribe_all().await?.skip(1);
254	let mut highest_relay_block_seen = 0;
255	let mut num_para_blocks_seen = 0;
256	loop {
257		tokio::select! {
258			Some(Ok(relay_block)) = relay_block_stream.next() => {
259				highest_relay_block_seen = max(relay_block.number(), highest_relay_block_seen);
260				if highest_relay_block_seen > 15 && num_para_blocks_seen == 0 {
261					return Err(anyhow!("No parachain blocks produced!"))
262				}
263			},
264			Some(Ok(para_block)) = para_block_stream.next() => {
265				let logs = &para_block.header().digest.logs;
266
267				let Some((_, relay_parent_number)): Option<(H256, u32)> = logs.iter().find_map(extract_relay_parent_storage_root) else {
268					return Err(anyhow!("No RPSR digest found in header #{}", para_block.number()));
269				};
270				log::debug!("Parachain block #{} was built on relay parent #{relay_parent_number}, highest seen was {highest_relay_block_seen}", para_block.number());
271				assert!(highest_relay_block_seen < offset || relay_parent_number <= highest_relay_block_seen.saturating_sub(offset), "Relay parent is not at the correct offset! relay_parent: #{relay_parent_number} highest_seen_relay_block: #{highest_relay_block_seen}");
272				num_para_blocks_seen += 1;
273				if num_para_blocks_seen >= block_limit {
274					log::info!("Successfully verified relay parent offset of {offset} for {num_para_blocks_seen} parachain blocks.");
275					break;
276				}
277			}
278		}
279	}
280	Ok(())
281}
282
283/// Extract relay parent information from the digest logs.
284fn extract_relay_parent_storage_root(
285	digest: &DigestItem,
286) -> Option<(relay_chain::Hash, relay_chain::BlockNumber)> {
287	match digest {
288		DigestItem::Consensus(id, val) if id == &RPSR_CONSENSUS_ID => {
289			let (h, n): (relay_chain::Hash, Compact<relay_chain::BlockNumber>) =
290				Decode::decode(&mut &val[..]).ok()?;
291
292			Some((h, n.0))
293		},
294		_ => None,
295	}
296}
297
298/// Submits the given `call` as transaction and waits for it successful finalization.
299///
300/// The transaction is send as immortal transaction.
301pub async fn submit_extrinsic_and_wait_for_finalization_success<S: Signer<PolkadotConfig>>(
302	client: &OnlineClient<PolkadotConfig>,
303	call: &DynamicPayload,
304	signer: &S,
305) -> Result<(), anyhow::Error> {
306	let extensions = PolkadotExtrinsicParamsBuilder::new().immortal().build();
307
308	let mut tx = client
309		.tx()
310		.create_signed(call, signer, extensions)
311		.await?
312		.submit_and_watch()
313		.await?;
314
315	// Below we use the low level API to replicate the `wait_for_in_block` behaviour
316	// which was removed in subxt 0.33.0. See https://github.com/paritytech/subxt/pull/1237.
317	while let Some(status) = tx.next().await {
318		let status = status?;
319		match &status {
320			TxStatus::InBestBlock(tx_in_block) | TxStatus::InFinalizedBlock(tx_in_block) => {
321				let _result = tx_in_block.wait_for_success().await?;
322				let block_status =
323					if status.as_finalized().is_some() { "Finalized" } else { "Best" };
324				log::info!("[{}] In block: {:#?}", block_status, tx_in_block.block_hash());
325			},
326			TxStatus::Error { message } |
327			TxStatus::Invalid { message } |
328			TxStatus::Dropped { message } => {
329				return Err(anyhow::format_err!("Error submitting tx: {message}"));
330			},
331			_ => continue,
332		}
333	}
334	Ok(())
335}
336
337/// Submits the given `call` as transaction and waits `timeout_secs` for it successful finalization.
338///
339/// If the transaction does not reach the finalized state in `timeout_secs` an error is returned.
340/// The transaction is send as immortal transaction.
341pub async fn submit_extrinsic_and_wait_for_finalization_success_with_timeout<
342	S: Signer<PolkadotConfig>,
343>(
344	client: &OnlineClient<PolkadotConfig>,
345	call: &DynamicPayload,
346	signer: &S,
347	timeout_secs: impl Into<u64>,
348) -> Result<(), anyhow::Error> {
349	let secs = timeout_secs.into();
350	let res = tokio::time::timeout(
351		Duration::from_secs(secs),
352		submit_extrinsic_and_wait_for_finalization_success(client, call, signer),
353	)
354	.await;
355
356	match res {
357		Ok(Ok(_)) => Ok(()),
358		Ok(Err(e)) => Err(anyhow!("Error waiting for metric: {}", e)),
359		// timeout
360		Err(_) => Err(anyhow!("Timeout ({secs}), waiting for extrinsic finalization")),
361	}
362}
363
364/// Asserts that the given `para_id` is registered at the relay chain.
365pub async fn assert_para_is_registered(
366	relay_client: &OnlineClient<PolkadotConfig>,
367	para_id: ParaId,
368	blocks_to_wait: u32,
369) -> Result<(), anyhow::Error> {
370	let mut blocks_sub = relay_client.blocks().subscribe_all().await?;
371	let para_id: u32 = para_id.into();
372
373	let keys: Vec<Value> = vec![];
374	let query = subxt::dynamic::storage("Paras", "Parachains", keys);
375
376	let mut blocks_cnt = 0;
377	while let Some(block) = blocks_sub.next().await {
378		let block = block?;
379		log::debug!("Relay block #{}, checking if para_id {para_id} is registered", block.number(),);
380		let parachains = block.storage().fetch(&query).await?;
381
382		let parachains: Vec<u32> = match parachains {
383			Some(parachains) => parachains.as_type()?,
384			None => vec![],
385		};
386
387		log::debug!("Registered para_ids: {:?}", parachains);
388
389		if parachains.iter().any(|p| para_id.eq(p)) {
390			log::debug!("para_id {para_id} registered");
391			return Ok(());
392		}
393		if blocks_cnt >= blocks_to_wait {
394			return Err(anyhow!(
395				"Parachain {para_id} not registered within {blocks_to_wait} blocks"
396			));
397		}
398		blocks_cnt += 1;
399	}
400
401	Err(anyhow!("No more blocks to check"))
402}