referrerpolicy=no-referrer-when-downgrade

polkadot_availability_distribution/requester/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Requester takes care of requesting erasure chunks for candidates that are pending
18//! availability.
19
20use std::{
21	collections::{hash_map::HashMap, hash_set::HashSet},
22	iter::IntoIterator,
23	pin::Pin,
24};
25
26use futures::{
27	channel::{mpsc, oneshot},
28	task::{Context, Poll},
29	Stream,
30};
31
32use polkadot_node_network_protocol::request_response::{v1, v2, IsRequest, ReqProtocolNames};
33use polkadot_node_subsystem::{
34	messages::{ChainApiMessage, RuntimeApiMessage},
35	overseer, ActivatedLeaf, ActiveLeavesUpdate,
36};
37use polkadot_node_subsystem_util::{
38	availability_chunks::availability_chunk_index,
39	runtime::{get_occupied_cores, RuntimeInfo},
40};
41use polkadot_primitives::{CandidateHash, CoreIndex, Hash, OccupiedCore, SessionIndex};
42
43use super::{FatalError, Metrics, Result, LOG_TARGET};
44
45#[cfg(test)]
46mod tests;
47
48/// Cache for session information.
49mod session_cache;
50use session_cache::SessionCache;
51
52/// A task fetching a particular chunk.
53mod fetch_task;
54use fetch_task::{FetchTask, FetchTaskConfig, FromFetchTask};
55
56/// Requester takes care of requesting erasure chunks from backing groups and stores them in the
57/// av store.
58///
59/// It implements a stream that needs to be advanced for it making progress.
60pub struct Requester {
61	/// Candidates we need to fetch our chunk for.
62	///
63	/// We keep those around as long as a candidate is pending availability on some leaf, so we
64	/// won't fetch chunks multiple times.
65	///
66	/// We remove them on failure, so we get retries on the next block still pending availability.
67	fetches: HashMap<CandidateHash, FetchTask>,
68
69	/// Localized information about sessions we are currently interested in.
70	session_cache: SessionCache,
71
72	/// Sender to be cloned for `FetchTask`s.
73	tx: mpsc::Sender<FromFetchTask>,
74
75	/// Receive messages from `FetchTask`.
76	rx: mpsc::Receiver<FromFetchTask>,
77
78	/// Prometheus Metrics
79	metrics: Metrics,
80
81	/// Mapping of the req-response protocols to the full protocol names.
82	req_protocol_names: ReqProtocolNames,
83}
84
85#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
86impl Requester {
87	/// How many ancestors of the leaf should we consider along with it.
88	pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;
89
90	/// Create a new `Requester`.
91	///
92	/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
93	/// by advancing the stream.
94	pub fn new(req_protocol_names: ReqProtocolNames, metrics: Metrics) -> Self {
95		let (tx, rx) = mpsc::channel(1);
96		Requester {
97			fetches: HashMap::new(),
98			session_cache: SessionCache::new(),
99			tx,
100			rx,
101			metrics,
102			req_protocol_names,
103		}
104	}
105
106	/// Update heads that need availability distribution.
107	///
108	/// For all active heads we will be fetching our chunks for availability distribution.
109	pub async fn update_fetching_heads<Context>(
110		&mut self,
111		ctx: &mut Context,
112		runtime: &mut RuntimeInfo,
113		update: ActiveLeavesUpdate,
114	) -> Result<()> {
115		gum::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
116		let ActiveLeavesUpdate { activated, deactivated } = update;
117		if let Some(leaf) = activated {
118			// Order important! We need to handle activated, prior to deactivated, otherwise we
119			// might cancel still needed jobs.
120			self.start_requesting_chunks(ctx, runtime, leaf).await?;
121		}
122
123		self.stop_requesting_chunks(deactivated.into_iter());
124		Ok(())
125	}
126
127	/// Start requesting chunks for newly imported head.
128	///
129	/// This will also request [`SESSION_ANCESTRY_LEN`] leaf ancestors from the same session
130	/// and start requesting chunks for them too.
131	async fn start_requesting_chunks<Context>(
132		&mut self,
133		ctx: &mut Context,
134		runtime: &mut RuntimeInfo,
135		new_head: ActivatedLeaf,
136	) -> Result<()> {
137		let sender = &mut ctx.sender().clone();
138		let ActivatedLeaf { hash: leaf, .. } = new_head;
139		let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session(
140			sender,
141			runtime,
142			leaf,
143			Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
144		)
145		.await?;
146
147		// Also spawn or bump tasks for candidates in ancestry in the same session.
148		for hash in std::iter::once(leaf).chain(ancestors_in_session) {
149			let cores = get_occupied_cores(sender, hash).await?;
150			gum::trace!(
151				target: LOG_TARGET,
152				occupied_cores = ?cores,
153				"Query occupied core"
154			);
155			// Important:
156			// We mark the whole ancestry as live in the **leaf** hash, so we don't need to track
157			// any tasks separately.
158			//
159			// The next time the subsystem receives leaf update, some of spawned task will be bumped
160			// to be live in fresh relay parent, while some might get dropped due to the current
161			// leaf being deactivated.
162			self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?;
163		}
164
165		Ok(())
166	}
167
168	/// Stop requesting chunks for obsolete heads.
169	fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
170		let obsolete_leaves: HashSet<_> = obsolete_leaves.collect();
171		self.fetches.retain(|_, task| {
172			task.remove_leaves(&obsolete_leaves);
173			task.is_live()
174		})
175	}
176
177	/// Add candidates corresponding for a particular relay parent.
178	///
179	/// Starting requests where necessary.
180	///
181	/// Note: The passed in `leaf` is not the same as `CandidateDescriptor::relay_parent` in the
182	/// given cores. The latter is the `relay_parent` this candidate considers its parent, while the
183	/// passed in leaf might be some later block where the candidate is still pending availability.
184	async fn add_cores<Context>(
185		&mut self,
186		context: &mut Context,
187		runtime: &mut RuntimeInfo,
188		leaf: Hash,
189		leaf_session_index: SessionIndex,
190		cores: impl IntoIterator<Item = (CoreIndex, OccupiedCore)>,
191	) -> Result<()> {
192		for (core_index, core) in cores {
193			if let Some(e) = self.fetches.get_mut(&core.candidate_hash) {
194				// Just book keeping - we are already requesting that chunk:
195				e.add_leaf(leaf);
196			} else {
197				let tx = self.tx.clone();
198				let metrics = self.metrics.clone();
199
200				let session_info = self
201					.session_cache
202					.get_session_info(
203						context,
204						runtime,
205						// We use leaf here, the relay_parent must be in the same session as
206						// the leaf. This is guaranteed by runtime which ensures that cores are
207						// cleared at session boundaries. At the same time, only leaves are
208						// guaranteed to be fetchable by the state trie.
209						leaf,
210						leaf_session_index,
211					)
212					.await
213					.map_err(|err| {
214						gum::warn!(
215							target: LOG_TARGET,
216							error = ?err,
217							"Failed to spawn a fetch task"
218						);
219						err
220					})?;
221
222				if let Some(session_info) = session_info {
223					let n_validators =
224						session_info.validator_groups.iter().fold(0usize, |mut acc, group| {
225							acc = acc.saturating_add(group.len());
226							acc
227						});
228					let chunk_index = availability_chunk_index(
229						session_info.node_features.as_ref(),
230						n_validators,
231						core_index,
232						session_info.our_index,
233					)?;
234
235					let task_cfg = FetchTaskConfig::new(
236						leaf,
237						&core,
238						tx,
239						metrics,
240						session_info,
241						chunk_index,
242						self.req_protocol_names.get_name(v1::ChunkFetchingRequest::PROTOCOL),
243						self.req_protocol_names.get_name(v2::ChunkFetchingRequest::PROTOCOL),
244					);
245
246					self.fetches
247						.insert(core.candidate_hash, FetchTask::start(task_cfg, context).await?);
248				}
249			}
250		}
251		Ok(())
252	}
253}
254
255impl Stream for Requester {
256	type Item = overseer::AvailabilityDistributionOutgoingMessages;
257
258	fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
259		loop {
260			match Pin::new(&mut self.rx).poll_next(ctx) {
261				Poll::Ready(Some(FromFetchTask::Message(m))) => return Poll::Ready(Some(m)),
262				Poll::Ready(Some(FromFetchTask::Concluded(Some(bad_boys)))) => {
263					self.session_cache.report_bad_log(bad_boys);
264					continue
265				},
266				Poll::Ready(Some(FromFetchTask::Concluded(None))) => continue,
267				Poll::Ready(Some(FromFetchTask::Failed(candidate_hash))) => {
268					// Make sure we retry on next block still pending availability.
269					self.fetches.remove(&candidate_hash);
270				},
271				Poll::Ready(None) => return Poll::Ready(None),
272				Poll::Pending => return Poll::Pending,
273			}
274		}
275	}
276}
277
278/// Requests up to `limit` ancestor hashes of relay parent in the same session.
279///
280/// Also returns session index of the `head`.
281async fn get_block_ancestors_in_same_session<Sender>(
282	sender: &mut Sender,
283	runtime: &mut RuntimeInfo,
284	head: Hash,
285	limit: usize,
286) -> Result<(SessionIndex, Vec<Hash>)>
287where
288	Sender:
289		overseer::SubsystemSender<RuntimeApiMessage> + overseer::SubsystemSender<ChainApiMessage>,
290{
291	// The order is parent, grandparent, ...
292	//
293	// `limit + 1` since a session index for the last element in ancestry
294	// is obtained through its parent. It always gets truncated because
295	// `session_ancestry_len` can only be incremented `ancestors.len() - 1` times.
296	let mut ancestors = get_block_ancestors(sender, head, limit + 1).await?;
297	let mut ancestors_iter = ancestors.iter();
298
299	// `head` is the child of the first block in `ancestors`, request its session index.
300	let head_session_index = match ancestors_iter.next() {
301		Some(parent) => runtime.get_session_index_for_child(sender, *parent).await?,
302		None => {
303			// No first element, i.e. empty.
304			return Ok((0, ancestors))
305		},
306	};
307
308	let mut session_ancestry_len = 0;
309	// The first parent is skipped.
310	for parent in ancestors_iter {
311		// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
312		let session_index = runtime.get_session_index_for_child(sender, *parent).await?;
313		if session_index == head_session_index {
314			session_ancestry_len += 1;
315		} else {
316			break
317		}
318	}
319
320	// Drop the rest.
321	ancestors.truncate(session_ancestry_len);
322
323	Ok((head_session_index, ancestors))
324}
325
326/// Request up to `limit` ancestor hashes of relay parent from the Chain API.
327async fn get_block_ancestors<Sender>(
328	sender: &mut Sender,
329	relay_parent: Hash,
330	limit: usize,
331) -> Result<Vec<Hash>>
332where
333	Sender: overseer::SubsystemSender<ChainApiMessage>,
334{
335	let (tx, rx) = oneshot::channel();
336	sender
337		.send_message(ChainApiMessage::Ancestors {
338			hash: relay_parent,
339			k: limit,
340			response_channel: tx,
341		})
342		.await;
343
344	let ancestors = rx
345		.await
346		.map_err(FatalError::ChainApiSenderDropped)?
347		.map_err(FatalError::ChainApi)?;
348	Ok(ancestors)
349}