polkadot_node_core_bitfield_signing/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
//! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block.
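//!
//! For each new active leaf, a job is spawned which, after a short delay
//! (`SPAWNED_TASK_DELAY`), queries the availability store for our chunk of every
//! occupied availability core, assembles the answers into an
//! `AvailabilityBitfield` (one bit per core), signs it with the local validator
//! key, and hands the signed bitfield to bitfield distribution for gossiping.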

#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
#![recursion_limit = "256"]

use futures::{
	channel::{mpsc, oneshot},
	future,
	lock::Mutex,
	FutureExt,
};
use polkadot_node_subsystem::{
	messages::{AvailabilityStoreMessage, BitfieldDistributionMessage},
	overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
	SubsystemResult,
};
use polkadot_node_subsystem_util::{
	self as util, request_availability_cores, runtime::recv_runtime, Validator,
};
use polkadot_primitives::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use sp_keystore::{Error as KeystoreError, KeystorePtr};
use std::{collections::HashMap, time::Duration};
use wasm_timer::{Delay, Instant};

mod metrics;
use self::metrics::Metrics;

#[cfg(test)]
mod tests;

/// Delay between starting a bitfield signing job and its first attempt to create a bitfield.
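/// The job waits this long before querying the availability store, which gives
/// chunks time to arrive before we report on their availability.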
const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500);
const LOG_TARGET: &str = "parachain::bitfield-signing";

// TODO: use `fatality` (https://github.com/paritytech/polkadot/issues/5540).
/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
	#[error(transparent)]
	Util(#[from] util::Error),

	#[error(transparent)]
	Io(#[from] std::io::Error),

	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),

	#[error(transparent)]
	MpscSend(#[from] mpsc::SendError),

	#[error(transparent)]
	Runtime(#[from] util::runtime::Error),

	#[error("Keystore failed: {0:?}")]
	Keystore(KeystoreError),
}

/// If there is a candidate pending availability, query the Availability Store
/// for whether we have the availability chunk for our validator index.
async fn get_core_availability(
	core: &CoreState,
	validator_index: ValidatorIndex,
	sender: &Mutex<&mut impl overseer::BitfieldSigningSenderTrait>,
) -> Result<bool, Error> {
	if let CoreState::Occupied(core) = core {
		let (tx, rx) = oneshot::channel();
		sender
			.lock()
			.await
			.send_message(AvailabilityStoreMessage::QueryChunkAvailability(
				core.candidate_hash,
				validator_index,
				tx,
			))
			.await;

		let res = rx.await.map_err(Into::into);

		gum::trace!(
			target: LOG_TARGET,
			para_id = %core.para_id(),
			availability = ?res,
			?core.candidate_hash,
			"Candidate availability",
		);

		res
	} else {
		Ok(false)
	}
}

/// - get the list of core states from the runtime
/// - for each core, concurrently determine chunk availability (see `get_core_availability`)
/// - return the bitfield if there were no errors at any point in this process (otherwise, it's
///   prone to false negatives)
async fn construct_availability_bitfield(
	relay_parent: Hash,
	validator_idx: ValidatorIndex,
	sender: &mut impl overseer::BitfieldSigningSenderTrait,
) -> Result<AvailabilityBitfield, Error> {
	// get the set of availability cores from the runtime
	let availability_cores =
		recv_runtime(request_availability_cores(relay_parent, sender).await).await?;

	// Wrap the sender in a Mutex to share it between the futures.
	//
	// We use a `Mutex` here so that we don't have to `clone` the sender inside each
	// future: every clone of the sender increases the capacity of the channel by one
	// for the lifetime of that clone.
	let sender = Mutex::new(sender);

	// Handle all cores concurrently.
	// `try_join_all` returns all results in the same order as the input futures.
	let results = future::try_join_all(
		availability_cores
			.iter()
			.map(|core| get_core_availability(core, validator_idx, &sender)),
	)
	.await?;

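	// `from_iter` infers the concrete bit-vector type from the use of `core_bits`
	// in `AvailabilityBitfield(core_bits)` below.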
	let core_bits = FromIterator::from_iter(results.into_iter());
	gum::debug!(
		target: LOG_TARGET,
		?relay_parent,
		"Signing Bitfield for {core_count} cores: {core_bits}",
		core_count = availability_cores.len(),
		core_bits = core_bits,
	);

	Ok(AvailabilityBitfield(core_bits))
}

/// The bitfield signing subsystem.
pub struct BitfieldSigningSubsystem {
	keystore: KeystorePtr,
	metrics: Metrics,
}

impl BitfieldSigningSubsystem {
	/// Create a new instance of the `BitfieldSigningSubsystem`.
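	///
	/// A minimal construction sketch (an `ignore`d doctest; assumes a `KeystorePtr`
	/// named `keystore` is in scope and that `Metrics` implements `Default`):
	///
	/// ```ignore
	/// let subsystem = BitfieldSigningSubsystem::new(keystore, Metrics::default());
	/// ```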
	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
		Self { keystore, metrics }
	}
}

#[overseer::subsystem(BitfieldSigning, error=SubsystemError, prefix=self::overseer)]
impl<Context> BitfieldSigningSubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = async move {
			run(ctx, self.keystore, self.metrics)
				.await
				.map_err(|e| SubsystemError::with_origin("bitfield-signing", e))
		}
		.boxed();

		SpawnedSubsystem { name: "bitfield-signing-subsystem", future }
	}
}

#[overseer::contextbounds(BitfieldSigning, prefix = self::overseer)]
async fn run<Context>(
	mut ctx: Context,
	keystore: KeystorePtr,
	metrics: Metrics,
) -> SubsystemResult<()> {
	// Track spawned jobs per active leaf.
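	// A job is aborted as soon as its leaf is deactivated, so we stop working on
	// stale leaves promptly.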
	let mut running = HashMap::<Hash, future::AbortHandle>::new();

	loop {
		match ctx.recv().await? {
			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
				// Abort jobs for deactivated leaves.
				for leaf in &update.deactivated {
					if let Some(handle) = running.remove(leaf) {
						handle.abort();
					}
				}

				if let Some(leaf) = update.activated {
					let sender = ctx.sender().clone();
					let leaf_hash = leaf.hash;

					let (fut, handle) = future::abortable(handle_active_leaves_update(
						sender,
						leaf,
						keystore.clone(),
						metrics.clone(),
					));

					running.insert(leaf_hash, handle);

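					// Note: `map(drop)` below discards the job's `Result`; errors inside
					// the job surface only through its own logging.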
					ctx.spawn("bitfield-signing-job", fut.map(drop).boxed())?;
				}
			},
			FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
			FromOrchestra::Communication { .. } => {},
		}
	}
}

async fn handle_active_leaves_update<Sender>(
	mut sender: Sender,
	leaf: ActivatedLeaf,
	keystore: KeystorePtr,
	metrics: Metrics,
) -> Result<(), Error>
where
	Sender: overseer::BitfieldSigningSenderTrait,
{
	let wait_until = Instant::now() + SPAWNED_TASK_DELAY;

	// Do all the work we can before we need to wait for the availability store.
	// If we're not a validator, we can simply succeed and return early.
	let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
		Ok(validator) => validator,
		Err(util::Error::NotAValidator) => return Ok(()),
		Err(err) => return Err(Error::Util(err)),
	};

	// Wait a bit before doing anything else.
	Delay::new_at(wait_until).await?;

	// This timer does not start at the head of the function because we don't want the
	// measurement to include `SPAWNED_TASK_DELAY`.
	let _timer = metrics.time_run();

	let bitfield =
		match construct_availability_bitfield(leaf.hash, validator.index(), &mut sender).await {
			Err(Error::Runtime(runtime_err)) => {
				// Don't take down the node on runtime API errors.
				gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
				return Ok(())
			},
			Err(err) => return Err(err),
			Ok(bitfield) => bitfield,
		};

	let signed_bitfield = match validator.sign(keystore, bitfield).map_err(Error::Keystore)? {
		Some(b) => b,
		None => {
			gum::error!(
				target: LOG_TARGET,
				"Key was found at construction, but while signing the bitfield it could not be found.",
			);
			return Ok(())
		},
	};

	metrics.on_bitfield_signed();

	sender
		.send_message(BitfieldDistributionMessage::DistributeBitfield(leaf.hash, signed_bitfield))
		.await;

	Ok(())
}