polkadot_availability_distribution/requester/
mod.rs1use std::{
21 collections::{hash_map::HashMap, hash_set::HashSet},
22 iter::IntoIterator,
23 pin::Pin,
24};
25
26use futures::{
27 channel::{mpsc, oneshot},
28 task::{Context, Poll},
29 Stream,
30};
31
32use polkadot_node_network_protocol::request_response::{v1, v2, IsRequest, ReqProtocolNames};
33use polkadot_node_subsystem::{
34 messages::{ChainApiMessage, RuntimeApiMessage},
35 overseer, ActivatedLeaf, ActiveLeavesUpdate,
36};
37use polkadot_node_subsystem_util::{
38 availability_chunks::availability_chunk_index,
39 runtime::{get_occupied_cores, RuntimeInfo},
40};
41use polkadot_primitives::{CandidateHash, CoreIndex, Hash, OccupiedCore, SessionIndex};
42
43use super::{FatalError, Metrics, Result, LOG_TARGET};
44
45#[cfg(test)]
46mod tests;
47
48mod session_cache;
50use session_cache::SessionCache;
51
52mod fetch_task;
54use fetch_task::{FetchTask, FetchTaskConfig, FromFetchTask};
55
56pub struct Requester {
61 fetches: HashMap<CandidateHash, FetchTask>,
68
69 session_cache: SessionCache,
71
72 tx: mpsc::Sender<FromFetchTask>,
74
75 rx: mpsc::Receiver<FromFetchTask>,
77
78 metrics: Metrics,
80
81 req_protocol_names: ReqProtocolNames,
83}
84
85#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
86impl Requester {
87 pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;
89
90 pub fn new(req_protocol_names: ReqProtocolNames, metrics: Metrics) -> Self {
95 let (tx, rx) = mpsc::channel(1);
96 Requester {
97 fetches: HashMap::new(),
98 session_cache: SessionCache::new(),
99 tx,
100 rx,
101 metrics,
102 req_protocol_names,
103 }
104 }
105
106 pub async fn update_fetching_heads<Context>(
110 &mut self,
111 ctx: &mut Context,
112 runtime: &mut RuntimeInfo,
113 update: ActiveLeavesUpdate,
114 ) -> Result<()> {
115 gum::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
116 let ActiveLeavesUpdate { activated, deactivated } = update;
117 if let Some(leaf) = activated {
118 self.start_requesting_chunks(ctx, runtime, leaf).await?;
121 }
122
123 self.stop_requesting_chunks(deactivated.into_iter());
124 Ok(())
125 }
126
127 async fn start_requesting_chunks<Context>(
132 &mut self,
133 ctx: &mut Context,
134 runtime: &mut RuntimeInfo,
135 new_head: ActivatedLeaf,
136 ) -> Result<()> {
137 let sender = &mut ctx.sender().clone();
138 let ActivatedLeaf { hash: leaf, .. } = new_head;
139 let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session(
140 sender,
141 runtime,
142 leaf,
143 Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
144 )
145 .await?;
146
147 for hash in std::iter::once(leaf).chain(ancestors_in_session) {
149 let cores = get_occupied_cores(sender, hash).await?;
150 gum::trace!(
151 target: LOG_TARGET,
152 occupied_cores = ?cores,
153 "Query occupied core"
154 );
155 self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?;
163 }
164
165 Ok(())
166 }
167
168 fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
170 let obsolete_leaves: HashSet<_> = obsolete_leaves.collect();
171 self.fetches.retain(|_, task| {
172 task.remove_leaves(&obsolete_leaves);
173 task.is_live()
174 })
175 }
176
177 async fn add_cores<Context>(
185 &mut self,
186 context: &mut Context,
187 runtime: &mut RuntimeInfo,
188 leaf: Hash,
189 leaf_session_index: SessionIndex,
190 cores: impl IntoIterator<Item = (CoreIndex, OccupiedCore)>,
191 ) -> Result<()> {
192 for (core_index, core) in cores {
193 if let Some(e) = self.fetches.get_mut(&core.candidate_hash) {
194 e.add_leaf(leaf);
196 } else {
197 let tx = self.tx.clone();
198 let metrics = self.metrics.clone();
199
200 let session_info = self
201 .session_cache
202 .get_session_info(
203 context,
204 runtime,
205 leaf,
210 leaf_session_index,
211 )
212 .await
213 .map_err(|err| {
214 gum::warn!(
215 target: LOG_TARGET,
216 error = ?err,
217 "Failed to spawn a fetch task"
218 );
219 err
220 })?;
221
222 if let Some(session_info) = session_info {
223 let n_validators =
224 session_info.validator_groups.iter().fold(0usize, |mut acc, group| {
225 acc = acc.saturating_add(group.len());
226 acc
227 });
228 let chunk_index = availability_chunk_index(
229 session_info.node_features.as_ref(),
230 n_validators,
231 core_index,
232 session_info.our_index,
233 )?;
234
235 let task_cfg = FetchTaskConfig::new(
236 leaf,
237 &core,
238 tx,
239 metrics,
240 session_info,
241 chunk_index,
242 self.req_protocol_names.get_name(v1::ChunkFetchingRequest::PROTOCOL),
243 self.req_protocol_names.get_name(v2::ChunkFetchingRequest::PROTOCOL),
244 );
245
246 self.fetches
247 .insert(core.candidate_hash, FetchTask::start(task_cfg, context).await?);
248 }
249 }
250 }
251 Ok(())
252 }
253}
254
255impl Stream for Requester {
256 type Item = overseer::AvailabilityDistributionOutgoingMessages;
257
258 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
259 loop {
260 match Pin::new(&mut self.rx).poll_next(ctx) {
261 Poll::Ready(Some(FromFetchTask::Message(m))) => return Poll::Ready(Some(m)),
262 Poll::Ready(Some(FromFetchTask::Concluded(Some(bad_boys)))) => {
263 self.session_cache.report_bad_log(bad_boys);
264 continue
265 },
266 Poll::Ready(Some(FromFetchTask::Concluded(None))) => continue,
267 Poll::Ready(Some(FromFetchTask::Failed(candidate_hash))) => {
268 self.fetches.remove(&candidate_hash);
270 },
271 Poll::Ready(None) => return Poll::Ready(None),
272 Poll::Pending => return Poll::Pending,
273 }
274 }
275 }
276}
277
278async fn get_block_ancestors_in_same_session<Sender>(
282 sender: &mut Sender,
283 runtime: &mut RuntimeInfo,
284 head: Hash,
285 limit: usize,
286) -> Result<(SessionIndex, Vec<Hash>)>
287where
288 Sender:
289 overseer::SubsystemSender<RuntimeApiMessage> + overseer::SubsystemSender<ChainApiMessage>,
290{
291 let mut ancestors = get_block_ancestors(sender, head, limit + 1).await?;
297 let mut ancestors_iter = ancestors.iter();
298
299 let head_session_index = match ancestors_iter.next() {
301 Some(parent) => runtime.get_session_index_for_child(sender, *parent).await?,
302 None => {
303 return Ok((0, ancestors))
305 },
306 };
307
308 let mut session_ancestry_len = 0;
309 for parent in ancestors_iter {
311 let session_index = runtime.get_session_index_for_child(sender, *parent).await?;
313 if session_index == head_session_index {
314 session_ancestry_len += 1;
315 } else {
316 break
317 }
318 }
319
320 ancestors.truncate(session_ancestry_len);
322
323 Ok((head_session_index, ancestors))
324}
325
326async fn get_block_ancestors<Sender>(
328 sender: &mut Sender,
329 relay_parent: Hash,
330 limit: usize,
331) -> Result<Vec<Hash>>
332where
333 Sender: overseer::SubsystemSender<ChainApiMessage>,
334{
335 let (tx, rx) = oneshot::channel();
336 sender
337 .send_message(ChainApiMessage::Ancestors {
338 hash: relay_parent,
339 k: limit,
340 response_channel: tx,
341 })
342 .await;
343
344 let ancestors = rx
345 .await
346 .map_err(FatalError::ChainApiSenderDropped)?
347 .map_err(FatalError::ChainApi)?;
348 Ok(ancestors)
349}