referrerpolicy=no-referrer-when-downgrade

test_parachain_undying_collator/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Collator for the `Undying` test parachain.
18
19use codec::{Decode, Encode};
20use futures::{channel::oneshot, StreamExt};
21use futures_timer::Delay;
22use polkadot_cli::ProvideRuntimeApi;
23use polkadot_node_primitives::{
24	maybe_compress_pov, AvailableData, Collation, CollationResult, CollationSecondedSignal,
25	CollatorFn, MaybeCompressedPoV, PoV, Statement, UpwardMessages,
26};
27use polkadot_node_subsystem::messages::CollatorProtocolMessage;
28use polkadot_primitives::{
29	CandidateCommitments, CandidateDescriptorV2, CandidateReceiptV2, ClaimQueueOffset, CollatorId,
30	CollatorPair, CoreIndex, Hash, Id as ParaId, OccupiedCoreAssumption,
31	DEFAULT_CLAIM_QUEUE_OFFSET,
32};
33use polkadot_service::{Handle, NewFull, ParachainHost};
34use sc_client_api::client::BlockchainEvents;
35use sp_core::Pair;
36
37use std::{
38	collections::HashMap,
39	sync::{
40		atomic::{AtomicU32, Ordering},
41		Arc, Mutex,
42	},
43	time::Duration,
44};
45use test_parachain_undying::{
46	execute, hash_state, BlockData, GraveyardState, HeadData, StateMismatch,
47};
48
/// Log target used by the log statements of this collator.
pub const LOG_TARGET: &str = "parachain::undying-collator";

/// Default PoV size in bytes; the state size is derived from it.
const DEFAULT_POV_SIZE: usize = 1_000;
/// Default PVF time complexity: one signature per block.
const DEFAULT_PVF_COMPLEXITY: u32 = 1;
55
56/// Calculates the head and state for the block with the given `number`.
57fn calculate_head_and_state_for_number(
58	number: u64,
59	graveyard_size: usize,
60	pvf_complexity: u32,
61	experimental_send_approved_peer: bool,
62) -> Result<(HeadData, GraveyardState), StateMismatch> {
63	let index = 0u64;
64	let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
65	let zombies = 0;
66	let seal = [0u8; 32];
67	let core_selector_number = 0;
68
69	// Ensure a larger compressed PoV.
70	graveyard.iter_mut().enumerate().for_each(|(i, grave)| {
71		*grave = i as u8;
72	});
73
74	let mut state = GraveyardState { index, graveyard, zombies, seal, core_selector_number };
75	let mut head =
76		HeadData { number: 0, parent_hash: Hash::default().into(), post_state: hash_state(&state) };
77
78	while head.number < number {
79		let block = BlockData {
80			state,
81			tombstones: 1_000,
82			iterations: pvf_complexity,
83			experimental_send_approved_peer,
84		};
85		let (new_head, new_state, _) = execute(head.hash(), head.clone(), block)?;
86		head = new_head;
87		state = new_state;
88	}
89
90	Ok((head, state))
91}
92
93/// The state of the undying parachain.
94struct State {
95	// We need to keep these around until the including relay chain blocks are finalized.
96	// This is because disputes can trigger reverts up to last finalized block, so we
97	// want that state to collate on older relay chain heads.
98	head_to_state: HashMap<Arc<HeadData>, GraveyardState>,
99	number_to_head: HashMap<u64, Arc<HeadData>>,
100	/// Block number of the best block.
101	best_block: u64,
102	/// PVF time complexity.
103	pvf_complexity: u32,
104	/// Defines the state size (Vec<u8>). Our PoV includes the entire state so this value will
105	/// drive the PoV size.
106	/// Important note: block execution heavily clones this state, so something like 300.000 is
107	/// the max value here, otherwise we'll get OOM during wasm execution.
108	/// TODO: Implement a static state, and use `ballast` to inflate the PoV size. This way
109	/// we can just discard the `ballast` before processing the block.
110	graveyard_size: usize,
111	experimental_send_approved_peer: bool,
112}
113
114impl State {
115	/// Init the genesis state.
116	fn genesis(
117		graveyard_size: usize,
118		pvf_complexity: u32,
119		experimental_send_approved_peer: bool,
120	) -> Self {
121		let index = 0u64;
122		let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
123		let zombies = 0;
124		let seal = [0u8; 32];
125		let core_selector_number = 0;
126
127		// Ensure a larger compressed PoV.
128		graveyard.iter_mut().enumerate().for_each(|(i, grave)| {
129			*grave = i as u8;
130		});
131
132		let state = GraveyardState { index, graveyard, zombies, seal, core_selector_number };
133
134		let head_data =
135			HeadData { number: 0, parent_hash: Default::default(), post_state: hash_state(&state) };
136		let head_data = Arc::new(head_data);
137
138		Self {
139			head_to_state: vec![(head_data.clone(), state.clone())].into_iter().collect(),
140			number_to_head: vec![(0, head_data)].into_iter().collect(),
141			best_block: 0,
142			pvf_complexity,
143			graveyard_size,
144			experimental_send_approved_peer,
145		}
146	}
147
148	/// Advance the state and produce a new block based on the given `parent_head`.
149	///
150	/// Returns the new [`BlockData`] and the new [`HeadData`].
151	fn advance(
152		&mut self,
153		parent_head: HeadData,
154	) -> Result<(BlockData, HeadData, UpwardMessages), StateMismatch> {
155		self.best_block = parent_head.number;
156
157		let state = if let Some(state) = self
158			.number_to_head
159			.get(&self.best_block)
160			.and_then(|head_data| self.head_to_state.get(head_data).cloned())
161		{
162			state
163		} else {
164			let (_, state) = calculate_head_and_state_for_number(
165				parent_head.number,
166				self.graveyard_size,
167				self.pvf_complexity,
168				self.experimental_send_approved_peer,
169			)?;
170			state
171		};
172
173		// Start with prev state and transaction to execute (place 1000 tombstones).
174		let block = BlockData {
175			state,
176			tombstones: 1000,
177			iterations: self.pvf_complexity,
178			experimental_send_approved_peer: self.experimental_send_approved_peer,
179		};
180
181		let (new_head, new_state, upward_messages) =
182			execute(parent_head.hash(), parent_head, block.clone())?;
183
184		let new_head_arc = Arc::new(new_head.clone());
185
186		self.head_to_state.insert(new_head_arc.clone(), new_state);
187		self.number_to_head.insert(new_head.number, new_head_arc);
188
189		Ok((block, new_head, upward_messages))
190	}
191}
192
193/// The collator of the undying parachain.
194pub struct Collator {
195	state: Arc<Mutex<State>>,
196	key: CollatorPair,
197	seconded_collations: Arc<AtomicU32>,
198}
199
200impl Default for Collator {
201	fn default() -> Self {
202		Self::new(DEFAULT_POV_SIZE, DEFAULT_PVF_COMPLEXITY, false)
203	}
204}
205
206impl Collator {
207	/// Create a new collator instance with the state initialized from genesis and `pov_size`
208	/// parameter. The same parameter needs to be passed when exporting the genesis state.
209	pub fn new(
210		pov_size: usize,
211		pvf_complexity: u32,
212		experimental_send_approved_peer: bool,
213	) -> Self {
214		let graveyard_size = ((pov_size / std::mem::size_of::<u8>()) as f64).sqrt().ceil() as usize;
215
216		log::info!(
217			target: LOG_TARGET,
218			"PoV target size: {} bytes. Graveyard size: ({} x {})",
219			pov_size,
220			graveyard_size,
221			graveyard_size,
222		);
223
224		log::info!(
225			target: LOG_TARGET,
226			"PVF time complexity: {}",
227			pvf_complexity,
228		);
229
230		Self {
231			state: Arc::new(Mutex::new(State::genesis(
232				graveyard_size,
233				pvf_complexity,
234				experimental_send_approved_peer,
235			))),
236			key: CollatorPair::generate().0,
237			seconded_collations: Arc::new(AtomicU32::new(0)),
238		}
239	}
240
241	/// Get the SCALE encoded genesis head of the parachain.
242	pub fn genesis_head(&self) -> Vec<u8> {
243		self.state
244			.lock()
245			.unwrap()
246			.number_to_head
247			.get(&0)
248			.expect("Genesis header exists")
249			.encode()
250	}
251
252	/// Get the validation code of the undying parachain.
253	pub fn validation_code(&self) -> &[u8] {
254		test_parachain_undying::wasm_binary_unwrap()
255	}
256
257	/// Get the collator key.
258	pub fn collator_key(&self) -> CollatorPair {
259		self.key.clone()
260	}
261
262	/// Get the collator id.
263	pub fn collator_id(&self) -> CollatorId {
264		self.key.public()
265	}
266
267	/// Create the collation function.
268	///
269	/// This collation function can be plugged into the overseer to generate collations for the
270	/// undying parachain.
271	pub fn create_collation_function(
272		&self,
273		spawner: impl SpawnNamed + Clone + 'static,
274	) -> CollatorFn {
275		use futures::FutureExt as _;
276
277		let state = self.state.clone();
278		let seconded_collations = self.seconded_collations.clone();
279
280		Box::new(move |relay_parent, validation_data| {
281			let parent = match HeadData::decode(&mut &validation_data.parent_head.0[..]) {
282				Err(err) => {
283					log::error!(
284						target: LOG_TARGET,
285						"Requested to build on top of malformed head-data: {:?}",
286						err,
287					);
288					return futures::future::ready(None).boxed()
289				},
290				Ok(p) => p,
291			};
292
293			let (block_data, head_data, upward_messages) =
294				match state.lock().unwrap().advance(parent.clone()) {
295					Err(err) => {
296						log::error!(
297							target: LOG_TARGET,
298							"Unable to build on top of {:?}: {:?}",
299							parent,
300							err,
301						);
302						return futures::future::ready(None).boxed()
303					},
304					Ok(x) => x,
305				};
306
307			log::info!(
308				target: LOG_TARGET,
309				"created a new collation on relay-parent({}): {:?}",
310				relay_parent,
311				head_data,
312			);
313
314			// The pov is the actually the initial state and the transactions.
315			let pov = PoV { block_data: block_data.encode().into() };
316
317			let collation = Collation {
318				upward_messages,
319				horizontal_messages: Default::default(),
320				new_validation_code: None,
321				head_data: head_data.encode().into(),
322				proof_of_validity: MaybeCompressedPoV::Raw(pov.clone()),
323				processed_downward_messages: 0,
324				hrmp_watermark: validation_data.relay_parent_number,
325			};
326
327			log::info!(
328				target: LOG_TARGET,
329				"Raw PoV size for collation: {} bytes",
330				pov.block_data.0.len(),
331			);
332			let compressed_pov = maybe_compress_pov(pov);
333
334			log::info!(
335				target: LOG_TARGET,
336				"Compressed PoV size for collation: {} bytes",
337				compressed_pov.block_data.0.len(),
338			);
339
340			let (result_sender, recv) = oneshot::channel::<CollationSecondedSignal>();
341			let seconded_collations = seconded_collations.clone();
342			spawner.spawn(
343				"undying-collator-seconded",
344				None,
345				async move {
346					if let Ok(res) = recv.await {
347						if !matches!(
348							res.statement.payload(),
349							Statement::Seconded(s) if s.descriptor.pov_hash() == compressed_pov.hash(),
350						) {
351							log::error!(
352								target: LOG_TARGET,
353								"Seconded statement should match our collation: {:?}",
354								res.statement.payload(),
355							);
356						}
357
358						seconded_collations.fetch_add(1, Ordering::Relaxed);
359					}
360				}
361				.boxed(),
362			);
363
364			async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }
365				.boxed()
366		})
367	}
368
369	/// Wait until `blocks` are built and enacted.
370	pub async fn wait_for_blocks(&self, blocks: u64) {
371		let start_block = self.state.lock().unwrap().best_block;
372		loop {
373			Delay::new(Duration::from_secs(1)).await;
374
375			let current_block = self.state.lock().unwrap().best_block;
376
377			if start_block + blocks <= current_block {
378				return
379			}
380		}
381	}
382
383	/// Wait until `seconded` collations of this collator are seconded by a parachain validator.
384	///
385	/// The internal counter isn't de-duplicating the collations when counting the number of
386	/// seconded collations. This means when one collation is seconded by X validators, we record X
387	/// seconded messages.
388	pub async fn wait_for_seconded_collations(&self, seconded: u32) {
389		let seconded_collations = self.seconded_collations.clone();
390		loop {
391			Delay::new(Duration::from_secs(1)).await;
392
393			if seconded <= seconded_collations.load(Ordering::Relaxed) {
394				return
395			}
396		}
397	}
398
399	pub fn send_same_collations_to_all_assigned_cores(
400		&self,
401		full_node: &NewFull,
402		mut overseer_handle: Handle,
403		para_id: ParaId,
404	) {
405		let client = full_node.client.clone();
406
407		let collation_function =
408			self.create_collation_function(full_node.task_manager.spawn_handle());
409
410		full_node
411			.task_manager
412			.spawn_handle()
413			.spawn("malus-undying-collator", None, async move {
414				// Subscribe to relay chain block import notifications. In each iteration, build a
415				// collation in response to a block import notification and submits it to all cores
416				// assigned to the parachain.
417				let mut import_notifications = client.import_notification_stream();
418
419				while let Some(notification) = import_notifications.next().await {
420					let relay_parent = notification.hash;
421
422					// Get the list of cores assigned to the parachain.
423					let claim_queue = match client.runtime_api().claim_queue(relay_parent) {
424						Ok(claim_queue) => claim_queue,
425						Err(error) => {
426							log::error!(
427								target: LOG_TARGET,
428								"Failed to query claim queue runtime API: {error:?}",
429							);
430							continue;
431						},
432					};
433
434					let claim_queue_offset = ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET);
435
436					let scheduled_cores: Vec<CoreIndex> = claim_queue
437						.iter()
438						.filter_map(move |(core_index, paras)| {
439							paras.get(claim_queue_offset.0 as usize).and_then(|core_para_id| {
440								(core_para_id == &para_id).then_some(*core_index)
441							})
442						})
443						.collect();
444
445					if scheduled_cores.is_empty() {
446						log::info!(
447							target: LOG_TARGET,
448							"Scheduled cores is empty.",
449						);
450						continue;
451					}
452
453					if scheduled_cores.len() == 1 {
454						log::info!(
455							target: LOG_TARGET,
456							"Malus collator configured with duplicate collations, but only 1 core assigned. \
457							Collator will not do anything malicious.",
458						);
459					}
460
461					// Fetch validation data for the collation.
462					let validation_data = match client.runtime_api().persisted_validation_data(
463						relay_parent,
464						para_id,
465						OccupiedCoreAssumption::Included,
466					) {
467						Ok(Some(validation_data)) => validation_data,
468						Ok(None) => {
469							log::info!(
470								target: LOG_TARGET,
471								"Persisted validation data is None.",
472							);
473							continue;
474						},
475						Err(error) => {
476							log::error!(
477								target: LOG_TARGET,
478								"Failed to query persisted validation data runtime API: {error:?}",
479							);
480							continue;
481						},
482					};
483
484					// Generate the collation.
485					let collation =
486						match collation_function(relay_parent, &validation_data).await {
487							Some(collation) => collation,
488							None => {
489								log::info!(
490									target: LOG_TARGET,
491									"Collation result is None.",
492								);
493								continue;
494							},
495						}
496						.collation;
497
498					// Fetch the validation code hash.
499					let validation_code_hash = match client.runtime_api().validation_code_hash(
500						relay_parent,
501						para_id,
502						OccupiedCoreAssumption::Included,
503					) {
504						Ok(Some(validation_code_hash)) => validation_code_hash,
505						Ok(None) => {
506							log::info!(
507								target: LOG_TARGET,
508								"Validation code hash is None.",
509							);
510							continue;
511						},
512						Err(error) => {
513							log::error!(
514								target: LOG_TARGET,
515								"Failed to query validation code hash runtime API: {error:?}",
516							);
517							continue;
518						},
519					};
520
521					// Fetch the session index.
522					let session_index =
523						match client.runtime_api().session_index_for_child(relay_parent) {
524							Ok(session_index) => session_index,
525							Err(error) => {
526								log::error!(
527									target: LOG_TARGET,
528									"Failed to query session index for child runtime API: {error:?}",
529								);
530								continue;
531							},
532						};
533
534					let persisted_validation_data_hash = validation_data.hash();
535					let parent_head_data = validation_data.parent_head.clone();
536					let parent_head_data_hash = validation_data.parent_head.hash();
537
538					// Apply compression to the block data.
539					let pov = {
540						let pov = collation.proof_of_validity.into_compressed();
541						let encoded_size = pov.encoded_size();
542						let max_pov_size = validation_data.max_pov_size as usize;
543
544						// As long as `POV_BOMB_LIMIT` is at least `max_pov_size`, this ensures
545						// that honest collators never produce a PoV which is uncompressed.
546						//
547						// As such, honest collators never produce an uncompressed PoV which starts
548						// with a compression magic number, which would lead validators to
549						// reject the collation.
550						if encoded_size > max_pov_size {
551							log::error!(
552								target: LOG_TARGET,
553								"PoV size {encoded_size} exceeded maximum size of {max_pov_size}",
554							);
555							continue;
556						}
557
558						pov
559					};
560
561					let pov_hash = pov.hash();
562
563					// Fetch the session info.
564					let session_info =
565						match client.runtime_api().session_info(relay_parent, session_index) {
566							Ok(Some(session_info)) => session_info,
567							Ok(None) => {
568								log::info!(
569									target: LOG_TARGET,
570									"Session info is None.",
571								);
572								continue;
573							},
574							Err(error) => {
575								log::error!(
576									target: LOG_TARGET,
577									"Failed to query session info runtime API: {error:?}",
578								);
579								continue;
580							},
581						};
582
583					let n_validators = session_info.validators.len();
584
585					let available_data =
586						AvailableData { validation_data, pov: Arc::new(pov.clone()) };
587					let chunks = match polkadot_erasure_coding::obtain_chunks_v1(
588						n_validators,
589						&available_data,
590					) {
591						Ok(chunks) => chunks,
592						Err(error) => {
593							log::error!(
594								target: LOG_TARGET,
595								"Failed to obtain chunks v1: {error:?}",
596							);
597							continue;
598						},
599					};
600					let erasure_root = polkadot_erasure_coding::branches(&chunks).root();
601
602					let commitments = CandidateCommitments {
603						upward_messages: collation.upward_messages,
604						horizontal_messages: collation.horizontal_messages,
605						new_validation_code: collation.new_validation_code,
606						head_data: collation.head_data,
607						processed_downward_messages: collation.processed_downward_messages,
608						hrmp_watermark: collation.hrmp_watermark,
609					};
610
611					// Submit the same collation to all assigned cores.
612					for core_index in &scheduled_cores {
613						let candidate_receipt = CandidateReceiptV2 {
614							descriptor: CandidateDescriptorV2::new(
615								para_id,
616								relay_parent,
617								*core_index,
618								session_index,
619								persisted_validation_data_hash,
620								pov_hash,
621								erasure_root,
622								commitments.head_data.hash(),
623								validation_code_hash,
624							),
625							commitments_hash: commitments.hash(),
626						};
627
628						// We cannot use SubmitCollation here because it includes an additional
629						// check for the core index by calling `parse_ump_signals`. This check
630						// enforces that the parachain always selects the correct core by comparing
631						// the descriptor and commitments core indexes. To bypass this check, we are
632						// simulating the behavior of SubmitCollation while skipping ump signals
633						// validation.
634						overseer_handle
635							.send_msg(
636								CollatorProtocolMessage::DistributeCollation {
637									candidate_receipt,
638									parent_head_data_hash,
639									pov: pov.clone(),
640									parent_head_data: parent_head_data.clone(),
641									result_sender: None,
642									core_index: *core_index,
643								},
644								"Collator",
645							)
646							.await;
647					}
648				}
649			});
650	}
651}
652
653use sp_core::traits::SpawnNamed;
654
#[cfg(test)]
mod tests {
	use super::*;
	use futures::executor::block_on;
	use polkadot_parachain_primitives::primitives::{ValidationParams, ValidationResult};
	use polkadot_primitives::{Hash, PersistedValidationData};

	/// Builds five consecutive collations and validates each one against the validation code.
	#[test]
	fn collator_works() {
		let spawner = sp_core::testing::TaskExecutor::new();
		let collator = Collator::new(1_000, 1, false);
		let collation_function = collator.create_collation_function(spawner);

		for number in 0..5 {
			// The lock is released at the end of this statement, before the collation function
			// takes it again.
			let parent_head =
				collator.state.lock().unwrap().number_to_head.get(&number).unwrap().clone();

			let validation_data = PersistedValidationData {
				parent_head: parent_head.encode().into(),
				..Default::default()
			};

			let result = block_on(collation_function(Default::default(), &validation_data));
			let collation = result.unwrap().collation;
			validate_collation(&collator, (*parent_head).clone(), collation);
		}
	}

	/// Runs the validation code on `collation` and checks that the resulting head equals the
	/// head the collator recorded for the child of `parent_head`.
	fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
		use polkadot_node_core_pvf::testing::validate_candidate;

		let block_data = match collation.proof_of_validity {
			MaybeCompressedPoV::Compressed(_) => panic!("Only works with uncompressed povs"),
			MaybeCompressedPoV::Raw(pov) => pov.block_data,
		};

		let params = ValidationParams {
			parent_head: parent_head.encode().into(),
			block_data,
			relay_parent_number: 1,
			relay_parent_storage_root: Hash::zero(),
		};

		let ret_buf = validate_candidate(collator.validation_code(), &params.encode()).unwrap();
		let ret = ValidationResult::decode(&mut &ret_buf[..]).unwrap();
		let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();

		let recorded_head = collator
			.state
			.lock()
			.unwrap()
			.number_to_head
			.get(&(parent_head.number + 1))
			.unwrap()
			.clone();

		assert_eq!(*recorded_head, new_head);
	}

	/// Advancing from a head unknown to the collator must give the same result as building the
	/// whole chain from genesis.
	#[test]
	fn advance_to_state_when_parent_head_is_missing() {
		let collator = Collator::new(1_000, 1, false);
		let graveyard_size = collator.state.lock().unwrap().graveyard_size;

		// Head number 10 was never produced by this collator, so its state has to be recomputed.
		let mut head = calculate_head_and_state_for_number(10, graveyard_size, 1, false).unwrap().0;

		for expected_number in 11..20 {
			head = collator.state.lock().unwrap().advance(head).unwrap().1;
			assert_eq!(expected_number, head.number);
		}

		// A fresh collator walks the same 19 blocks starting from genesis.
		let collator = Collator::new(1_000, 1, false);
		let genesis_head =
			collator.state.lock().unwrap().number_to_head.get(&0).cloned().unwrap();
		let mut second_head = (*genesis_head).clone();

		for _ in 0..19 {
			second_head = collator.state.lock().unwrap().advance(second_head.clone()).unwrap().1;
		}

		assert_eq!(second_head, head);
	}
}