use codec::{Decode, Encode};
use futures::{channel::oneshot, StreamExt};
use futures_timer::Delay;
use polkadot_cli::ProvideRuntimeApi;
use polkadot_node_primitives::{
maybe_compress_pov, AvailableData, Collation, CollationResult, CollationSecondedSignal,
CollatorFn, MaybeCompressedPoV, PoV, Statement, UpwardMessages,
};
use polkadot_node_subsystem::messages::CollatorProtocolMessage;
use polkadot_primitives::{
vstaging::{
CandidateDescriptorV2, CandidateReceiptV2, ClaimQueueOffset, DEFAULT_CLAIM_QUEUE_OFFSET,
},
CandidateCommitments, CollatorId, CollatorPair, CoreIndex, Hash, Id as ParaId,
OccupiedCoreAssumption,
};
use polkadot_service::{Handle, NewFull, ParachainHost};
use sc_client_api::client::BlockchainEvents;
use sp_core::Pair;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
time::Duration,
};
use test_parachain_undying::{
execute, hash_state, BlockData, GraveyardState, HeadData, StateMismatch,
};
/// Log target used by all output of this collator.
pub const LOG_TARGET: &str = "parachain::undying-collator";

/// Default target size of the proof-of-validity, in bytes.
const DEFAULT_POV_SIZE: usize = 1000;

/// Default PVF complexity: the number of iterations run per block execution.
const DEFAULT_PVF_COMPLEXITY: u32 = 1;
/// Compute the head data and graveyard state at block `number` by replaying
/// block production from genesis.
///
/// Returns a `StateMismatch` error if `execute` rejects any transition along
/// the way.
fn calculate_head_and_state_for_number(
    number: u64,
    graveyard_size: usize,
    pvf_complexity: u32,
    experimental_send_approved_peer: bool,
) -> Result<(HeadData, GraveyardState), StateMismatch> {
    // Genesis graveyard: a square byte grid filled with its own indices
    // (truncated to `u8`).
    let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
    for (i, grave) in graveyard.iter_mut().enumerate() {
        *grave = i as u8;
    }

    let mut state = GraveyardState {
        index: 0u64,
        graveyard,
        zombies: 0,
        seal: [0u8; 32],
        core_selector_number: 0,
    };
    let mut head = HeadData {
        number: 0,
        parent_hash: Hash::default().into(),
        post_state: hash_state(&state),
    };

    // Replay one block at a time until the requested number is reached.
    while head.number < number {
        let block = BlockData {
            state,
            tombstones: 1_000,
            iterations: pvf_complexity,
            experimental_send_approved_peer,
        };
        let (next_head, next_state, _upward_messages) =
            execute(head.hash(), head.clone(), block)?;
        head = next_head;
        state = next_state;
    }

    Ok((head, state))
}
/// In-memory chain state tracked by the collator.
struct State {
    /// Maps a block's head data to the graveyard state that block produced.
    head_to_state: HashMap<Arc<HeadData>, GraveyardState>,
    /// Maps a block number to the head data of that block.
    number_to_head: HashMap<u64, Arc<HeadData>>,
    /// Block number of the most recent parent we were asked to build on.
    best_block: u64,
    /// Number of `execute` iterations per block (PVF complexity).
    pvf_complexity: u32,
    /// Side length of the square graveyard byte grid.
    graveyard_size: usize,
    /// Whether produced blocks carry the experimental "send approved peer" flag.
    experimental_send_approved_peer: bool,
}
impl State {
fn genesis(
graveyard_size: usize,
pvf_complexity: u32,
experimental_send_approved_peer: bool,
) -> Self {
let index = 0u64;
let mut graveyard = vec![0u8; graveyard_size * graveyard_size];
let zombies = 0;
let seal = [0u8; 32];
let core_selector_number = 0;
graveyard.iter_mut().enumerate().for_each(|(i, grave)| {
*grave = i as u8;
});
let state = GraveyardState { index, graveyard, zombies, seal, core_selector_number };
let head_data =
HeadData { number: 0, parent_hash: Default::default(), post_state: hash_state(&state) };
let head_data = Arc::new(head_data);
Self {
head_to_state: vec![(head_data.clone(), state.clone())].into_iter().collect(),
number_to_head: vec![(0, head_data)].into_iter().collect(),
best_block: 0,
pvf_complexity,
graveyard_size,
experimental_send_approved_peer,
}
}
fn advance(
&mut self,
parent_head: HeadData,
) -> Result<(BlockData, HeadData, UpwardMessages), StateMismatch> {
self.best_block = parent_head.number;
let state = if let Some(state) = self
.number_to_head
.get(&self.best_block)
.and_then(|head_data| self.head_to_state.get(head_data).cloned())
{
state
} else {
let (_, state) = calculate_head_and_state_for_number(
parent_head.number,
self.graveyard_size,
self.pvf_complexity,
self.experimental_send_approved_peer,
)?;
state
};
let block = BlockData {
state,
tombstones: 1000,
iterations: self.pvf_complexity,
experimental_send_approved_peer: self.experimental_send_approved_peer,
};
let (new_head, new_state, upward_messages) =
execute(parent_head.hash(), parent_head, block.clone())?;
let new_head_arc = Arc::new(new_head.clone());
self.head_to_state.insert(new_head_arc.clone(), new_state);
self.number_to_head.insert(new_head.number, new_head_arc);
Ok((block, new_head, upward_messages))
}
}
/// The undying collator: produces parachain blocks on request and tracks how
/// many of its collations have been seconded by validators.
pub struct Collator {
    /// Shared chain state, locked per block production / query.
    state: Arc<Mutex<State>>,
    /// The collator's key pair, generated at construction.
    key: CollatorPair,
    /// Number of collations seconded so far.
    seconded_collations: Arc<AtomicU32>,
}
impl Default for Collator {
fn default() -> Self {
Self::new(DEFAULT_POV_SIZE, DEFAULT_PVF_COMPLEXITY, false)
}
}
impl Collator {
    /// Create a new collator.
    ///
    /// `pov_size` is the target PoV size in bytes; it determines the side
    /// length of the square graveyard grid. `pvf_complexity` is the number of
    /// `execute` iterations per block.
    pub fn new(
        pov_size: usize,
        pvf_complexity: u32,
        experimental_send_approved_peer: bool,
    ) -> Self {
        // Side length of the square grid: ceil(sqrt(pov_size in bytes)).
        let graveyard_size = ((pov_size / std::mem::size_of::<u8>()) as f64).sqrt().ceil() as usize;

        log::info!(
            target: LOG_TARGET,
            "PoV target size: {} bytes. Graveyard size: ({} x {})",
            pov_size,
            graveyard_size,
            graveyard_size,
        );

        log::info!(
            target: LOG_TARGET,
            "PVF time complexity: {}",
            pvf_complexity,
        );

        Self {
            state: Arc::new(Mutex::new(State::genesis(
                graveyard_size,
                pvf_complexity,
                experimental_send_approved_peer,
            ))),
            key: CollatorPair::generate().0,
            seconded_collations: Arc::new(AtomicU32::new(0)),
        }
    }

    /// SCALE-encoded genesis head data of the parachain.
    pub fn genesis_head(&self) -> Vec<u8> {
        self.state
            .lock()
            .unwrap()
            .number_to_head
            .get(&0)
            .expect("Genesis header exists")
            .encode()
    }

    /// The validation code (PVF wasm binary) of the parachain.
    pub fn validation_code(&self) -> &[u8] {
        test_parachain_undying::wasm_binary_unwrap()
    }

    /// The collator's key pair.
    pub fn collator_key(&self) -> CollatorPair {
        self.key.clone()
    }

    /// The collator's public id.
    pub fn collator_id(&self) -> CollatorId {
        self.key.public()
    }

    /// Create the collation function: a closure that, given a relay parent and
    /// persisted validation data, builds the next parachain block and returns
    /// it as a `CollationResult`.
    ///
    /// For every produced collation a task is spawned that awaits the
    /// "seconded" signal and increments the seconded-collations counter,
    /// logging an error if the seconded statement's PoV hash does not match
    /// the compressed PoV we produced.
    pub fn create_collation_function(
        &self,
        spawner: impl SpawnNamed + Clone + 'static,
    ) -> CollatorFn {
        use futures::FutureExt as _;

        let state = self.state.clone();
        let seconded_collations = self.seconded_collations.clone();

        Box::new(move |relay_parent, validation_data| {
            // Decode the parent head we are asked to build on.
            let parent = match HeadData::decode(&mut &validation_data.parent_head.0[..]) {
                Err(err) => {
                    log::error!(
                        target: LOG_TARGET,
                        "Requested to build on top of malformed head-data: {:?}",
                        err,
                    );
                    return futures::future::ready(None).boxed()
                },
                Ok(p) => p,
            };

            // Produce the next block on top of the parent.
            let (block_data, head_data, upward_messages) =
                match state.lock().unwrap().advance(parent.clone()) {
                    Err(err) => {
                        log::error!(
                            target: LOG_TARGET,
                            "Unable to build on top of {:?}: {:?}",
                            parent,
                            err,
                        );
                        return futures::future::ready(None).boxed()
                    },
                    Ok(x) => x,
                };

            log::info!(
                target: LOG_TARGET,
                "created a new collation on relay-parent({}): {:?}",
                relay_parent,
                head_data,
            );

            // The PoV is the SCALE-encoded block data.
            let pov = PoV { block_data: block_data.encode().into() };

            // The collation carries the *raw* PoV; compression happens later
            // in the pipeline.
            let collation = Collation {
                upward_messages,
                horizontal_messages: Default::default(),
                new_validation_code: None,
                head_data: head_data.encode().into(),
                proof_of_validity: MaybeCompressedPoV::Raw(pov.clone()),
                processed_downward_messages: 0,
                hrmp_watermark: validation_data.relay_parent_number,
            };

            log::info!(
                target: LOG_TARGET,
                "Raw PoV size for collation: {} bytes",
                pov.block_data.0.len(),
            );
            // Compressed here only for the size log and the pov-hash check in
            // the seconded handler below.
            let compressed_pov = maybe_compress_pov(pov);
            log::info!(
                target: LOG_TARGET,
                "Compressed PoV size for collation: {} bytes",
                compressed_pov.block_data.0.len(),
            );

            let (result_sender, recv) = oneshot::channel::<CollationSecondedSignal>();
            let seconded_collations = seconded_collations.clone();
            spawner.spawn(
                "undying-collator-seconded",
                None,
                async move {
                    if let Ok(res) = recv.await {
                        // The seconded candidate must reference our
                        // (compressed) PoV.
                        if !matches!(
                            res.statement.payload(),
                            Statement::Seconded(s) if s.descriptor.pov_hash() == compressed_pov.hash(),
                        ) {
                            log::error!(
                                target: LOG_TARGET,
                                "Seconded statement should match our collation: {:?}",
                                res.statement.payload(),
                            );
                        }

                        seconded_collations.fetch_add(1, Ordering::Relaxed);
                    }
                }
                .boxed(),
            );

            async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }
                .boxed()
        })
    }

    /// Wait until `blocks` blocks were built on top of the best block observed
    /// at call time, polling once per second.
    pub async fn wait_for_blocks(&self, blocks: u64) {
        let start_block = self.state.lock().unwrap().best_block;
        loop {
            Delay::new(Duration::from_secs(1)).await;
            let current_block = self.state.lock().unwrap().best_block;
            if start_block + blocks <= current_block {
                return
            }
        }
    }

    /// Wait until at least `seconded` of this collator's collations were
    /// seconded, polling once per second.
    pub async fn wait_for_seconded_collations(&self, seconded: u32) {
        let seconded_collations = self.seconded_collations.clone();
        loop {
            Delay::new(Duration::from_secs(1)).await;

            if seconded <= seconded_collations.load(Ordering::Relaxed) {
                return
            }
        }
    }

    /// Malicious behavior: on every relay-chain block import, build ONE
    /// collation and distribute that same collation to EVERY core currently
    /// assigned to `para_id`, sending `DistributeCollation` messages directly
    /// to the overseer (bypassing the regular collation-generation subsystem).
    pub fn send_same_collations_to_all_assigned_cores(
        &self,
        full_node: &NewFull,
        mut overseer_handle: Handle,
        para_id: ParaId,
    ) {
        let client = full_node.client.clone();
        let collation_function =
            self.create_collation_function(full_node.task_manager.spawn_handle());

        full_node
            .task_manager
            .spawn_handle()
            .spawn("malus-undying-collator", None, async move {
                let mut import_notifications = client.import_notification_stream();

                while let Some(notification) = import_notifications.next().await {
                    let relay_parent = notification.hash;

                    // Determine which cores are assigned to our para at this
                    // relay parent via the claim queue.
                    let claim_queue = match client.runtime_api().claim_queue(relay_parent) {
                        Ok(claim_queue) => claim_queue,
                        Err(error) => {
                            log::error!(
                                target: LOG_TARGET,
                                "Failed to query claim queue runtime API: {error:?}",
                            );
                            continue;
                        },
                    };
                    let claim_queue_offset = ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET);
                    let scheduled_cores: Vec<CoreIndex> = claim_queue
                        .iter()
                        .filter_map(move |(core_index, paras)| {
                            paras.get(claim_queue_offset.0 as usize).and_then(|core_para_id| {
                                (core_para_id == &para_id).then_some(*core_index)
                            })
                        })
                        .collect();

                    if scheduled_cores.is_empty() {
                        log::info!(
                            target: LOG_TARGET,
                            "Scheduled cores is empty.",
                        );
                        continue;
                    }
                    if scheduled_cores.len() == 1 {
                        // Nothing to duplicate: this iteration behaves like an
                        // honest collator.
                        log::info!(
                            target: LOG_TARGET,
                            "Malus collator configured with duplicate collations, but only 1 core assigned. \
                            Collator will not do anything malicious.",
                        );
                    }

                    let validation_data = match client.runtime_api().persisted_validation_data(
                        relay_parent,
                        para_id,
                        OccupiedCoreAssumption::Included,
                    ) {
                        Ok(Some(validation_data)) => validation_data,
                        Ok(None) => {
                            log::info!(
                                target: LOG_TARGET,
                                "Persisted validation data is None.",
                            );
                            continue;
                        },
                        Err(error) => {
                            log::error!(
                                target: LOG_TARGET,
                                "Failed to query persisted validation data runtime API: {error:?}",
                            );
                            continue;
                        },
                    };

                    // Build a single collation; it is later sent to all
                    // assigned cores.
                    let collation =
                        match collation_function(relay_parent, &validation_data).await {
                            Some(collation) => collation,
                            None => {
                                log::info!(
                                    target: LOG_TARGET,
                                    "Collation result is None.",
                                );
                                continue;
                            },
                        }
                        .collation;

                    let validation_code_hash = match client.runtime_api().validation_code_hash(
                        relay_parent,
                        para_id,
                        OccupiedCoreAssumption::Included,
                    ) {
                        Ok(Some(validation_code_hash)) => validation_code_hash,
                        Ok(None) => {
                            log::info!(
                                target: LOG_TARGET,
                                "Validation code hash is None.",
                            );
                            continue;
                        },
                        Err(error) => {
                            log::error!(
                                target: LOG_TARGET,
                                "Failed to query validation code hash runtime API: {error:?}",
                            );
                            continue;
                        },
                    };

                    let session_index =
                        match client.runtime_api().session_index_for_child(relay_parent) {
                            Ok(session_index) => session_index,
                            Err(error) => {
                                log::error!(
                                    target: LOG_TARGET,
                                    "Failed to query session index for child runtime API: {error:?}",
                                );
                                continue;
                            },
                        };

                    let persisted_validation_data_hash = validation_data.hash();
                    let parent_head_data = validation_data.parent_head.clone();
                    let parent_head_data_hash = validation_data.parent_head.hash();

                    // Compress the PoV and enforce the maximum PoV size.
                    let pov = {
                        let pov = collation.proof_of_validity.into_compressed();
                        let encoded_size = pov.encoded_size();
                        let max_pov_size = validation_data.max_pov_size as usize;

                        if encoded_size > max_pov_size {
                            log::error!(
                                target: LOG_TARGET,
                                "PoV size {encoded_size} exceeded maximum size of {max_pov_size}",
                            );
                            continue;
                        }

                        pov
                    };
                    let pov_hash = pov.hash();

                    // Validator count is needed for erasure coding below.
                    let session_info =
                        match client.runtime_api().session_info(relay_parent, session_index) {
                            Ok(Some(session_info)) => session_info,
                            Ok(None) => {
                                log::info!(
                                    target: LOG_TARGET,
                                    "Session info is None.",
                                );
                                continue;
                            },
                            Err(error) => {
                                log::error!(
                                    target: LOG_TARGET,
                                    "Failed to query session info runtime API: {error:?}",
                                );
                                continue;
                            },
                        };
                    let n_validators = session_info.validators.len();

                    // Compute the erasure root over the available data; the
                    // candidate descriptor commits to it.
                    let available_data =
                        AvailableData { validation_data, pov: Arc::new(pov.clone()) };
                    let chunks = match polkadot_erasure_coding::obtain_chunks_v1(
                        n_validators,
                        &available_data,
                    ) {
                        Ok(chunks) => chunks,
                        Err(error) => {
                            log::error!(
                                target: LOG_TARGET,
                                "Failed to obtain chunks v1: {error:?}",
                            );
                            continue;
                        },
                    };
                    let erasure_root = polkadot_erasure_coding::branches(&chunks).root();

                    let commitments = CandidateCommitments {
                        upward_messages: collation.upward_messages,
                        horizontal_messages: collation.horizontal_messages,
                        new_validation_code: collation.new_validation_code,
                        head_data: collation.head_data,
                        processed_downward_messages: collation.processed_downward_messages,
                        hrmp_watermark: collation.hrmp_watermark,
                    };

                    // Send the identical collation to every assigned core; only
                    // the core index in the descriptor differs.
                    for core_index in &scheduled_cores {
                        let candidate_receipt = CandidateReceiptV2 {
                            descriptor: CandidateDescriptorV2::new(
                                para_id,
                                relay_parent,
                                *core_index,
                                session_index,
                                persisted_validation_data_hash,
                                pov_hash,
                                erasure_root,
                                commitments.head_data.hash(),
                                validation_code_hash,
                            ),
                            commitments_hash: commitments.hash(),
                        };

                        overseer_handle
                            .send_msg(
                                CollatorProtocolMessage::DistributeCollation {
                                    candidate_receipt,
                                    parent_head_data_hash,
                                    pov: pov.clone(),
                                    parent_head_data: parent_head_data.clone(),
                                    result_sender: None,
                                    core_index: *core_index,
                                },
                                "Collator",
                            )
                            .await;
                    }
                }
            });
    }
}
use sp_core::traits::SpawnNamed;
#[cfg(test)]
mod tests {
    use super::*;

    use futures::executor::block_on;
    use polkadot_parachain_primitives::primitives::{ValidationParams, ValidationResult};
    use polkadot_primitives::{Hash, PersistedValidationData};

    /// Builds five consecutive collations and validates each one against the
    /// real PVF wasm binary.
    #[test]
    fn collator_works() {
        let spawner = sp_core::testing::TaskExecutor::new();
        let collator = Collator::new(1_000, 1, false);
        let collation_function = collator.create_collation_function(spawner);

        for i in 0..5 {
            let parent_head =
                collator.state.lock().unwrap().number_to_head.get(&i).unwrap().clone();
            let validation_data = PersistedValidationData {
                parent_head: parent_head.encode().into(),
                ..Default::default()
            };
            let collation =
                block_on(collation_function(Default::default(), &validation_data)).unwrap();
            validate_collation(&collator, (*parent_head).clone(), collation.collation);
        }
    }

    /// Executes the collation's PoV through the PVF and asserts the resulting
    /// head data matches what the collator recorded for the child block.
    fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
        use polkadot_node_core_pvf::testing::validate_candidate;

        let block_data = match collation.proof_of_validity {
            MaybeCompressedPoV::Raw(pov) => pov.block_data,
            MaybeCompressedPoV::Compressed(_) => panic!("Only works with uncompressed povs"),
        };

        let ret_buf = validate_candidate(
            collator.validation_code(),
            &ValidationParams {
                parent_head: parent_head.encode().into(),
                block_data,
                relay_parent_number: 1,
                relay_parent_storage_root: Hash::zero(),
            }
            .encode(),
        )
        .unwrap();
        let ret = ValidationResult::decode(&mut &ret_buf[..]).unwrap();

        let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
        assert_eq!(
            **collator
                .state
                .lock()
                .unwrap()
                .number_to_head
                .get(&(parent_head.number + 1))
                .unwrap(),
            new_head
        );
    }

    /// `State::advance` must be able to build on a head it never produced, by
    /// recomputing the state from genesis.
    #[test]
    fn advance_to_state_when_parent_head_is_missing() {
        let collator = Collator::new(1_000, 1, false);
        let graveyard_size = collator.state.lock().unwrap().graveyard_size;

        // A head at block number 10 that this collator instance never built.
        let mut head = calculate_head_and_state_for_number(10, graveyard_size, 1, false).unwrap().0;

        for i in 1..10 {
            head = collator.state.lock().unwrap().advance(head).unwrap().1;
            assert_eq!(10 + i, head.number);
        }

        // A fresh collator advanced 19 times from genesis must reach the same
        // head (block number 19) as the instance above.
        let collator = Collator::new(1_000, 1, false);
        let mut second_head = collator
            .state
            .lock()
            .unwrap()
            .number_to_head
            .get(&0)
            .cloned()
            .unwrap()
            .as_ref()
            .clone();

        for _ in 1..20 {
            second_head = collator.state.lock().unwrap().advance(second_head.clone()).unwrap().1;
        }

        assert_eq!(second_head, head);
    }
}