1use codec::{Decode, Encode};
20use futures::{channel::oneshot, StreamExt};
21use futures_timer::Delay;
22use polkadot_cli::ProvideRuntimeApi;
23use polkadot_node_primitives::{
24 maybe_compress_pov, AvailableData, Collation, CollationResult, CollationSecondedSignal,
25 CollatorFn, MaybeCompressedPoV, PoV, Statement, UpwardMessages,
26};
27use polkadot_node_subsystem::messages::CollatorProtocolMessage;
28use polkadot_primitives::{
29 CandidateCommitments, CandidateDescriptorV2, CandidateReceiptV2, ClaimQueueOffset, CollatorId,
30 CollatorPair, CoreIndex, Hash, Id as ParaId, OccupiedCoreAssumption,
31 DEFAULT_CLAIM_QUEUE_OFFSET,
32};
33use polkadot_service::{Handle, NewFull, ParachainHost};
34use sc_client_api::client::BlockchainEvents;
35use sp_core::Pair;
36
37use std::{
38 collections::HashMap,
39 sync::{
40 atomic::{AtomicU32, Ordering},
41 Arc, Mutex,
42 },
43 time::Duration,
44};
45use test_parachain_undying::{
46 execute, hash_state, BlockData, GraveyardState, HeadData, StateMismatch,
47};
48
49pub const LOG_TARGET: &str = "parachain::undying-collator";
50
51const DEFAULT_POV_SIZE: usize = 1000;
53const DEFAULT_PVF_COMPLEXITY: u32 = 1;
55
56fn calculate_head_and_state_for_number(
58 number: u64,
59 graveyard_size: usize,
60 pvf_complexity: u32,
61 experimental_send_approved_peer: bool,
62) -> Result<(HeadData, GraveyardState), StateMismatch> {
63 let index = 0u64;
64 let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
65 let zombies = 0;
66 let seal = [0u8; 32];
67 let core_selector_number = 0;
68
69 graveyard.iter_mut().enumerate().for_each(|(i, grave)| {
71 *grave = i as u8;
72 });
73
74 let mut state = GraveyardState { index, graveyard, zombies, seal, core_selector_number };
75 let mut head =
76 HeadData { number: 0, parent_hash: Hash::default().into(), post_state: hash_state(&state) };
77
78 while head.number < number {
79 let block = BlockData {
80 state,
81 tombstones: 1_000,
82 iterations: pvf_complexity,
83 experimental_send_approved_peer,
84 };
85 let (new_head, new_state, _) = execute(head.hash(), head.clone(), block)?;
86 head = new_head;
87 state = new_state;
88 }
89
90 Ok((head, state))
91}
92
93struct State {
95 head_to_state: HashMap<Arc<HeadData>, GraveyardState>,
99 number_to_head: HashMap<u64, Arc<HeadData>>,
100 best_block: u64,
102 pvf_complexity: u32,
104 graveyard_size: usize,
111 experimental_send_approved_peer: bool,
112}
113
114impl State {
115 fn genesis(
117 graveyard_size: usize,
118 pvf_complexity: u32,
119 experimental_send_approved_peer: bool,
120 ) -> Self {
121 let index = 0u64;
122 let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
123 let zombies = 0;
124 let seal = [0u8; 32];
125 let core_selector_number = 0;
126
127 graveyard.iter_mut().enumerate().for_each(|(i, grave)| {
129 *grave = i as u8;
130 });
131
132 let state = GraveyardState { index, graveyard, zombies, seal, core_selector_number };
133
134 let head_data =
135 HeadData { number: 0, parent_hash: Default::default(), post_state: hash_state(&state) };
136 let head_data = Arc::new(head_data);
137
138 Self {
139 head_to_state: vec![(head_data.clone(), state.clone())].into_iter().collect(),
140 number_to_head: vec![(0, head_data)].into_iter().collect(),
141 best_block: 0,
142 pvf_complexity,
143 graveyard_size,
144 experimental_send_approved_peer,
145 }
146 }
147
148 fn advance(
152 &mut self,
153 parent_head: HeadData,
154 ) -> Result<(BlockData, HeadData, UpwardMessages), StateMismatch> {
155 self.best_block = parent_head.number;
156
157 let state = if let Some(state) = self
158 .number_to_head
159 .get(&self.best_block)
160 .and_then(|head_data| self.head_to_state.get(head_data).cloned())
161 {
162 state
163 } else {
164 let (_, state) = calculate_head_and_state_for_number(
165 parent_head.number,
166 self.graveyard_size,
167 self.pvf_complexity,
168 self.experimental_send_approved_peer,
169 )?;
170 state
171 };
172
173 let block = BlockData {
175 state,
176 tombstones: 1000,
177 iterations: self.pvf_complexity,
178 experimental_send_approved_peer: self.experimental_send_approved_peer,
179 };
180
181 let (new_head, new_state, upward_messages) =
182 execute(parent_head.hash(), parent_head, block.clone())?;
183
184 let new_head_arc = Arc::new(new_head.clone());
185
186 self.head_to_state.insert(new_head_arc.clone(), new_state);
187 self.number_to_head.insert(new_head.number, new_head_arc);
188
189 Ok((block, new_head, upward_messages))
190 }
191}
192
193pub struct Collator {
195 state: Arc<Mutex<State>>,
196 key: CollatorPair,
197 seconded_collations: Arc<AtomicU32>,
198}
199
200impl Default for Collator {
201 fn default() -> Self {
202 Self::new(DEFAULT_POV_SIZE, DEFAULT_PVF_COMPLEXITY, false)
203 }
204}
205
206impl Collator {
207 pub fn new(
210 pov_size: usize,
211 pvf_complexity: u32,
212 experimental_send_approved_peer: bool,
213 ) -> Self {
214 let graveyard_size = ((pov_size / std::mem::size_of::<u8>()) as f64).sqrt().ceil() as usize;
215
216 log::info!(
217 target: LOG_TARGET,
218 "PoV target size: {} bytes. Graveyard size: ({} x {})",
219 pov_size,
220 graveyard_size,
221 graveyard_size,
222 );
223
224 log::info!(
225 target: LOG_TARGET,
226 "PVF time complexity: {}",
227 pvf_complexity,
228 );
229
230 Self {
231 state: Arc::new(Mutex::new(State::genesis(
232 graveyard_size,
233 pvf_complexity,
234 experimental_send_approved_peer,
235 ))),
236 key: CollatorPair::generate().0,
237 seconded_collations: Arc::new(AtomicU32::new(0)),
238 }
239 }
240
241 pub fn genesis_head(&self) -> Vec<u8> {
243 self.state
244 .lock()
245 .unwrap()
246 .number_to_head
247 .get(&0)
248 .expect("Genesis header exists")
249 .encode()
250 }
251
252 pub fn validation_code(&self) -> &[u8] {
254 test_parachain_undying::wasm_binary_unwrap()
255 }
256
257 pub fn collator_key(&self) -> CollatorPair {
259 self.key.clone()
260 }
261
262 pub fn collator_id(&self) -> CollatorId {
264 self.key.public()
265 }
266
267 pub fn create_collation_function(
272 &self,
273 spawner: impl SpawnNamed + Clone + 'static,
274 ) -> CollatorFn {
275 use futures::FutureExt as _;
276
277 let state = self.state.clone();
278 let seconded_collations = self.seconded_collations.clone();
279
280 Box::new(move |relay_parent, validation_data| {
281 let parent = match HeadData::decode(&mut &validation_data.parent_head.0[..]) {
282 Err(err) => {
283 log::error!(
284 target: LOG_TARGET,
285 "Requested to build on top of malformed head-data: {:?}",
286 err,
287 );
288 return futures::future::ready(None).boxed()
289 },
290 Ok(p) => p,
291 };
292
293 let (block_data, head_data, upward_messages) =
294 match state.lock().unwrap().advance(parent.clone()) {
295 Err(err) => {
296 log::error!(
297 target: LOG_TARGET,
298 "Unable to build on top of {:?}: {:?}",
299 parent,
300 err,
301 );
302 return futures::future::ready(None).boxed()
303 },
304 Ok(x) => x,
305 };
306
307 log::info!(
308 target: LOG_TARGET,
309 "created a new collation on relay-parent({}): {:?}",
310 relay_parent,
311 head_data,
312 );
313
314 let pov = PoV { block_data: block_data.encode().into() };
316
317 let collation = Collation {
318 upward_messages,
319 horizontal_messages: Default::default(),
320 new_validation_code: None,
321 head_data: head_data.encode().into(),
322 proof_of_validity: MaybeCompressedPoV::Raw(pov.clone()),
323 processed_downward_messages: 0,
324 hrmp_watermark: validation_data.relay_parent_number,
325 };
326
327 log::info!(
328 target: LOG_TARGET,
329 "Raw PoV size for collation: {} bytes",
330 pov.block_data.0.len(),
331 );
332 let compressed_pov = maybe_compress_pov(pov);
333
334 log::info!(
335 target: LOG_TARGET,
336 "Compressed PoV size for collation: {} bytes",
337 compressed_pov.block_data.0.len(),
338 );
339
340 let (result_sender, recv) = oneshot::channel::<CollationSecondedSignal>();
341 let seconded_collations = seconded_collations.clone();
342 spawner.spawn(
343 "undying-collator-seconded",
344 None,
345 async move {
346 if let Ok(res) = recv.await {
347 if !matches!(
348 res.statement.payload(),
349 Statement::Seconded(s) if s.descriptor.pov_hash() == compressed_pov.hash(),
350 ) {
351 log::error!(
352 target: LOG_TARGET,
353 "Seconded statement should match our collation: {:?}",
354 res.statement.payload(),
355 );
356 }
357
358 seconded_collations.fetch_add(1, Ordering::Relaxed);
359 }
360 }
361 .boxed(),
362 );
363
364 async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }
365 .boxed()
366 })
367 }
368
369 pub async fn wait_for_blocks(&self, blocks: u64) {
371 let start_block = self.state.lock().unwrap().best_block;
372 loop {
373 Delay::new(Duration::from_secs(1)).await;
374
375 let current_block = self.state.lock().unwrap().best_block;
376
377 if start_block + blocks <= current_block {
378 return
379 }
380 }
381 }
382
383 pub async fn wait_for_seconded_collations(&self, seconded: u32) {
389 let seconded_collations = self.seconded_collations.clone();
390 loop {
391 Delay::new(Duration::from_secs(1)).await;
392
393 if seconded <= seconded_collations.load(Ordering::Relaxed) {
394 return
395 }
396 }
397 }
398
399 pub fn send_same_collations_to_all_assigned_cores(
400 &self,
401 full_node: &NewFull,
402 mut overseer_handle: Handle,
403 para_id: ParaId,
404 ) {
405 let client = full_node.client.clone();
406
407 let collation_function =
408 self.create_collation_function(full_node.task_manager.spawn_handle());
409
410 full_node
411 .task_manager
412 .spawn_handle()
413 .spawn("malus-undying-collator", None, async move {
414 let mut import_notifications = client.import_notification_stream();
418
419 while let Some(notification) = import_notifications.next().await {
420 let relay_parent = notification.hash;
421
422 let claim_queue = match client.runtime_api().claim_queue(relay_parent) {
424 Ok(claim_queue) => claim_queue,
425 Err(error) => {
426 log::error!(
427 target: LOG_TARGET,
428 "Failed to query claim queue runtime API: {error:?}",
429 );
430 continue;
431 },
432 };
433
434 let claim_queue_offset = ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET);
435
436 let scheduled_cores: Vec<CoreIndex> = claim_queue
437 .iter()
438 .filter_map(move |(core_index, paras)| {
439 paras.get(claim_queue_offset.0 as usize).and_then(|core_para_id| {
440 (core_para_id == ¶_id).then_some(*core_index)
441 })
442 })
443 .collect();
444
445 if scheduled_cores.is_empty() {
446 log::info!(
447 target: LOG_TARGET,
448 "Scheduled cores is empty.",
449 );
450 continue;
451 }
452
453 if scheduled_cores.len() == 1 {
454 log::info!(
455 target: LOG_TARGET,
456 "Malus collator configured with duplicate collations, but only 1 core assigned. \
457 Collator will not do anything malicious.",
458 );
459 }
460
461 let validation_data = match client.runtime_api().persisted_validation_data(
463 relay_parent,
464 para_id,
465 OccupiedCoreAssumption::Included,
466 ) {
467 Ok(Some(validation_data)) => validation_data,
468 Ok(None) => {
469 log::info!(
470 target: LOG_TARGET,
471 "Persisted validation data is None.",
472 );
473 continue;
474 },
475 Err(error) => {
476 log::error!(
477 target: LOG_TARGET,
478 "Failed to query persisted validation data runtime API: {error:?}",
479 );
480 continue;
481 },
482 };
483
484 let collation =
486 match collation_function(relay_parent, &validation_data).await {
487 Some(collation) => collation,
488 None => {
489 log::info!(
490 target: LOG_TARGET,
491 "Collation result is None.",
492 );
493 continue;
494 },
495 }
496 .collation;
497
498 let validation_code_hash = match client.runtime_api().validation_code_hash(
500 relay_parent,
501 para_id,
502 OccupiedCoreAssumption::Included,
503 ) {
504 Ok(Some(validation_code_hash)) => validation_code_hash,
505 Ok(None) => {
506 log::info!(
507 target: LOG_TARGET,
508 "Validation code hash is None.",
509 );
510 continue;
511 },
512 Err(error) => {
513 log::error!(
514 target: LOG_TARGET,
515 "Failed to query validation code hash runtime API: {error:?}",
516 );
517 continue;
518 },
519 };
520
521 let session_index =
523 match client.runtime_api().session_index_for_child(relay_parent) {
524 Ok(session_index) => session_index,
525 Err(error) => {
526 log::error!(
527 target: LOG_TARGET,
528 "Failed to query session index for child runtime API: {error:?}",
529 );
530 continue;
531 },
532 };
533
534 let persisted_validation_data_hash = validation_data.hash();
535 let parent_head_data = validation_data.parent_head.clone();
536 let parent_head_data_hash = validation_data.parent_head.hash();
537
538 let pov = {
540 let pov = collation.proof_of_validity.into_compressed();
541 let encoded_size = pov.encoded_size();
542 let max_pov_size = validation_data.max_pov_size as usize;
543
544 if encoded_size > max_pov_size {
551 log::error!(
552 target: LOG_TARGET,
553 "PoV size {encoded_size} exceeded maximum size of {max_pov_size}",
554 );
555 continue;
556 }
557
558 pov
559 };
560
561 let pov_hash = pov.hash();
562
563 let session_info =
565 match client.runtime_api().session_info(relay_parent, session_index) {
566 Ok(Some(session_info)) => session_info,
567 Ok(None) => {
568 log::info!(
569 target: LOG_TARGET,
570 "Session info is None.",
571 );
572 continue;
573 },
574 Err(error) => {
575 log::error!(
576 target: LOG_TARGET,
577 "Failed to query session info runtime API: {error:?}",
578 );
579 continue;
580 },
581 };
582
583 let n_validators = session_info.validators.len();
584
585 let available_data =
586 AvailableData { validation_data, pov: Arc::new(pov.clone()) };
587 let chunks = match polkadot_erasure_coding::obtain_chunks_v1(
588 n_validators,
589 &available_data,
590 ) {
591 Ok(chunks) => chunks,
592 Err(error) => {
593 log::error!(
594 target: LOG_TARGET,
595 "Failed to obtain chunks v1: {error:?}",
596 );
597 continue;
598 },
599 };
600 let erasure_root = polkadot_erasure_coding::branches(&chunks).root();
601
602 let commitments = CandidateCommitments {
603 upward_messages: collation.upward_messages,
604 horizontal_messages: collation.horizontal_messages,
605 new_validation_code: collation.new_validation_code,
606 head_data: collation.head_data,
607 processed_downward_messages: collation.processed_downward_messages,
608 hrmp_watermark: collation.hrmp_watermark,
609 };
610
611 for core_index in &scheduled_cores {
613 let candidate_receipt = CandidateReceiptV2 {
614 descriptor: CandidateDescriptorV2::new(
615 para_id,
616 relay_parent,
617 *core_index,
618 session_index,
619 persisted_validation_data_hash,
620 pov_hash,
621 erasure_root,
622 commitments.head_data.hash(),
623 validation_code_hash,
624 ),
625 commitments_hash: commitments.hash(),
626 };
627
628 overseer_handle
635 .send_msg(
636 CollatorProtocolMessage::DistributeCollation {
637 candidate_receipt,
638 parent_head_data_hash,
639 pov: pov.clone(),
640 parent_head_data: parent_head_data.clone(),
641 result_sender: None,
642 core_index: *core_index,
643 },
644 "Collator",
645 )
646 .await;
647 }
648 }
649 });
650 }
651}
652
653use sp_core::traits::SpawnNamed;
654
655#[cfg(test)]
656mod tests {
657 use super::*;
658 use futures::executor::block_on;
659 use polkadot_parachain_primitives::primitives::{ValidationParams, ValidationResult};
660 use polkadot_primitives::{Hash, PersistedValidationData};
661
662 #[test]
663 fn collator_works() {
664 let spawner = sp_core::testing::TaskExecutor::new();
665 let collator = Collator::new(1_000, 1, false);
666 let collation_function = collator.create_collation_function(spawner);
667
668 for i in 0..5 {
669 let parent_head =
670 collator.state.lock().unwrap().number_to_head.get(&i).unwrap().clone();
671
672 let validation_data = PersistedValidationData {
673 parent_head: parent_head.encode().into(),
674 ..Default::default()
675 };
676
677 let collation =
678 block_on(collation_function(Default::default(), &validation_data)).unwrap();
679 validate_collation(&collator, (*parent_head).clone(), collation.collation);
680 }
681 }
682
683 fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
684 use polkadot_node_core_pvf::testing::validate_candidate;
685
686 let block_data = match collation.proof_of_validity {
687 MaybeCompressedPoV::Raw(pov) => pov.block_data,
688 MaybeCompressedPoV::Compressed(_) => panic!("Only works with uncompressed povs"),
689 };
690
691 let ret_buf = validate_candidate(
692 collator.validation_code(),
693 &ValidationParams {
694 parent_head: parent_head.encode().into(),
695 block_data,
696 relay_parent_number: 1,
697 relay_parent_storage_root: Hash::zero(),
698 }
699 .encode(),
700 )
701 .unwrap();
702 let ret = ValidationResult::decode(&mut &ret_buf[..]).unwrap();
703
704 let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
705 assert_eq!(
706 **collator
707 .state
708 .lock()
709 .unwrap()
710 .number_to_head
711 .get(&(parent_head.number + 1))
712 .unwrap(),
713 new_head
714 );
715 }
716
717 #[test]
718 fn advance_to_state_when_parent_head_is_missing() {
719 let collator = Collator::new(1_000, 1, false);
720 let graveyard_size = collator.state.lock().unwrap().graveyard_size;
721
722 let mut head = calculate_head_and_state_for_number(10, graveyard_size, 1, false).unwrap().0;
723
724 for i in 1..10 {
725 head = collator.state.lock().unwrap().advance(head).unwrap().1;
726 assert_eq!(10 + i, head.number);
727 }
728
729 let collator = Collator::new(1_000, 1, false);
730 let mut second_head = collator
731 .state
732 .lock()
733 .unwrap()
734 .number_to_head
735 .get(&0)
736 .cloned()
737 .unwrap()
738 .as_ref()
739 .clone();
740
741 for _ in 1..20 {
742 second_head = collator.state.lock().unwrap().advance(second_head.clone()).unwrap().1;
743 }
744
745 assert_eq!(second_head, head);
746 }
747}