test_parachain_adder_collator/
lib.rs1use codec::{Decode, Encode};
20use futures::channel::oneshot;
21use futures_timer::Delay;
22use polkadot_node_primitives::{
23 Collation, CollationResult, CollationSecondedSignal, CollatorFn, MaybeCompressedPoV, PoV,
24 Statement,
25};
26use polkadot_primitives::{CollatorId, CollatorPair};
27use sp_core::{traits::SpawnNamed, Pair};
28use std::{
29 collections::HashMap,
30 sync::{
31 atomic::{AtomicU32, Ordering},
32 Arc, Mutex,
33 },
34 time::Duration,
35};
36use test_parachain_adder::{execute, hash_state, BlockData, HeadData};
37
38const ADD: u64 = 2;
42
43fn calculate_head_and_state_for_number(number: u64) -> (HeadData, u64) {
45 let mut head =
46 HeadData { number: 0, parent_hash: Default::default(), post_state: hash_state(0) };
47
48 let mut state = 0u64;
49
50 while head.number < number {
51 let block = BlockData { state, add: ADD };
52 head = execute(head.hash(), head.clone(), &block).expect("Produces valid block");
53 state = state.wrapping_add(ADD);
54 }
55
56 (head, state)
57}
58
59struct State {
61 head_to_state: HashMap<Arc<HeadData>, u64>,
62 number_to_head: HashMap<u64, Arc<HeadData>>,
63 best_block: u64,
65}
66
67impl State {
68 fn genesis() -> Self {
70 let genesis_state = Arc::new(calculate_head_and_state_for_number(0).0);
71
72 Self {
73 head_to_state: vec![(genesis_state.clone(), 0)].into_iter().collect(),
74 number_to_head: vec![(0, genesis_state)].into_iter().collect(),
75 best_block: 0,
76 }
77 }
78
79 fn advance(&mut self, parent_head: HeadData) -> (BlockData, HeadData) {
83 self.best_block = parent_head.number;
84
85 let block = BlockData {
86 state: self
87 .head_to_state
88 .get(&parent_head)
89 .copied()
90 .unwrap_or_else(|| calculate_head_and_state_for_number(parent_head.number).1),
91 add: ADD,
92 };
93
94 let new_head =
95 execute(parent_head.hash(), parent_head, &block).expect("Produces valid block");
96
97 let new_head_arc = Arc::new(new_head.clone());
98 self.head_to_state.insert(new_head_arc.clone(), block.state.wrapping_add(ADD));
99 self.number_to_head.insert(new_head.number, new_head_arc);
100
101 (block, new_head)
102 }
103}
104
105#[derive(Default)]
108pub struct LocalCollatorState {
109 first_relay_parent: Option<u32>,
111 last_relay_parent: Option<u32>,
113}
114
115impl LocalCollatorState {
116 fn advance(&mut self, new_relay_parent: u32, best_block: u64) {
117 match (self.first_relay_parent, self.last_relay_parent) {
118 (Some(first_relay_parent), Some(last_relay_parent)) => {
119 if new_relay_parent > last_relay_parent {
123 let building_for = (new_relay_parent - first_relay_parent) as f32;
124 let velocity = (best_block as f32 / building_for).ceil() as u32;
126
127 log::info!("Parachain velocity: {:}", velocity);
128 }
129 },
130 _ => {},
131 }
132
133 if self.first_relay_parent.is_none() {
134 self.first_relay_parent = Some(new_relay_parent);
135 }
136 self.last_relay_parent = Some(new_relay_parent);
137 }
138}
139
140pub struct Collator {
142 state: Arc<Mutex<State>>,
143 key: CollatorPair,
144 seconded_collations: Arc<AtomicU32>,
145 collator_state: Arc<Mutex<LocalCollatorState>>,
146}
147
148impl Collator {
149 pub fn new() -> Self {
151 Self {
152 state: Arc::new(Mutex::new(State::genesis())),
153 key: CollatorPair::generate().0,
154 seconded_collations: Arc::new(AtomicU32::new(0)),
155 collator_state: Default::default(),
156 }
157 }
158
159 pub fn genesis_head(&self) -> Vec<u8> {
161 self.state
162 .lock()
163 .unwrap()
164 .number_to_head
165 .get(&0)
166 .expect("Genesis header exists")
167 .encode()
168 }
169
170 pub fn validation_code(&self) -> &[u8] {
172 test_parachain_adder::wasm_binary_unwrap()
173 }
174
175 pub fn collator_key(&self) -> CollatorPair {
177 self.key.clone()
178 }
179
180 pub fn collator_id(&self) -> CollatorId {
182 self.key.public()
183 }
184
185 pub fn create_collation_function(
190 &self,
191 spawner: impl SpawnNamed + Clone + 'static,
192 ) -> CollatorFn {
193 use futures::FutureExt as _;
194
195 let state = self.state.clone();
196 let collator_state = self.collator_state.clone();
197 let seconded_collations = self.seconded_collations.clone();
198
199 Box::new(move |relay_parent, validation_data| {
200 let parent = HeadData::decode(&mut &validation_data.parent_head.0[..])
201 .expect("Decodes parent head");
202
203 collator_state
204 .lock()
205 .unwrap()
206 .advance(validation_data.relay_parent_number, parent.number);
207
208 let (block_data, head_data) = state.lock().unwrap().advance(parent);
209
210 log::info!(
211 "created a new collation on relay-parent({}): {:?}",
212 relay_parent,
213 block_data,
214 );
215
216 let pov = PoV { block_data: block_data.encode().into() };
217
218 let collation = Collation {
219 upward_messages: Default::default(),
220 horizontal_messages: Default::default(),
221 new_validation_code: None,
222 head_data: head_data.encode().into(),
223 proof_of_validity: MaybeCompressedPoV::Raw(pov.clone()),
224 processed_downward_messages: 0,
225 hrmp_watermark: validation_data.relay_parent_number,
226 };
227
228 let compressed_pov = polkadot_node_primitives::maybe_compress_pov(pov);
229
230 let (result_sender, recv) = oneshot::channel::<CollationSecondedSignal>();
231 let seconded_collations = seconded_collations.clone();
232 spawner.spawn(
233 "adder-collator-seconded",
234 None,
235 async move {
236 if let Ok(res) = recv.await {
237 if !matches!(
238 res.statement.payload(),
239 Statement::Seconded(s) if s.descriptor.pov_hash() == compressed_pov.hash(),
240 ) {
241 log::error!(
242 "Seconded statement should match our collation: {:?}",
243 res.statement.payload()
244 );
245 std::process::exit(-1);
246 }
247
248 seconded_collations.fetch_add(1, Ordering::Relaxed);
249 }
250 }
251 .boxed(),
252 );
253
254 async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }
255 .boxed()
256 })
257 }
258
259 pub async fn wait_for_blocks(&self, blocks: u64) {
261 let start_block = self.state.lock().unwrap().best_block;
262 loop {
263 Delay::new(Duration::from_secs(1)).await;
264
265 let current_block = self.state.lock().unwrap().best_block;
266
267 if start_block + blocks <= current_block {
268 return
269 }
270 }
271 }
272
273 pub async fn wait_for_seconded_collations(&self, seconded: u32) {
279 let seconded_collations = self.seconded_collations.clone();
280 loop {
281 Delay::new(Duration::from_secs(1)).await;
282
283 if seconded <= seconded_collations.load(Ordering::Relaxed) {
284 return
285 }
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    use futures::executor::block_on;
    use polkadot_parachain_primitives::primitives::{ValidationParams, ValidationResult};
    use polkadot_primitives::PersistedValidationData;

    /// Builds five consecutive collations and validates each of them against the validation
    /// code.
    #[test]
    fn collator_works() {
        let spawner = sp_core::testing::TaskExecutor::new();
        let collator = Collator::new();
        let collation_function = collator.create_collation_function(spawner);

        for number in 0..5 {
            // Release the lock before the collation function takes it again.
            let parent_head = {
                let state = collator.state.lock().unwrap();
                Arc::clone(state.number_to_head.get(&number).unwrap())
            };

            let validation_data = PersistedValidationData {
                parent_head: parent_head.encode().into(),
                ..Default::default()
            };

            let result =
                block_on(collation_function(Default::default(), &validation_data)).unwrap();
            validate_collation(&collator, parent_head.as_ref().clone(), result.collation);
        }
    }

    /// Runs the validation code on `collation` and checks that the head it produces equals the
    /// head the collator recorded for the block after `parent_head`.
    fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
        use polkadot_node_core_pvf::testing::validate_candidate;

        let block_data = if let MaybeCompressedPoV::Raw(pov) = collation.proof_of_validity {
            pov.block_data
        } else {
            panic!("Only works with uncompressed povs")
        };

        let params = ValidationParams {
            parent_head: parent_head.encode().into(),
            block_data,
            relay_parent_number: 1,
            relay_parent_storage_root: Default::default(),
        };

        let ret_buf = validate_candidate(collator.validation_code(), &params.encode()).unwrap();
        let ret = ValidationResult::decode(&mut &ret_buf[..]).unwrap();
        let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();

        let state = collator.state.lock().unwrap();
        let recorded_head = state.number_to_head.get(&(parent_head.number + 1)).unwrap();
        assert_eq!(**recorded_head, new_head);
    }

    /// Advancing from a head the collator has never seen must lead to the same head as
    /// advancing step by step from genesis.
    #[test]
    fn advance_to_state_when_parent_head_is_missing() {
        let collator = Collator::new();

        // Start from head number 10, which this fresh collator does not know.
        let (mut head, _) = calculate_head_and_state_for_number(10);

        for offset in 1..10 {
            let (_, next_head) = collator.state.lock().unwrap().advance(head);
            head = next_head;
            assert_eq!(10 + offset, head.number);
        }

        let collator = Collator::new();
        let mut second_head = {
            let state = collator.state.lock().unwrap();
            (**state.number_to_head.get(&0).unwrap()).clone()
        };

        for _ in 1..20 {
            let parent = second_head.clone();
            second_head = collator.state.lock().unwrap().advance(parent).1;
        }

        assert_eq!(second_head, head);
    }
}