//! Collator for the adder test parachain.
use codec::{Decode, Encode};
use futures::channel::oneshot;
use futures_timer::Delay;
use polkadot_node_primitives::{
Collation, CollationResult, CollationSecondedSignal, CollatorFn, MaybeCompressedPoV, PoV,
use polkadot_primitives::{CollatorId, CollatorPair};
use sp_core::{traits::SpawnNamed, Pair};
use std::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
use test_parachain_adder::{execute, hash_state, BlockData, HeadData};
/// The amount we add when producing a new block.
///
/// This is a constant to make tests easily reproducible. It is used as the `add` field of the
/// [`BlockData`] values built in this file, and the tracked state is advanced with
/// `wrapping_add(ADD)` after each block.
const ADD: u64 = 2;
/// Calculates the head and state for the block with the given `number`.
fn calculate_head_and_state_for_number(number: u64) -> (HeadData, u64) {
let mut head =
HeadData { number: 0, parent_hash: Default::default(), post_state: hash_state(0) };
let mut state = 0u64;
while head.number < number {
let block = BlockData { state, add: ADD };
head = execute(head.hash(), head.clone(), &block).expect("Produces valid block");
state = state.wrapping_add(ADD);
(head, state)
/// The state of the adder parachain.
struct State {
head_to_state: HashMap<Arc<HeadData>, u64>,
number_to_head: HashMap<u64, Arc<HeadData>>,
/// Block number of the best block.
best_block: u64,
impl State {
/// Init the genesis state.
fn genesis() -> Self {
let genesis_state = Arc::new(calculate_head_and_state_for_number(0).0);
Self {
head_to_state: vec![(genesis_state.clone(), 0)].into_iter().collect(),
number_to_head: vec![(0, genesis_state)].into_iter().collect(),
best_block: 0,
/// Advance the state and produce a new block based on the given `parent_head`.
/// Returns the new [`BlockData`] and the new [`HeadData`].
fn advance(&mut self, parent_head: HeadData) -> (BlockData, HeadData) {
self.best_block = parent_head.number;
let block = BlockData {
state: self
.unwrap_or_else(|| calculate_head_and_state_for_number(parent_head.number).1),
add: ADD,
let new_head =
execute(parent_head.hash(), parent_head, &block).expect("Produces valid block");
let new_head_arc = Arc::new(new_head.clone());
self.head_to_state.insert(new_head_arc.clone(), block.state.wrapping_add(ADD));
self.number_to_head.insert(new_head.number, new_head_arc);
(block, new_head)
/// Local collator state so we can compute how fast we are advancing
/// per relay parent.
///
/// The default value has no relay parent recorded yet.
#[derive(Default)]
pub struct LocalCollatorState {
    /// First relay block number we've built on.
    first_relay_parent: Option<u32>,
    /// Last relay block number we've built on.
    last_relay_parent: Option<u32>,
}
impl LocalCollatorState {
fn advance(&mut self, new_relay_parent: u32, best_block: u64) {
match (self.first_relay_parent, self.last_relay_parent) {
(Some(first_relay_parent), Some(last_relay_parent)) => {
// Compute the parachain velocity when relay parent changes vs our last
// recorded relay parent. We do this to only print out the velocity
// once per relay parent.
if new_relay_parent > last_relay_parent {
let building_for = (new_relay_parent - first_relay_parent) as f32;
// Round it up, as we don't expect perfect runs in CI.
let velocity = (best_block as f32 / building_for).ceil() as u32;
log::info!("Parachain velocity: {:}", velocity);
_ => {},
if self.first_relay_parent.is_none() {
self.first_relay_parent = Some(new_relay_parent);
self.last_relay_parent = Some(new_relay_parent);
/// The collator of the adder parachain.
pub struct Collator {
state: Arc<Mutex<State>>,
key: CollatorPair,
seconded_collations: Arc<AtomicU32>,
collator_state: Arc<Mutex<LocalCollatorState>>,
impl Collator {
/// Create a new collator instance with the state initialized as genesis.
///
/// The collator key is freshly generated and the seconded-collations counter starts at zero.
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(State::genesis())),
// Only the first element of the value returned by `generate()` is kept as the key.
key: CollatorPair::generate().0,
seconded_collations: Arc::new(AtomicU32::new(0)),
collator_state: Default::default(),
/// Get the SCALE encoded genesis head of the adder parachain.
pub fn genesis_head(&self) -> Vec<u8> {
// NOTE(review): the expression this `.expect` is applied to is not visible in this view;
// presumably it looks up the head recorded for block 0 — confirm.
.expect("Genesis header exists")
/// Get the validation code of the adder parachain.
pub fn validation_code(&self) -> &[u8] {
/// Get the collator key.
pub fn collator_key(&self) -> CollatorPair {
/// Get the collator id.
pub fn collator_id(&self) -> CollatorId {
/// Create the collation function.
///
/// This collation function can be plugged into the overseer to generate collations for the
/// adder parachain.
///
/// For each call, the returned closure decodes the parent head from the validation data,
/// advances the shared [`State`] to produce the next block and returns a [`CollationResult`]
/// carrying the new head and the block data as an uncompressed [`PoV`].
///
/// # Panics
///
/// The returned closure panics if the parent head cannot be decoded or if the state mutex is
/// poisoned.
pub fn create_collation_function(
spawner: impl SpawnNamed + Clone + 'static,
) -> CollatorFn {
use futures::FutureExt as _;
// Clone the shared handles so the boxed closure owns them.
let state = self.state.clone();
let collator_state = self.collator_state.clone();
let seconded_collations = self.seconded_collations.clone();
Box::new(move |relay_parent, validation_data| {
let parent = HeadData::decode(&mut &validation_data.parent_head.0[..])
.expect("Decodes parent head");
// NOTE(review): the receiver of this call is not visible here; presumably the locked
// `collator_state` — confirm.
.advance(validation_data.relay_parent_number, parent.number);
let (block_data, head_data) = state.lock().unwrap().advance(parent);
"created a new collation on relay-parent({}): {:?}",
let pov = PoV { block_data: block_data.encode().into() };
let collation = Collation {
upward_messages: Default::default(),
horizontal_messages: Default::default(),
new_validation_code: None,
head_data: head_data.encode().into(),
proof_of_validity: MaybeCompressedPoV::Raw(pov.clone()),
processed_downward_messages: 0,
hrmp_watermark: validation_data.relay_parent_number,
// The compressed form is kept so its hash can be compared with the PoV hash in the
// seconded statement below.
let compressed_pov = polkadot_node_primitives::maybe_compress_pov(pov);
// The sending half is handed out with the collation result; the receiving half is awaited
// below.
let (result_sender, recv) = oneshot::channel::<CollationSecondedSignal>();
let seconded_collations = seconded_collations.clone();
async move {
if let Ok(res) = recv.await {
if !matches!(
Statement::Seconded(s) if s.descriptor.pov_hash() == compressed_pov.hash(),
) {
"Seconded statement should match our collation: {:?}",
// Count one more seconded collation.
seconded_collations.fetch_add(1, Ordering::Relaxed);
async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }
/// Wait until `blocks` are built and enacted.
///
/// Progress is measured against the value of `best_block` read when this method is entered.
pub async fn wait_for_blocks(&self, blocks: u64) {
let start_block = self.state.lock().unwrap().best_block;
loop {
// Re-read the best block on every iteration; the lock is only held for this statement.
let current_block = self.state.lock().unwrap().best_block;
if start_block + blocks <= current_block {
/// Wait until `seconded` collations of this collator are seconded by a parachain validator.
///
/// The internal counter isn't de-duplicating the collations when counting the number of
/// seconded collations. This means when one collation is seconded by X validators, we record X
/// seconded messages.
pub async fn wait_for_seconded_collations(&self, seconded: u32) {
let seconded_collations = self.seconded_collations.clone();
loop {
// True once the counter has reached the requested number.
if seconded <= seconded_collations.load(Ordering::Relaxed) {
mod tests {
use super::*;
use futures::executor::block_on;
use polkadot_parachain_primitives::primitives::{ValidationParams, ValidationResult};
use polkadot_primitives::PersistedValidationData;
/// Builds five consecutive collations with the collation function and validates each of them
/// against the parent head it was built on.
fn collator_works() {
let spawner = sp_core::testing::TaskExecutor::new();
let collator = Collator::new();
let collation_function = collator.create_collation_function(spawner);
for i in 0..5 {
// NOTE(review): the expression producing `parent_head` is not visible here; presumably the
// head recorded for block `i` — confirm.
let parent_head =
let validation_data = PersistedValidationData {
parent_head: parent_head.encode().into(),
let collation =
block_on(collation_function(Default::default(), &validation_data)).unwrap();
validate_collation(&collator, (*parent_head).clone(), collation.collation);
/// Runs the block data of `collation` through `validate_candidate` on top of `parent_head` and
/// decodes the resulting head.
///
/// Panics if the collation carries a compressed PoV.
fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
use polkadot_node_core_pvf::testing::validate_candidate;
let block_data = match collation.proof_of_validity {
MaybeCompressedPoV::Raw(pov) => pov.block_data,
MaybeCompressedPoV::Compressed(_) => panic!("Only works with uncompressed povs"),
let ret_buf = validate_candidate(
&ValidationParams {
parent_head: parent_head.encode().into(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
let ret = ValidationResult::decode(&mut &ret_buf[..]).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
// NOTE(review): the receiver of this lookup and the comparison against `new_head` are not
// visible here; the key is the block number following `parent_head`.
.get(&(parent_head.number + 1))
/// Advances a fresh collator from the head of block 10, which it has never recorded, and
/// compares the final head with the one reached by a second fresh collator.
fn advance_to_state_when_parent_head_is_missing() {
let collator = Collator::new();
// Head of block 10, computed independently of the collator's own state.
let mut head = calculate_head_and_state_for_number(10).0;
for i in 1..10 {
head = collator.state.lock().unwrap().advance(head).1;
assert_eq!(10 + i, head.number);
let collator = Collator::new();
// NOTE(review): the rest of this expression is not visible here; presumably it starts from the
// second collator's genesis head — confirm.
let mut second_head = collator
for _ in 1..20 {
second_head = collator.state.lock().unwrap().advance(second_head.clone()).1;
// Both collators must end up on the same head.
assert_eq!(second_head, head);