polkadot_node_core_bitfield_signing/
lib.rs1#![deny(unused_crate_dependencies)]
20#![warn(missing_docs)]
21#![recursion_limit = "256"]
22
23use futures::{
24 channel::{mpsc, oneshot},
25 future,
26 lock::Mutex,
27 FutureExt,
28};
29use polkadot_node_subsystem::{
30 messages::{AvailabilityStoreMessage, BitfieldDistributionMessage},
31 overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
32 SubsystemResult,
33};
34use polkadot_node_subsystem_util::{
35 self as util, request_availability_cores, runtime::recv_runtime, Validator,
36};
37use polkadot_primitives::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
38use sp_keystore::{Error as KeystoreError, KeystorePtr};
39use std::{collections::HashMap, time::Duration};
40use wasm_timer::{Delay, Instant};
41
42mod metrics;
43use self::metrics::Metrics;
44
45#[cfg(test)]
46mod tests;
47
48const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500);
50const LOG_TARGET: &str = "parachain::bitfield-signing";
51
52#[derive(Debug, thiserror::Error)]
55#[allow(missing_docs)]
56pub enum Error {
57 #[error(transparent)]
58 Util(#[from] util::Error),
59
60 #[error(transparent)]
61 Io(#[from] std::io::Error),
62
63 #[error(transparent)]
64 Oneshot(#[from] oneshot::Canceled),
65
66 #[error(transparent)]
67 MpscSend(#[from] mpsc::SendError),
68
69 #[error(transparent)]
70 Runtime(#[from] util::runtime::Error),
71
72 #[error("Keystore failed: {0:?}")]
73 Keystore(KeystoreError),
74}
75
76async fn get_core_availability(
79 core: &CoreState,
80 validator_index: ValidatorIndex,
81 sender: &Mutex<&mut impl overseer::BitfieldSigningSenderTrait>,
82) -> Result<bool, Error> {
83 if let CoreState::Occupied(core) = core {
84 let (tx, rx) = oneshot::channel();
85 sender
86 .lock()
87 .await
88 .send_message(AvailabilityStoreMessage::QueryChunkAvailability(
89 core.candidate_hash,
90 validator_index,
91 tx,
92 ))
93 .await;
94
95 let res = rx.await.map_err(Into::into);
96
97 gum::trace!(
98 target: LOG_TARGET,
99 para_id = %core.para_id(),
100 availability = ?res,
101 ?core.candidate_hash,
102 "Candidate availability",
103 );
104
105 res
106 } else {
107 Ok(false)
108 }
109}
110
111async fn construct_availability_bitfield(
116 relay_parent: Hash,
117 validator_idx: ValidatorIndex,
118 sender: &mut impl overseer::BitfieldSigningSenderTrait,
119) -> Result<AvailabilityBitfield, Error> {
120 let availability_cores =
122 { recv_runtime(request_availability_cores(relay_parent, sender).await).await? };
123
124 let sender = Mutex::new(sender);
130
131 let results = future::try_join_all(
134 availability_cores
135 .iter()
136 .map(|core| get_core_availability(core, validator_idx, &sender)),
137 )
138 .await?;
139
140 let core_bits = FromIterator::from_iter(results.into_iter());
141 gum::debug!(
142 target: LOG_TARGET,
143 ?relay_parent,
144 "Signing Bitfield for {core_count} cores: {core_bits}",
145 core_count = availability_cores.len(),
146 core_bits = core_bits,
147 );
148
149 Ok(AvailabilityBitfield(core_bits))
150}
151
152pub struct BitfieldSigningSubsystem {
154 keystore: KeystorePtr,
155 metrics: Metrics,
156}
157
158impl BitfieldSigningSubsystem {
159 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
161 Self { keystore, metrics }
162 }
163}
164
165#[overseer::subsystem(BitfieldSigning, error=SubsystemError, prefix=self::overseer)]
166impl<Context> BitfieldSigningSubsystem {
167 fn start(self, ctx: Context) -> SpawnedSubsystem {
168 let future = async move {
169 run(ctx, self.keystore, self.metrics)
170 .await
171 .map_err(|e| SubsystemError::with_origin("bitfield-signing", e))
172 }
173 .boxed();
174
175 SpawnedSubsystem { name: "bitfield-signing-subsystem", future }
176 }
177}
178
179#[overseer::contextbounds(BitfieldSigning, prefix = self::overseer)]
180async fn run<Context>(
181 mut ctx: Context,
182 keystore: KeystorePtr,
183 metrics: Metrics,
184) -> SubsystemResult<()> {
185 let mut running = HashMap::<Hash, future::AbortHandle>::new();
187
188 loop {
189 match ctx.recv().await? {
190 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
191 for leaf in &update.deactivated {
193 if let Some(handle) = running.remove(leaf) {
194 handle.abort();
195 }
196 }
197
198 if let Some(leaf) = update.activated {
199 let sender = ctx.sender().clone();
200 let leaf_hash = leaf.hash;
201
202 let (fut, handle) = future::abortable(handle_active_leaves_update(
203 sender,
204 leaf,
205 keystore.clone(),
206 metrics.clone(),
207 ));
208
209 running.insert(leaf_hash, handle);
210
211 ctx.spawn("bitfield-signing-job", fut.map(drop).boxed())?;
212 }
213 },
214 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
215 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
216 FromOrchestra::Communication { .. } => {},
217 }
218 }
219}
220
221async fn handle_active_leaves_update<Sender>(
222 mut sender: Sender,
223 leaf: ActivatedLeaf,
224 keystore: KeystorePtr,
225 metrics: Metrics,
226) -> Result<(), Error>
227where
228 Sender: overseer::BitfieldSigningSenderTrait,
229{
230 let wait_until = Instant::now() + SPAWNED_TASK_DELAY;
231
232 let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
235 Ok(validator) => validator,
236 Err(util::Error::NotAValidator) => return Ok(()),
237 Err(err) => return Err(Error::Util(err)),
238 };
239
240 Delay::new_at(wait_until).await?;
242
243 let _timer = metrics.time_run();
246
247 let bitfield =
248 match construct_availability_bitfield(leaf.hash, validator.index(), &mut sender).await {
249 Err(Error::Runtime(runtime_err)) => {
250 gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
252 return Ok(())
253 },
254 Err(err) => return Err(err),
255 Ok(bitfield) => bitfield,
256 };
257
258 let signed_bitfield =
259 match validator.sign(keystore, bitfield).map_err(|e| Error::Keystore(e))? {
260 Some(b) => b,
261 None => {
262 gum::error!(
263 target: LOG_TARGET,
264 "Key was found at construction, but while signing it could not be found.",
265 );
266 return Ok(())
267 },
268 };
269
270 metrics.on_bitfield_signed();
271
272 sender
273 .send_message(BitfieldDistributionMessage::DistributeBitfield(leaf.hash, signed_bitfield))
274 .await;
275
276 Ok(())
277}