1use cumulus_client_network::WaitToAnnounce;
22use cumulus_primitives_core::{
23 CollationInfo, CollectCollationInfo, ParachainBlockData, SchedulingProof,
24};
25
26use polkadot_primitives::UMP_SEPARATOR;
27use sc_client_api::BlockBackend;
28use sp_api::{ApiExt, ProvideRuntimeApi, StorageProof};
29use sp_consensus::BlockStatus;
30use sp_core::traits::SpawnNamed;
31use sp_runtime::traits::{Block as BlockT, HashingFor, Header as HeaderT, Zero};
32
33use cumulus_client_consensus_common::ParachainCandidate;
34use polkadot_node_primitives::{
35 BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV,
36};
37
38use codec::Encode;
39use futures::channel::oneshot;
40use parking_lot::Mutex;
41use std::sync::Arc;
42const LOG_TARGET: &str = "cumulus-collator";
44
45pub trait ServiceInterface<Block: BlockT> {
47 fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool;
51
52 fn build_collation(
61 &self,
62 parent_header: &Block::Header,
63 block_hash: Block::Hash,
64 candidate: ParachainCandidate<Block>,
65 scheduling_proof: Option<SchedulingProof>,
66 ) -> Option<(Collation, ParachainBlockData<Block>)>;
67
68 fn build_multi_block_collation(
76 &self,
77 parent_header: &Block::Header,
78 blocks: Vec<Block>,
79 proof: StorageProof,
80 scheduling_proof: Option<SchedulingProof>,
81 ) -> Option<(Collation, ParachainBlockData<Block>)>;
82
83 fn announce_with_barrier(
89 &self,
90 block_hash: Block::Hash,
91 ) -> oneshot::Sender<CollationSecondedSignal>;
92
93 fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>);
95}
96
97pub struct CollatorService<Block: BlockT, BS, RA> {
103 block_status: Arc<BS>,
104 wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
105 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
106 runtime_api: Arc<RA>,
107}
108
109impl<Block: BlockT, BS, RA> Clone for CollatorService<Block, BS, RA> {
110 fn clone(&self) -> Self {
111 Self {
112 block_status: self.block_status.clone(),
113 wait_to_announce: self.wait_to_announce.clone(),
114 announce_block: self.announce_block.clone(),
115 runtime_api: self.runtime_api.clone(),
116 }
117 }
118}
119
120impl<Block, BS, RA> CollatorService<Block, BS, RA>
121where
122 Block: BlockT,
123 BS: BlockBackend<Block>,
124 RA: ProvideRuntimeApi<Block>,
125 RA::Api: CollectCollationInfo<Block>,
126{
127 fn split_at_separator(messages: Vec<Vec<u8>>) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
128 let mut parts = messages.splitn(2, |m: &Vec<u8>| m.is_empty());
129 (parts.next().unwrap_or(&[]).to_vec(), parts.next().unwrap_or(&[]).to_vec())
130 }
131
132 pub fn new(
134 block_status: Arc<BS>,
135 spawner: Arc<dyn SpawnNamed + Send + Sync>,
136 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
137 runtime_api: Arc<RA>,
138 ) -> Self {
139 let wait_to_announce =
140 Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block.clone())));
141
142 Self { block_status, wait_to_announce, announce_block, runtime_api }
143 }
144
145 pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
149 match self.block_status.block_status(hash) {
150 Ok(BlockStatus::Queued) => {
151 tracing::debug!(
152 target: LOG_TARGET,
153 block_hash = ?hash,
154 "Skipping candidate production, because block is still queued for import.",
155 );
156 false
157 },
158 Ok(BlockStatus::InChainWithState) => true,
159 Ok(BlockStatus::InChainPruned) => {
160 tracing::error!(
161 target: LOG_TARGET,
162 "Skipping candidate production, because block `{:?}` is already pruned!",
163 hash,
164 );
165 false
166 },
167 Ok(BlockStatus::KnownBad) => {
168 tracing::error!(
169 target: LOG_TARGET,
170 block_hash = ?hash,
171 "Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
172 );
173 false
174 },
175 Ok(BlockStatus::Unknown) => {
176 if header.number().is_zero() {
177 tracing::error!(
178 target: LOG_TARGET,
179 block_hash = ?hash,
180 "Could not find the header of the genesis block in the database!",
181 );
182 } else {
183 tracing::debug!(
184 target: LOG_TARGET,
185 block_hash = ?hash,
186 "Skipping candidate production, because block is unknown.",
187 );
188 }
189 false
190 },
191 Err(e) => {
192 tracing::error!(
193 target: LOG_TARGET,
194 block_hash = ?hash,
195 error = ?e,
196 "Failed to get block status.",
197 );
198 false
199 },
200 }
201 }
202
203 pub fn fetch_collation_info(
209 &self,
210 block_hash: Block::Hash,
211 header: &Block::Header,
212 ) -> Result<Option<(CollationInfo, u32)>, sp_api::ApiError> {
213 let runtime_api = self.runtime_api.runtime_api();
214
215 let api_version =
216 match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
217 Some(version) => version,
218 None => {
219 tracing::error!(
220 target: LOG_TARGET,
221 "Could not fetch `CollectCollationInfo` runtime api version."
222 );
223 return Ok(None);
224 },
225 };
226
227 let collation_info = if api_version < 2 {
228 #[allow(deprecated)]
229 runtime_api
230 .collect_collation_info_before_version_2(block_hash)?
231 .into_latest(header.encode().into())
232 } else {
233 runtime_api.collect_collation_info(block_hash, header)?
234 };
235
236 Ok(Some((collation_info, api_version)))
237 }
238
239 fn build_multi_block_collation(
245 &self,
246 parent_header: &Block::Header,
247 blocks: Vec<Block>,
248 proof: StorageProof,
249 scheduling_proof: Option<SchedulingProof>,
250 ) -> Option<(Collation, ParachainBlockData<Block>)> {
251 let compact_proof =
252 match proof.into_compact_proof::<HashingFor<Block>>(*parent_header.state_root()) {
253 Ok(proof) => proof,
254 Err(e) => {
255 tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
256 return None;
257 },
258 };
259
260 let api_version = self
266 .runtime_api
267 .runtime_api()
268 .api_version::<dyn CollectCollationInfo<Block>>(parent_header.hash())
269 .ok()
270 .flatten()?;
271 let mut upward_messages = Vec::new();
272 let mut upward_message_signals = Vec::<Vec<u8>>::with_capacity(4);
273 let mut horizontal_messages = Vec::new();
274 let mut new_validation_code = None;
275 let mut processed_downward_messages = 0;
276 let mut hrmp_watermark = None;
277 let mut head_data = None;
278
279 for block in &blocks {
280 let (collation_info, _api_version) = self
282 .fetch_collation_info(block.hash(), block.header())
283 .map_err(|e| {
284 tracing::error!(
285 target: LOG_TARGET,
286 error = ?e,
287 "Failed to collect collation info.",
288 )
289 })
290 .ok()
291 .flatten()?;
292
293 let (messages, signals) = Self::split_at_separator(collation_info.upward_messages);
294
295 upward_messages.extend(messages);
296 upward_message_signals.extend(signals);
297 horizontal_messages.extend(collation_info.horizontal_messages);
298
299 if let Some(new_code) = collation_info.new_validation_code {
300 if new_validation_code.replace(new_code).is_some() {
301 tracing::warn!(
302 target: LOG_TARGET,
303 block = ?block.hash(),
304 "Overwriting validation code from an earlier block in the bundle.",
305 );
306 }
307 }
308 processed_downward_messages += collation_info.processed_downward_messages;
309 hrmp_watermark = Some(collation_info.hrmp_watermark);
310 head_data = Some(collation_info.head_data);
311 }
312
313 horizontal_messages.sort_by(|a, b| a.recipient.cmp(&b.recipient));
315
316 let block_data = ParachainBlockData::<Block>::new(blocks, compact_proof, scheduling_proof);
317
318 let pov = polkadot_node_primitives::maybe_compress_pov(PoV {
319 block_data: BlockData(if api_version >= 3 {
320 block_data.encode()
321 } else {
322 let block_data = block_data.as_v0();
323
324 if block_data.is_none() {
325 tracing::error!(
326 target: LOG_TARGET,
327 "Trying to submit a collation with multiple blocks is not supported by the current runtime."
328 );
329 }
330
331 block_data?.encode()
332 }),
333 });
334
335 if !upward_message_signals.is_empty() {
337 upward_messages.push(UMP_SEPARATOR);
338 upward_messages.extend(upward_message_signals.into_iter());
339 }
340
341 let upward_messages = upward_messages
342 .try_into()
343 .map_err(|e| {
344 tracing::error!(
345 target: LOG_TARGET,
346 error = ?e,
347 "Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
348 )
349 })
350 .ok()?;
351 let horizontal_messages = horizontal_messages
352 .try_into()
353 .map_err(|e| {
354 tracing::error!(
355 target: LOG_TARGET,
356 error = ?e,
357 "Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
358 )
359 })
360 .ok()?;
361
362 let collation = Collation {
363 upward_messages,
364 new_validation_code,
365 processed_downward_messages,
366 horizontal_messages,
367 hrmp_watermark: hrmp_watermark?,
369 head_data: head_data?,
370 proof_of_validity: MaybeCompressedPoV::Compressed(pov),
371 };
372
373 Some((collation, block_data))
374 }
375
376 pub fn announce_with_barrier(
379 &self,
380 block_hash: Block::Hash,
381 ) -> oneshot::Sender<CollationSecondedSignal> {
382 let (result_sender, signed_stmt_recv) = oneshot::channel();
383 self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
384 result_sender
385 }
386}
387
388impl<Block, BS, RA> ServiceInterface<Block> for CollatorService<Block, BS, RA>
389where
390 Block: BlockT,
391 BS: BlockBackend<Block>,
392 RA: ProvideRuntimeApi<Block>,
393 RA::Api: CollectCollationInfo<Block>,
394{
395 fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
396 CollatorService::check_block_status(self, hash, header)
397 }
398
399 fn build_collation(
400 &self,
401 parent_header: &Block::Header,
402 _: Block::Hash,
403 candidate: ParachainCandidate<Block>,
404 scheduling_proof: Option<SchedulingProof>,
405 ) -> Option<(Collation, ParachainBlockData<Block>)> {
406 CollatorService::build_multi_block_collation(
407 self,
408 parent_header,
409 vec![candidate.block],
410 candidate.proof,
411 scheduling_proof,
412 )
413 }
414
415 fn announce_with_barrier(
416 &self,
417 block_hash: Block::Hash,
418 ) -> oneshot::Sender<CollationSecondedSignal> {
419 CollatorService::announce_with_barrier(self, block_hash)
420 }
421
422 fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>) {
423 (self.announce_block)(block_hash, data)
424 }
425
426 fn build_multi_block_collation(
427 &self,
428 parent_header: &<Block as BlockT>::Header,
429 blocks: Vec<Block>,
430 proof: StorageProof,
431 scheduling_proof: Option<SchedulingProof>,
432 ) -> Option<(Collation, ParachainBlockData<Block>)> {
433 CollatorService::build_multi_block_collation(
434 self,
435 parent_header,
436 blocks,
437 proof,
438 scheduling_proof,
439 )
440 }
441}