1use cumulus_client_network::WaitToAnnounce;
22use cumulus_primitives_core::{CollationInfo, CollectCollationInfo, ParachainBlockData};
23
24use sc_client_api::BlockBackend;
25use sp_api::{ApiExt, ProvideRuntimeApi};
26use sp_consensus::BlockStatus;
27use sp_core::traits::SpawnNamed;
28use sp_runtime::traits::{Block as BlockT, HashingFor, Header as HeaderT, Zero};
29
30use cumulus_client_consensus_common::ParachainCandidate;
31use polkadot_node_primitives::{
32 BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV,
33};
34
35use codec::Encode;
36use futures::channel::oneshot;
37use parking_lot::Mutex;
38use std::sync::Arc;
39
40const LOG_TARGET: &str = "cumulus-collator";
42
43pub trait ServiceInterface<Block: BlockT> {
45 fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool;
49
50 fn build_collation(
56 &self,
57 parent_header: &Block::Header,
58 block_hash: Block::Hash,
59 candidate: ParachainCandidate<Block>,
60 ) -> Option<(Collation, ParachainBlockData<Block>)>;
61
62 fn announce_with_barrier(
68 &self,
69 block_hash: Block::Hash,
70 ) -> oneshot::Sender<CollationSecondedSignal>;
71
72 fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>);
74}
75
76pub struct CollatorService<Block: BlockT, BS, RA> {
82 block_status: Arc<BS>,
83 wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
84 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
85 runtime_api: Arc<RA>,
86}
87
88impl<Block: BlockT, BS, RA> Clone for CollatorService<Block, BS, RA> {
89 fn clone(&self) -> Self {
90 Self {
91 block_status: self.block_status.clone(),
92 wait_to_announce: self.wait_to_announce.clone(),
93 announce_block: self.announce_block.clone(),
94 runtime_api: self.runtime_api.clone(),
95 }
96 }
97}
98
99impl<Block, BS, RA> CollatorService<Block, BS, RA>
100where
101 Block: BlockT,
102 BS: BlockBackend<Block>,
103 RA: ProvideRuntimeApi<Block>,
104 RA::Api: CollectCollationInfo<Block>,
105{
106 pub fn new(
108 block_status: Arc<BS>,
109 spawner: Arc<dyn SpawnNamed + Send + Sync>,
110 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
111 runtime_api: Arc<RA>,
112 ) -> Self {
113 let wait_to_announce =
114 Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block.clone())));
115
116 Self { block_status, wait_to_announce, announce_block, runtime_api }
117 }
118
119 pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
123 match self.block_status.block_status(hash) {
124 Ok(BlockStatus::Queued) => {
125 tracing::debug!(
126 target: LOG_TARGET,
127 block_hash = ?hash,
128 "Skipping candidate production, because block is still queued for import.",
129 );
130 false
131 },
132 Ok(BlockStatus::InChainWithState) => true,
133 Ok(BlockStatus::InChainPruned) => {
134 tracing::error!(
135 target: LOG_TARGET,
136 "Skipping candidate production, because block `{:?}` is already pruned!",
137 hash,
138 );
139 false
140 },
141 Ok(BlockStatus::KnownBad) => {
142 tracing::error!(
143 target: LOG_TARGET,
144 block_hash = ?hash,
145 "Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
146 );
147 false
148 },
149 Ok(BlockStatus::Unknown) => {
150 if header.number().is_zero() {
151 tracing::error!(
152 target: LOG_TARGET,
153 block_hash = ?hash,
154 "Could not find the header of the genesis block in the database!",
155 );
156 } else {
157 tracing::debug!(
158 target: LOG_TARGET,
159 block_hash = ?hash,
160 "Skipping candidate production, because block is unknown.",
161 );
162 }
163 false
164 },
165 Err(e) => {
166 tracing::error!(
167 target: LOG_TARGET,
168 block_hash = ?hash,
169 error = ?e,
170 "Failed to get block status.",
171 );
172 false
173 },
174 }
175 }
176
177 pub fn fetch_collation_info(
183 &self,
184 block_hash: Block::Hash,
185 header: &Block::Header,
186 ) -> Result<Option<(CollationInfo, u32)>, sp_api::ApiError> {
187 let runtime_api = self.runtime_api.runtime_api();
188
189 let api_version =
190 match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
191 Some(version) => version,
192 None => {
193 tracing::error!(
194 target: LOG_TARGET,
195 "Could not fetch `CollectCollationInfo` runtime api version."
196 );
197 return Ok(None)
198 },
199 };
200
201 let collation_info = if api_version < 2 {
202 #[allow(deprecated)]
203 runtime_api
204 .collect_collation_info_before_version_2(block_hash)?
205 .into_latest(header.encode().into())
206 } else {
207 runtime_api.collect_collation_info(block_hash, header)?
208 };
209
210 Ok(Some((collation_info, api_version)))
211 }
212
213 pub fn build_collation(
219 &self,
220 parent_header: &Block::Header,
221 block_hash: Block::Hash,
222 candidate: ParachainCandidate<Block>,
223 ) -> Option<(Collation, ParachainBlockData<Block>)> {
224 let block = candidate.block;
225
226 let compact_proof = match candidate
227 .proof
228 .into_compact_proof::<HashingFor<Block>>(*parent_header.state_root())
229 {
230 Ok(proof) => proof,
231 Err(e) => {
232 tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
233 return None
234 },
235 };
236
237 let (collation_info, _api_version) = self
239 .fetch_collation_info(block_hash, block.header())
240 .map_err(|e| {
241 tracing::error!(
242 target: LOG_TARGET,
243 error = ?e,
244 "Failed to collect collation info.",
245 )
246 })
247 .ok()
248 .flatten()?;
249
250 let api_version = self
258 .runtime_api
259 .runtime_api()
260 .api_version::<dyn CollectCollationInfo<Block>>(parent_header.hash())
261 .ok()
262 .flatten()?;
263
264 let block_data = ParachainBlockData::<Block>::new(vec![block], compact_proof);
265
266 let pov = polkadot_node_primitives::maybe_compress_pov(PoV {
267 block_data: BlockData(if api_version >= 3 {
268 block_data.encode()
269 } else {
270 let block_data = block_data.as_v0();
271
272 if block_data.is_none() {
273 tracing::error!(
274 target: LOG_TARGET,
275 "Trying to submit a collation with multiple blocks is not supported by the current runtime."
276 );
277 }
278
279 block_data?.encode()
280 }),
281 });
282
283 let upward_messages = collation_info
284 .upward_messages
285 .try_into()
286 .map_err(|e| {
287 tracing::error!(
288 target: LOG_TARGET,
289 error = ?e,
290 "Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
291 )
292 })
293 .ok()?;
294 let horizontal_messages = collation_info
295 .horizontal_messages
296 .try_into()
297 .map_err(|e| {
298 tracing::error!(
299 target: LOG_TARGET,
300 error = ?e,
301 "Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
302 )
303 })
304 .ok()?;
305
306 let collation = Collation {
307 upward_messages,
308 new_validation_code: collation_info.new_validation_code,
309 processed_downward_messages: collation_info.processed_downward_messages,
310 horizontal_messages,
311 hrmp_watermark: collation_info.hrmp_watermark,
312 head_data: collation_info.head_data,
313 proof_of_validity: MaybeCompressedPoV::Compressed(pov),
314 };
315
316 Some((collation, block_data))
317 }
318
319 pub fn announce_with_barrier(
322 &self,
323 block_hash: Block::Hash,
324 ) -> oneshot::Sender<CollationSecondedSignal> {
325 let (result_sender, signed_stmt_recv) = oneshot::channel();
326 self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
327 result_sender
328 }
329}
330
331impl<Block, BS, RA> ServiceInterface<Block> for CollatorService<Block, BS, RA>
332where
333 Block: BlockT,
334 BS: BlockBackend<Block>,
335 RA: ProvideRuntimeApi<Block>,
336 RA::Api: CollectCollationInfo<Block>,
337{
338 fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
339 CollatorService::check_block_status(self, hash, header)
340 }
341
342 fn build_collation(
343 &self,
344 parent_header: &Block::Header,
345 block_hash: Block::Hash,
346 candidate: ParachainCandidate<Block>,
347 ) -> Option<(Collation, ParachainBlockData<Block>)> {
348 CollatorService::build_collation(self, parent_header, block_hash, candidate)
349 }
350
351 fn announce_with_barrier(
352 &self,
353 block_hash: Block::Hash,
354 ) -> oneshot::Sender<CollationSecondedSignal> {
355 CollatorService::announce_with_barrier(self, block_hash)
356 }
357
358 fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>) {
359 (self.announce_block)(block_hash, data)
360 }
361}