referrerpolicy=no-referrer-when-downgrade

cumulus_client_network/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Parachain specific networking
19//!
20//! Provides a custom block announcement implementation for parachains
21//! that use the relay chain provided consensus. See [`RequireSecondedInBlockAnnounce`]
22//! and [`WaitToAnnounce`] for more information about this implementation.
23
24use sp_api::RuntimeApiInfo;
25use sp_consensus::block_validation::{
26	BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
27};
28use sp_core::traits::SpawnNamed;
29use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
30
31use cumulus_relay_chain_interface::RelayChainInterface;
32use polkadot_node_primitives::{CollationSecondedSignal, Statement};
33use polkadot_node_subsystem::messages::RuntimeApiRequest;
34use polkadot_parachain_primitives::primitives::HeadData;
35use polkadot_primitives::{
36	CandidateReceiptV2 as CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId,
37	OccupiedCoreAssumption, SigningContext, UncheckedSigned,
38};
39
40use codec::{Decode, DecodeAll, Encode};
41use futures::{channel::oneshot, future::FutureExt, Future};
42use std::{fmt, marker::PhantomData, pin::Pin, sync::Arc};
43
44#[cfg(test)]
45mod tests;
46
47const LOG_TARGET: &str = "sync::cumulus";
48
49type BoxedError = Box<dyn std::error::Error + Send>;
50
51#[derive(Debug)]
52struct BlockAnnounceError(String);
53impl std::error::Error for BlockAnnounceError {}
54
55impl fmt::Display for BlockAnnounceError {
56	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57		self.0.fmt(f)
58	}
59}
60
61/// The data that we attach to a block announcement.
62///
63/// This will be used to prove that a header belongs to a block that is probably being backed by
64/// the relay chain.
65#[derive(Encode, Debug)]
66pub struct BlockAnnounceData {
67	/// The receipt identifying the candidate.
68	receipt: CandidateReceipt,
69	/// The seconded statement issued by a relay chain validator that approves the candidate.
70	statement: UncheckedSigned<CompactStatement>,
71	/// The relay parent that was used as context to sign the [`Self::statement`].
72	relay_parent: PHash,
73}
74
75impl Decode for BlockAnnounceData {
76	fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
77		let receipt = CandidateReceipt::decode(input)?;
78		let statement = UncheckedSigned::<CompactStatement>::decode(input)?;
79
80		let relay_parent = match PHash::decode(input) {
81			Ok(p) => p,
82			// For being backwards compatible, we support missing relay-chain parent.
83			Err(_) => receipt.descriptor.relay_parent(),
84		};
85
86		Ok(Self { receipt, statement, relay_parent })
87	}
88}
89
90impl BlockAnnounceData {
91	/// Validate that the receipt, statement and announced header match.
92	///
93	/// This will not check the signature, for this you should use
94	/// [`BlockAnnounceData::check_signature`].
95	fn validate(&self, encoded_header: Vec<u8>) -> Result<(), Validation> {
96		let candidate_hash =
97			if let CompactStatement::Seconded(h) = self.statement.unchecked_payload() {
98				h
99			} else {
100				tracing::debug!(target: LOG_TARGET, "`CompactStatement` isn't the candidate variant!",);
101				return Err(Validation::Failure { disconnect: true })
102			};
103
104		if *candidate_hash != self.receipt.hash() {
105			tracing::debug!(
106				target: LOG_TARGET,
107				"Receipt candidate hash doesn't match candidate hash in statement",
108			);
109			return Err(Validation::Failure { disconnect: true })
110		}
111
112		if HeadData(encoded_header).hash() != self.receipt.descriptor.para_head() {
113			tracing::debug!(
114				target: LOG_TARGET,
115				"Receipt para head hash doesn't match the hash of the header in the block announcement",
116			);
117			return Err(Validation::Failure { disconnect: true })
118		}
119
120		Ok(())
121	}
122
123	/// Check the signature of the statement.
124	///
125	/// Returns an `Err(_)` if it failed.
126	async fn check_signature<RCInterface>(
127		self,
128		relay_chain_client: &RCInterface,
129	) -> Result<Validation, BlockAnnounceError>
130	where
131		RCInterface: RelayChainInterface + 'static,
132	{
133		let validator_index = self.statement.unchecked_validator_index();
134
135		let session_index =
136			match relay_chain_client.session_index_for_child(self.relay_parent).await {
137				Ok(r) => r,
138				Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
139			};
140
141		let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
142
143		// Check that the signer is a legit validator.
144		let authorities = match relay_chain_client.validators(self.relay_parent).await {
145			Ok(r) => r,
146			Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
147		};
148		let signer = match authorities.get(validator_index.0 as usize) {
149			Some(r) => r,
150			None => {
151				tracing::debug!(
152					target: LOG_TARGET,
153					"Block announcement justification signer is a validator index out of bound",
154				);
155
156				return Ok(Validation::Failure { disconnect: true })
157			},
158		};
159
160		// Check statement is correctly signed.
161		if self.statement.try_into_checked(&signing_context, signer).is_err() {
162			tracing::debug!(
163				target: LOG_TARGET,
164				"Block announcement justification signature is invalid.",
165			);
166
167			return Ok(Validation::Failure { disconnect: true })
168		}
169
170		Ok(Validation::Success { is_new_best: true })
171	}
172}
173
174impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
175	type Error = ();
176
177	fn try_from(signal: &CollationSecondedSignal) -> Result<BlockAnnounceData, ()> {
178		let receipt = if let Statement::Seconded(receipt) = signal.statement.payload() {
179			receipt.to_plain()
180		} else {
181			return Err(())
182		};
183
184		Ok(BlockAnnounceData {
185			receipt,
186			statement: signal.statement.convert_payload().into(),
187			relay_parent: signal.relay_parent,
188		})
189	}
190}
191
192/// A type alias for the [`RequireSecondedInBlockAnnounce`] validator.
193#[deprecated = "This has been renamed to RequireSecondedInBlockAnnounce"]
194pub type BlockAnnounceValidator<Block, RCInterface> =
195	RequireSecondedInBlockAnnounce<Block, RCInterface>;
196
197/// Parachain specific block announce validator.
198///
199/// This is not required when the collation mechanism itself is sybil-resistant, as it is a spam
200/// protection mechanism used to prevent nodes from dealing with unbounded numbers of blocks. For
201/// sybil-resistant collation mechanisms, this will only slow things down.
202///
203/// This block announce validator is required if the parachain is running
204/// with the relay chain provided consensus to make sure each node only
205/// imports a reasonable number of blocks per round. The relay chain provided
206/// consensus doesn't have any authorities and so it could happen that without
207/// this special block announce validator a node would need to import *millions*
208/// of blocks per round, which is clearly not doable.
209///
210/// To solve this problem, each block announcement is delayed until a collator
211/// has received a [`Statement::Seconded`] for its `PoV`. This message tells the
212/// collator that its `PoV` was validated successfully by a parachain validator and
213/// that it is very likely that this `PoV` will be included in the relay chain. Every
214/// collator that doesn't receive the message for its `PoV` will not announce its block.
215/// For more information on the block announcement, see [`WaitToAnnounce`].
216///
217/// For each block announcement that is received, the generic block announcement validation
218/// will call this validator and provides the extra data that was attached to the announcement.
219/// We call this extra data `justification`.
220/// It is expected that the attached data is a SCALE encoded [`BlockAnnounceData`]. The
221/// statement is checked to be a [`CompactStatement::Seconded`] and that it is signed by an active
222/// parachain validator.
223///
224/// If no justification was provided we check if the block announcement is at the tip of the known
225/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
226/// it. However, if the announcement is for a block below the tip the announcement is accepted
227/// as it probably comes from a node that is currently syncing the chain.
228#[derive(Clone)]
229pub struct RequireSecondedInBlockAnnounce<Block, RCInterface> {
230	phantom: PhantomData<Block>,
231	relay_chain_interface: RCInterface,
232	para_id: ParaId,
233}
234
235impl<Block, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
236where
237	RCInterface: Clone,
238{
239	/// Create a new [`RequireSecondedInBlockAnnounce`].
240	pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self {
241		Self { phantom: Default::default(), relay_chain_interface, para_id }
242	}
243}
244
245impl<Block: BlockT, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
246where
247	RCInterface: RelayChainInterface + Clone,
248{
249	/// Get the included block of the given parachain in the relay chain.
250	async fn included_block(
251		relay_chain_interface: &RCInterface,
252		hash: PHash,
253		para_id: ParaId,
254	) -> Result<Block::Header, BoxedError> {
255		let validation_data = relay_chain_interface
256			.persisted_validation_data(hash, para_id, OccupiedCoreAssumption::TimedOut)
257			.await
258			.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
259			.ok_or_else(|| {
260				Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
261					as Box<_>
262			})?;
263		let para_head =
264			Block::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| {
265				Box::new(BlockAnnounceError(format!("Failed to decode parachain head: {:?}", e)))
266					as Box<_>
267			})?;
268
269		Ok(para_head)
270	}
271
272	/// Get the backed block hashes of the given parachain in the relay chain.
273	async fn backed_block_hashes(
274		relay_chain_interface: &RCInterface,
275		hash: PHash,
276		para_id: ParaId,
277	) -> Result<impl Iterator<Item = PHash>, BoxedError> {
278		let runtime_api_version = relay_chain_interface
279			.version(hash)
280			.await
281			.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
282		let parachain_host_runtime_api_version =
283			runtime_api_version
284				.api_version(
285					&<dyn polkadot_primitives::runtime_api::ParachainHost<
286						polkadot_primitives::Block,
287					>>::ID,
288				)
289				.unwrap_or_default();
290
291		// If the relay chain runtime does not support the new runtime API, fallback to the
292		// deprecated one.
293		let candidate_receipts = if parachain_host_runtime_api_version <
294			RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
295		{
296			#[allow(deprecated)]
297			relay_chain_interface
298				.candidate_pending_availability(hash, para_id)
299				.await
300				.map(|c| c.into_iter().collect::<Vec<_>>())
301		} else {
302			relay_chain_interface.candidates_pending_availability(hash, para_id).await
303		}
304		.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
305
306		Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head()))
307	}
308
309	/// Handle a block announcement with empty data (no statement) attached to it.
310	async fn handle_empty_block_announce_data(
311		&self,
312		header: Block::Header,
313	) -> Result<Validation, BoxedError> {
314		let relay_chain_interface = self.relay_chain_interface.clone();
315		let para_id = self.para_id;
316
317		// Check if block is equal or higher than best (this requires a justification)
318		let relay_chain_best_hash = relay_chain_interface
319			.best_block_hash()
320			.await
321			.map_err(|e| Box::new(e) as Box<_>)?;
322		let block_number = header.number();
323
324		let best_head =
325			Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
326		let known_best_number = best_head.number();
327
328		if best_head == header {
329			tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
330
331			return Ok(Validation::Success { is_new_best: true })
332		}
333
334		let mut backed_blocks =
335			Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id)
336				.await?;
337
338		let head_hash = HeadData(header.encode()).hash();
339
340		if backed_blocks.any(|block_hash| block_hash == head_hash) {
341			tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
342
343			Ok(Validation::Success { is_new_best: true })
344		} else if block_number >= known_best_number {
345			tracing::debug!(
346				target: LOG_TARGET,
347				"Validation failed because a justification is needed if the block at the top of the chain."
348			);
349
350			Ok(Validation::Failure { disconnect: false })
351		} else {
352			Ok(Validation::Success { is_new_best: false })
353		}
354	}
355}
356
357impl<Block: BlockT, RCInterface> BlockAnnounceValidatorT<Block>
358	for RequireSecondedInBlockAnnounce<Block, RCInterface>
359where
360	RCInterface: RelayChainInterface + Clone + 'static,
361{
362	fn validate(
363		&mut self,
364		header: &Block::Header,
365		data: &[u8],
366	) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
367		let relay_chain_interface = self.relay_chain_interface.clone();
368		let data = data.to_vec();
369		let header = header.clone();
370		let header_encoded = header.encode();
371		let block_announce_validator = self.clone();
372
373		async move {
374			let relay_chain_is_syncing = relay_chain_interface
375				.is_major_syncing()
376				.await
377				.map_err(
378					|e| tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e),
379				)
380				.unwrap_or(false);
381
382			if relay_chain_is_syncing {
383				return Ok(Validation::Success { is_new_best: false })
384			}
385
386			if data.is_empty() {
387				return block_announce_validator.handle_empty_block_announce_data(header).await
388			}
389
390			let block_announce_data = match BlockAnnounceData::decode_all(&mut data.as_slice()) {
391				Ok(r) => r,
392				Err(err) =>
393					return Err(Box::new(BlockAnnounceError(format!(
394						"Can not decode the `BlockAnnounceData`: {:?}",
395						err
396					))) as Box<_>),
397			};
398
399			if let Err(e) = block_announce_data.validate(header_encoded) {
400				return Ok(e)
401			}
402
403			let relay_parent = block_announce_data.receipt.descriptor.relay_parent();
404
405			relay_chain_interface
406				.wait_for_block(relay_parent)
407				.await
408				.map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?;
409
410			block_announce_data
411				.check_signature(&relay_chain_interface)
412				.await
413				.map_err(|e| Box::new(e) as Box<_>)
414		}
415		.boxed()
416	}
417}
418
419/// Wait before announcing a block that a candidate message has been received for this block, then
420/// add this message as justification for the block announcement.
421///
422/// This object will spawn a new task every time the method `wait_to_announce` is called and cancel
423/// the previous task running.
424pub struct WaitToAnnounce<Block: BlockT> {
425	spawner: Arc<dyn SpawnNamed + Send + Sync>,
426	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
427}
428
429impl<Block: BlockT> WaitToAnnounce<Block> {
430	/// Create the `WaitToAnnounce` object
431	pub fn new(
432		spawner: Arc<dyn SpawnNamed + Send + Sync>,
433		announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
434	) -> WaitToAnnounce<Block> {
435		WaitToAnnounce { spawner, announce_block }
436	}
437
438	/// Wait for a candidate message for the block, then announce the block. The candidate
439	/// message will be added as justification to the block announcement.
440	pub fn wait_to_announce(
441		&mut self,
442		block_hash: <Block as BlockT>::Hash,
443		signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
444	) {
445		let announce_block = self.announce_block.clone();
446
447		self.spawner.spawn(
448			"cumulus-wait-to-announce",
449			None,
450			async move {
451				tracing::debug!(
452					target: "cumulus-network",
453					"waiting for announce block in a background task...",
454				);
455
456				wait_to_announce::<Block>(block_hash, announce_block, signed_stmt_recv).await;
457
458				tracing::debug!(
459					target: "cumulus-network",
460					"block announcement finished",
461				);
462			}
463			.boxed(),
464		);
465	}
466}
467
468async fn wait_to_announce<Block: BlockT>(
469	block_hash: <Block as BlockT>::Hash,
470	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
471	signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
472) {
473	let signal = match signed_stmt_recv.await {
474		Ok(s) => s,
475		Err(_) => {
476			tracing::debug!(
477				target: "cumulus-network",
478				block = ?block_hash,
479				"Wait to announce stopped, because sender was dropped.",
480			);
481			return
482		},
483	};
484
485	if let Ok(data) = BlockAnnounceData::try_from(&signal) {
486		announce_block(block_hash, Some(data.encode()));
487	} else {
488		tracing::debug!(
489			target: "cumulus-network",
490			?signal,
491			block = ?block_hash,
492			"Received invalid statement while waiting to announce block.",
493		);
494	}
495}
496
497/// A [`BlockAnnounceValidator`] which accepts all block announcements, as it assumes
498/// sybil resistance is handled elsewhere.
499#[derive(Debug, Clone)]
500pub struct AssumeSybilResistance(bool);
501
502impl AssumeSybilResistance {
503	/// Instantiate this block announcement validator while permissively allowing (but ignoring)
504	/// announcements which come tagged with seconded messages.
505	///
506	/// This is useful for backwards compatibility when upgrading nodes: old nodes will continue
507	/// to broadcast announcements with seconded messages, so these announcements shouldn't be
508	/// rejected and the peers not punished.
509	pub fn allow_seconded_messages() -> Self {
510		AssumeSybilResistance(true)
511	}
512
513	/// Instantiate this block announcement validator while rejecting announcements that come with
514	/// data.
515	pub fn reject_seconded_messages() -> Self {
516		AssumeSybilResistance(false)
517	}
518}
519
520impl<Block: BlockT> BlockAnnounceValidatorT<Block> for AssumeSybilResistance {
521	fn validate(
522		&mut self,
523		_header: &Block::Header,
524		data: &[u8],
525	) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
526		let allow_seconded_messages = self.0;
527		let data = data.to_vec();
528
529		async move {
530			Ok(if data.is_empty() {
531				Validation::Success { is_new_best: false }
532			} else if !allow_seconded_messages {
533				Validation::Failure { disconnect: false }
534			} else {
535				match BlockAnnounceData::decode_all(&mut data.as_slice()) {
536					Ok(_) => Validation::Success { is_new_best: false },
537					Err(_) => Validation::Failure { disconnect: true },
538				}
539			})
540		}
541		.boxed()
542	}
543}