1use sp_api::RuntimeApiInfo;
25use sp_consensus::block_validation::{
26 BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
27};
28use sp_core::traits::SpawnNamed;
29use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
30
31use cumulus_relay_chain_interface::RelayChainInterface;
32use polkadot_node_primitives::{CollationSecondedSignal, Statement};
33use polkadot_node_subsystem::messages::RuntimeApiRequest;
34use polkadot_parachain_primitives::primitives::HeadData;
35use polkadot_primitives::{
36 CandidateReceiptV2 as CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId,
37 OccupiedCoreAssumption, SigningContext, UncheckedSigned,
38};
39
40use codec::{Decode, DecodeAll, Encode};
41use futures::{channel::oneshot, future::FutureExt, Future};
42use std::{fmt, marker::PhantomData, pin::Pin, sync::Arc};
43
44#[cfg(test)]
45mod tests;
46
/// Log target used by the block announce validation code in this file.
const LOG_TARGET: &str = "sync::cumulus";

/// Type-erased, sendable error returned by the validation futures in this file.
type BoxedError = Box<dyn std::error::Error + Send>;

/// Error carrying nothing but a human readable message.
#[derive(Debug)]
struct BlockAnnounceError(String);

impl fmt::Display for BlockAnnounceError {
    /// Formats the wrapped message exactly as the inner `String` formats itself,
    /// so width/padding flags of the caller's format spec are honoured.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, formatter)
    }
}

impl std::error::Error for BlockAnnounceError {}
60
61#[derive(Encode, Debug)]
66pub struct BlockAnnounceData {
67 receipt: CandidateReceipt,
69 statement: UncheckedSigned<CompactStatement>,
71 relay_parent: PHash,
73}
74
75impl Decode for BlockAnnounceData {
76 fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
77 let receipt = CandidateReceipt::decode(input)?;
78 let statement = UncheckedSigned::<CompactStatement>::decode(input)?;
79
80 let relay_parent = match PHash::decode(input) {
81 Ok(p) => p,
82 Err(_) => receipt.descriptor.relay_parent(),
84 };
85
86 Ok(Self { receipt, statement, relay_parent })
87 }
88}
89
90impl BlockAnnounceData {
91 fn validate(&self, encoded_header: Vec<u8>) -> Result<(), Validation> {
96 let candidate_hash =
97 if let CompactStatement::Seconded(h) = self.statement.unchecked_payload() {
98 h
99 } else {
100 tracing::debug!(target: LOG_TARGET, "`CompactStatement` isn't the candidate variant!",);
101 return Err(Validation::Failure { disconnect: true })
102 };
103
104 if *candidate_hash != self.receipt.hash() {
105 tracing::debug!(
106 target: LOG_TARGET,
107 "Receipt candidate hash doesn't match candidate hash in statement",
108 );
109 return Err(Validation::Failure { disconnect: true })
110 }
111
112 if HeadData(encoded_header).hash() != self.receipt.descriptor.para_head() {
113 tracing::debug!(
114 target: LOG_TARGET,
115 "Receipt para head hash doesn't match the hash of the header in the block announcement",
116 );
117 return Err(Validation::Failure { disconnect: true })
118 }
119
120 Ok(())
121 }
122
123 async fn check_signature<RCInterface>(
127 self,
128 relay_chain_client: &RCInterface,
129 ) -> Result<Validation, BlockAnnounceError>
130 where
131 RCInterface: RelayChainInterface + 'static,
132 {
133 let validator_index = self.statement.unchecked_validator_index();
134
135 let session_index =
136 match relay_chain_client.session_index_for_child(self.relay_parent).await {
137 Ok(r) => r,
138 Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
139 };
140
141 let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
142
143 let authorities = match relay_chain_client.validators(self.relay_parent).await {
145 Ok(r) => r,
146 Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
147 };
148 let signer = match authorities.get(validator_index.0 as usize) {
149 Some(r) => r,
150 None => {
151 tracing::debug!(
152 target: LOG_TARGET,
153 "Block announcement justification signer is a validator index out of bound",
154 );
155
156 return Ok(Validation::Failure { disconnect: true })
157 },
158 };
159
160 if self.statement.try_into_checked(&signing_context, signer).is_err() {
162 tracing::debug!(
163 target: LOG_TARGET,
164 "Block announcement justification signature is invalid.",
165 );
166
167 return Ok(Validation::Failure { disconnect: true })
168 }
169
170 Ok(Validation::Success { is_new_best: true })
171 }
172}
173
174impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
175 type Error = ();
176
177 fn try_from(signal: &CollationSecondedSignal) -> Result<BlockAnnounceData, ()> {
178 let receipt = if let Statement::Seconded(receipt) = signal.statement.payload() {
179 receipt.to_plain()
180 } else {
181 return Err(())
182 };
183
184 Ok(BlockAnnounceData {
185 receipt,
186 statement: signal.statement.convert_payload().into(),
187 relay_parent: signal.relay_parent,
188 })
189 }
190}
191
192#[deprecated = "This has been renamed to RequireSecondedInBlockAnnounce"]
194pub type BlockAnnounceValidator<Block, RCInterface> =
195 RequireSecondedInBlockAnnounce<Block, RCInterface>;
196
197#[derive(Clone)]
229pub struct RequireSecondedInBlockAnnounce<Block, RCInterface> {
230 phantom: PhantomData<Block>,
231 relay_chain_interface: RCInterface,
232 para_id: ParaId,
233}
234
235impl<Block, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
236where
237 RCInterface: Clone,
238{
239 pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self {
241 Self { phantom: Default::default(), relay_chain_interface, para_id }
242 }
243}
244
245impl<Block: BlockT, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
246where
247 RCInterface: RelayChainInterface + Clone,
248{
249 async fn included_block(
251 relay_chain_interface: &RCInterface,
252 hash: PHash,
253 para_id: ParaId,
254 ) -> Result<Block::Header, BoxedError> {
255 let validation_data = relay_chain_interface
256 .persisted_validation_data(hash, para_id, OccupiedCoreAssumption::TimedOut)
257 .await
258 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
259 .ok_or_else(|| {
260 Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
261 as Box<_>
262 })?;
263 let para_head =
264 Block::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| {
265 Box::new(BlockAnnounceError(format!("Failed to decode parachain head: {:?}", e)))
266 as Box<_>
267 })?;
268
269 Ok(para_head)
270 }
271
272 async fn backed_block_hashes(
274 relay_chain_interface: &RCInterface,
275 hash: PHash,
276 para_id: ParaId,
277 ) -> Result<impl Iterator<Item = PHash>, BoxedError> {
278 let runtime_api_version = relay_chain_interface
279 .version(hash)
280 .await
281 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
282 let parachain_host_runtime_api_version =
283 runtime_api_version
284 .api_version(
285 &<dyn polkadot_primitives::runtime_api::ParachainHost<
286 polkadot_primitives::Block,
287 >>::ID,
288 )
289 .unwrap_or_default();
290
291 let candidate_receipts = if parachain_host_runtime_api_version <
294 RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
295 {
296 #[allow(deprecated)]
297 relay_chain_interface
298 .candidate_pending_availability(hash, para_id)
299 .await
300 .map(|c| c.into_iter().collect::<Vec<_>>())
301 } else {
302 relay_chain_interface.candidates_pending_availability(hash, para_id).await
303 }
304 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
305
306 Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head()))
307 }
308
309 async fn handle_empty_block_announce_data(
311 &self,
312 header: Block::Header,
313 ) -> Result<Validation, BoxedError> {
314 let relay_chain_interface = self.relay_chain_interface.clone();
315 let para_id = self.para_id;
316
317 let relay_chain_best_hash = relay_chain_interface
319 .best_block_hash()
320 .await
321 .map_err(|e| Box::new(e) as Box<_>)?;
322 let block_number = header.number();
323
324 let best_head =
325 Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
326 let known_best_number = best_head.number();
327
328 if best_head == header {
329 tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
330
331 return Ok(Validation::Success { is_new_best: true })
332 }
333
334 let mut backed_blocks =
335 Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id)
336 .await?;
337
338 let head_hash = HeadData(header.encode()).hash();
339
340 if backed_blocks.any(|block_hash| block_hash == head_hash) {
341 tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
342
343 Ok(Validation::Success { is_new_best: true })
344 } else if block_number >= known_best_number {
345 tracing::debug!(
346 target: LOG_TARGET,
347 "Validation failed because a justification is needed if the block at the top of the chain."
348 );
349
350 Ok(Validation::Failure { disconnect: false })
351 } else {
352 Ok(Validation::Success { is_new_best: false })
353 }
354 }
355}
356
357impl<Block: BlockT, RCInterface> BlockAnnounceValidatorT<Block>
358 for RequireSecondedInBlockAnnounce<Block, RCInterface>
359where
360 RCInterface: RelayChainInterface + Clone + 'static,
361{
362 fn validate(
363 &mut self,
364 header: &Block::Header,
365 data: &[u8],
366 ) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
367 let relay_chain_interface = self.relay_chain_interface.clone();
368 let data = data.to_vec();
369 let header = header.clone();
370 let header_encoded = header.encode();
371 let block_announce_validator = self.clone();
372
373 async move {
374 let relay_chain_is_syncing = relay_chain_interface
375 .is_major_syncing()
376 .await
377 .map_err(
378 |e| tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e),
379 )
380 .unwrap_or(false);
381
382 if relay_chain_is_syncing {
383 return Ok(Validation::Success { is_new_best: false })
384 }
385
386 if data.is_empty() {
387 return block_announce_validator.handle_empty_block_announce_data(header).await
388 }
389
390 let block_announce_data = match BlockAnnounceData::decode_all(&mut data.as_slice()) {
391 Ok(r) => r,
392 Err(err) =>
393 return Err(Box::new(BlockAnnounceError(format!(
394 "Can not decode the `BlockAnnounceData`: {:?}",
395 err
396 ))) as Box<_>),
397 };
398
399 if let Err(e) = block_announce_data.validate(header_encoded) {
400 return Ok(e)
401 }
402
403 let relay_parent = block_announce_data.receipt.descriptor.relay_parent();
404
405 relay_chain_interface
406 .wait_for_block(relay_parent)
407 .await
408 .map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?;
409
410 block_announce_data
411 .check_signature(&relay_chain_interface)
412 .await
413 .map_err(|e| Box::new(e) as Box<_>)
414 }
415 .boxed()
416 }
417}
418
419pub struct WaitToAnnounce<Block: BlockT> {
425 spawner: Arc<dyn SpawnNamed + Send + Sync>,
426 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
427}
428
429impl<Block: BlockT> WaitToAnnounce<Block> {
430 pub fn new(
432 spawner: Arc<dyn SpawnNamed + Send + Sync>,
433 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
434 ) -> WaitToAnnounce<Block> {
435 WaitToAnnounce { spawner, announce_block }
436 }
437
438 pub fn wait_to_announce(
441 &mut self,
442 block_hash: <Block as BlockT>::Hash,
443 signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
444 ) {
445 let announce_block = self.announce_block.clone();
446
447 self.spawner.spawn(
448 "cumulus-wait-to-announce",
449 None,
450 async move {
451 tracing::debug!(
452 target: "cumulus-network",
453 "waiting for announce block in a background task...",
454 );
455
456 wait_to_announce::<Block>(block_hash, announce_block, signed_stmt_recv).await;
457
458 tracing::debug!(
459 target: "cumulus-network",
460 "block announcement finished",
461 );
462 }
463 .boxed(),
464 );
465 }
466}
467
468async fn wait_to_announce<Block: BlockT>(
469 block_hash: <Block as BlockT>::Hash,
470 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
471 signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
472) {
473 let signal = match signed_stmt_recv.await {
474 Ok(s) => s,
475 Err(_) => {
476 tracing::debug!(
477 target: "cumulus-network",
478 block = ?block_hash,
479 "Wait to announce stopped, because sender was dropped.",
480 );
481 return
482 },
483 };
484
485 if let Ok(data) = BlockAnnounceData::try_from(&signal) {
486 announce_block(block_hash, Some(data.encode()));
487 } else {
488 tracing::debug!(
489 target: "cumulus-network",
490 ?signal,
491 block = ?block_hash,
492 "Received invalid statement while waiting to announce block.",
493 );
494 }
495}
496
/// Block announce validator configuration that does not consult the relay chain.
///
/// The wrapped flag records whether announcements carrying attached data are
/// accepted (`true`) or rejected (`false`).
#[derive(Debug, Clone)]
pub struct AssumeSybilResistance(bool);

impl AssumeSybilResistance {
    /// Creates an instance that accepts announcements with attached data,
    /// provided that data decodes.
    pub fn allow_seconded_messages() -> Self {
        Self(true)
    }

    /// Creates an instance that rejects every announcement with attached data.
    pub fn reject_seconded_messages() -> Self {
        Self(false)
    }
}
519
520impl<Block: BlockT> BlockAnnounceValidatorT<Block> for AssumeSybilResistance {
521 fn validate(
522 &mut self,
523 _header: &Block::Header,
524 data: &[u8],
525 ) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
526 let allow_seconded_messages = self.0;
527 let data = data.to_vec();
528
529 async move {
530 Ok(if data.is_empty() {
531 Validation::Success { is_new_best: false }
532 } else if !allow_seconded_messages {
533 Validation::Failure { disconnect: false }
534 } else {
535 match BlockAnnounceData::decode_all(&mut data.as_slice()) {
536 Ok(_) => Validation::Success { is_new_best: false },
537 Err(_) => Validation::Failure { disconnect: true },
538 }
539 })
540 }
541 .boxed()
542 }
543}