1use super::{metrics::MetricsLink as PrometheusMetrics, revalidation};
22pub use crate::{
23 api::FullChainApi,
24 graph::{ChainApi, ValidatedTransaction},
25};
26use crate::{
27 common::{
28 enactment_state::{EnactmentAction, EnactmentState},
29 error,
30 tracing_log_xt::log_xt_trace,
31 },
32 graph::{
33 self, base_pool::TimedTransactionSource, EventHandler, ExtrinsicHash, IsValidator,
34 RawExtrinsicFor,
35 },
36 ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
37};
38use async_trait::async_trait;
39use futures::{channel::oneshot, future, prelude::*, Future, FutureExt};
40use parking_lot::Mutex;
41use prometheus_endpoint::Registry as PrometheusRegistry;
42use sc_transaction_pool_api::{
43 error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
44 PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor,
45 TxHash, TxInvalidityReportMap,
46};
47use sp_blockchain::{HashAndNumber, TreeRoute};
48use sp_core::traits::SpawnEssentialNamed;
49use sp_runtime::{
50 generic::BlockId,
51 traits::{
52 AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, SaturatedConversion, Zero,
53 },
54 transaction_validity::{TransactionTag as Tag, TransactionValidityError},
55};
56use std::{
57 collections::{HashMap, HashSet},
58 pin::Pin,
59 sync::Arc,
60 time::Instant,
61};
62use tokio::select;
63use tracing::{trace, warn};
64
65pub struct BasicPool<PoolApi, Block>
67where
68 Block: BlockT,
69 PoolApi: graph::ChainApi<Block = Block>,
70{
71 pool: Arc<graph::Pool<PoolApi, ()>>,
72 api: Arc<PoolApi>,
73 revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
74 revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
75 ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
76 metrics: PrometheusMetrics,
77 enactment_state: Arc<Mutex<EnactmentState<Block>>>,
78}
79
80struct ReadyPoll<T, Block: BlockT> {
81 updated_at: NumberFor<Block>,
82 pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
83}
84
85impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
86 fn default() -> Self {
87 Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
88 }
89}
90
91impl<T, Block: BlockT> ReadyPoll<T, Block> {
92 fn new(best_block_number: NumberFor<Block>) -> Self {
93 Self { updated_at: best_block_number, pollers: Default::default() }
94 }
95
96 fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
97 self.updated_at = number;
98
99 let mut idx = 0;
100 while idx < self.pollers.len() {
101 if self.pollers[idx].0 <= number {
102 let poller_sender = self.pollers.swap_remove(idx);
103 trace!(
104 target: LOG_TARGET,
105 ?number,
106 "Sending ready signal."
107 );
108 let _ = poller_sender.1.send(iterator_factory());
109 } else {
110 idx += 1;
111 }
112 }
113 }
114
115 fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
116 let (sender, receiver) = oneshot::channel();
117 self.pollers.push((number, sender));
118 receiver
119 }
120
121 fn updated_at(&self) -> NumberFor<Block> {
122 self.updated_at
123 }
124}
125
126pub enum RevalidationType {
128 Light,
135
136 Full,
141}
142
143impl<PoolApi, Block> BasicPool<PoolApi, Block>
144where
145 Block: BlockT,
146 PoolApi: graph::ChainApi<Block = Block> + 'static,
147{
148 pub fn new_test(
150 pool_api: Arc<PoolApi>,
151 best_block_hash: Block::Hash,
152 finalized_hash: Block::Hash,
153 options: graph::Options,
154 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
155 let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
156 options,
157 true.into(),
158 pool_api.clone(),
159 ));
160 let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
161 pool_api.clone(),
162 pool.clone(),
163 finalized_hash,
164 );
165 (
166 Self {
167 api: pool_api,
168 pool,
169 revalidation_queue: Arc::new(revalidation_queue),
170 revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
171 ready_poll: Default::default(),
172 metrics: Default::default(),
173 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
174 best_block_hash,
175 finalized_hash,
176 ))),
177 },
178 background_task,
179 )
180 }
181
182 pub fn with_revalidation_type(
185 options: graph::Options,
186 is_validator: IsValidator,
187 pool_api: Arc<PoolApi>,
188 prometheus: Option<&PrometheusRegistry>,
189 revalidation_type: RevalidationType,
190 spawner: impl SpawnEssentialNamed,
191 best_block_number: NumberFor<Block>,
192 best_block_hash: Block::Hash,
193 finalized_hash: Block::Hash,
194 ) -> Self {
195 let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
196 options,
197 is_validator,
198 pool_api.clone(),
199 ));
200 let (revalidation_queue, background_task) = match revalidation_type {
201 RevalidationType::Light =>
202 (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
203 RevalidationType::Full => {
204 let (queue, background) = revalidation::RevalidationQueue::new_background(
205 pool_api.clone(),
206 pool.clone(),
207 finalized_hash,
208 );
209 (queue, Some(background))
210 },
211 };
212
213 if let Some(background_task) = background_task {
214 spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
215 }
216
217 Self {
218 api: pool_api,
219 pool,
220 revalidation_queue: Arc::new(revalidation_queue),
221 revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
222 RevalidationType::Light =>
223 RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
224 RevalidationType::Full => RevalidationStrategy::Always,
225 })),
226 ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
227 metrics: PrometheusMetrics::new(prometheus),
228 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
229 best_block_hash,
230 finalized_hash,
231 ))),
232 }
233 }
234
235 pub fn pool(&self) -> &Arc<graph::Pool<PoolApi, ()>> {
237 &self.pool
238 }
239
240 pub fn api(&self) -> &PoolApi {
242 &self.api
243 }
244
245 async fn ready_at_with_timeout_internal(
246 &self,
247 at: Block::Hash,
248 timeout: std::time::Duration,
249 ) -> ReadyIteratorFor<PoolApi> {
250 select! {
251 ready = self.ready_at(at)=> ready,
252 _ = futures_timer::Delay::new(timeout)=> self.ready()
253 }
254 }
255}
256
257#[async_trait]
258impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
259where
260 Block: BlockT,
261 PoolApi: 'static + graph::ChainApi<Block = Block>,
262{
263 type Block = PoolApi::Block;
264 type Hash = graph::ExtrinsicHash<PoolApi>;
265 type InPoolTransaction =
266 graph::base_pool::Transaction<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
267 type Error = PoolApi::Error;
268
269 async fn submit_at(
270 &self,
271 at: <Self::Block as BlockT>::Hash,
272 source: TransactionSource,
273 xts: Vec<TransactionFor<Self>>,
274 ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
275 let pool = self.pool.clone();
276 let xts = xts
277 .into_iter()
278 .map(|xt| {
279 (TimedTransactionSource::from_transaction_source(source, false), Arc::from(xt))
280 })
281 .collect::<Vec<_>>();
282
283 self.metrics
284 .report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
285
286 let number = self.api.resolve_block_number(at);
287 let at = HashAndNumber { hash: at, number: number? };
288 Ok(pool
289 .submit_at(&at, xts, ValidateTransactionPriority::Submitted)
290 .await
291 .into_iter()
292 .map(|result| result.map(|outcome| outcome.hash()))
293 .collect())
294 }
295
296 async fn submit_one(
297 &self,
298 at: <Self::Block as BlockT>::Hash,
299 source: TransactionSource,
300 xt: TransactionFor<Self>,
301 ) -> Result<TxHash<Self>, Self::Error> {
302 let pool = self.pool.clone();
303 let xt = Arc::from(xt);
304
305 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
306
307 let number = self.api.resolve_block_number(at);
308 let at = HashAndNumber { hash: at, number: number? };
309 pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt)
310 .await
311 .map(|outcome| outcome.hash())
312 }
313
314 async fn submit_and_watch(
315 &self,
316 at: <Self::Block as BlockT>::Hash,
317 source: TransactionSource,
318 xt: TransactionFor<Self>,
319 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
320 let pool = self.pool.clone();
321 let xt = Arc::from(xt);
322
323 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
324
325 let number = self.api.resolve_block_number(at);
326
327 let at = HashAndNumber { hash: at, number: number? };
328 pool.submit_and_watch(
329 &at,
330 TimedTransactionSource::from_transaction_source(source, false),
331 xt,
332 )
333 .await
334 .map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
335 }
336
337 async fn report_invalid(
338 &self,
339 _at: Option<<Self::Block as BlockT>::Hash>,
340 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
341 ) -> Vec<Arc<Self::InPoolTransaction>> {
342 let hashes = invalid_tx_errors.keys().map(|h| *h).collect::<Vec<_>>();
343 let removed = self.pool.validated_pool().remove_invalid(&hashes);
344 self.metrics
345 .report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
346 removed
347 }
348
349 fn status(&self) -> PoolStatus {
350 self.pool.validated_pool().status()
351 }
352
353 fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
354 self.pool.validated_pool().import_notification_stream()
355 }
356
357 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
358 self.pool.hash_of(xt)
359 }
360
361 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
362 self.pool.validated_pool().on_broadcasted(propagations)
363 }
364
365 fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
366 self.pool.validated_pool().ready_by_hash(hash)
367 }
368
369 async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<PoolApi> {
370 let Ok(at) = self.api.resolve_block_number(at) else {
371 return Box::new(std::iter::empty()) as Box<_>
372 };
373
374 let status = self.status();
375 if status.ready == 0 && status.future == 0 {
380 return Box::new(std::iter::empty()) as Box<_>
381 }
382
383 if self.ready_poll.lock().updated_at() >= at {
384 trace!(
385 target: LOG_TARGET,
386 ?at,
387 "Transaction pool already processed block."
388 );
389 let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
390 return iterator
391 }
392
393 let result = self.ready_poll.lock().add(at).map(|received| {
394 received.unwrap_or_else(|error| {
395 warn!(target: LOG_TARGET, ?error, "Error receiving pending set.");
396 Box::new(std::iter::empty())
397 })
398 });
399
400 result.await
401 }
402
403 fn ready(&self) -> ReadyIteratorFor<PoolApi> {
404 Box::new(self.pool.validated_pool().ready())
405 }
406
407 fn futures(&self) -> Vec<Self::InPoolTransaction> {
408 let pool = self.pool.validated_pool().pool.read();
409 pool.futures().cloned().collect::<Vec<_>>()
410 }
411
412 async fn ready_at_with_timeout(
413 &self,
414 at: <Self::Block as BlockT>::Hash,
415 timeout: std::time::Duration,
416 ) -> ReadyIteratorFor<PoolApi> {
417 self.ready_at_with_timeout_internal(at, timeout).await
418 }
419}
420
421impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
422where
423 Block: BlockT,
424 Client: sp_api::ProvideRuntimeApi<Block>
425 + sc_client_api::BlockBackend<Block>
426 + sc_client_api::blockchain::HeaderBackend<Block>
427 + sp_runtime::traits::BlockIdTo<Block>
428 + sc_client_api::ExecutorProvider<Block>
429 + sc_client_api::UsageProvider<Block>
430 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
431 + Send
432 + Sync
433 + 'static,
434 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
435{
436 pub fn new_full(
438 options: graph::Options,
439 is_validator: IsValidator,
440 prometheus: Option<&PrometheusRegistry>,
441 spawner: impl SpawnEssentialNamed,
442 client: Arc<Client>,
443 ) -> Self {
444 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
445 let pool = Self::with_revalidation_type(
446 options,
447 is_validator,
448 pool_api,
449 prometheus,
450 RevalidationType::Full,
451 spawner,
452 client.usage_info().chain.best_number,
453 client.usage_info().chain.best_hash,
454 client.usage_info().chain.finalized_hash,
455 );
456
457 pool
458 }
459}
460
461impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
462 for BasicPool<FullChainApi<Client, Block>, Block>
463where
464 Block: BlockT,
465 Client: sp_api::ProvideRuntimeApi<Block>
466 + sc_client_api::BlockBackend<Block>
467 + sc_client_api::blockchain::HeaderBackend<Block>
468 + sp_runtime::traits::BlockIdTo<Block>
469 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
470 Client: Send + Sync + 'static,
471 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
472{
473 type Block = Block;
474 type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
475 type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
476
477 fn submit_local(
478 &self,
479 at: Block::Hash,
480 xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
481 ) -> Result<Self::Hash, Self::Error> {
482 let validity = self
483 .api
484 .validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
485 .map_err(|e| {
486 Self::Error::Pool(match e {
487 TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
488 TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
489 })
490 })?;
491
492 let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
493 let block_number = self
494 .api
495 .block_id_to_number(&BlockId::hash(at))?
496 .ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
497
498 let validated = ValidatedTransaction::valid_at(
499 block_number.saturated_into::<u64>(),
500 hash,
501 TimedTransactionSource::new_local(false),
502 Arc::from(xt),
503 bytes,
504 validity,
505 );
506
507 self.pool
508 .validated_pool()
509 .submit(vec![validated])
510 .remove(0)
511 .map(|outcome| outcome.hash())
512 }
513}
514
515#[cfg_attr(test, derive(Debug))]
516enum RevalidationStatus<N> {
517 NotScheduled,
519 Scheduled(Option<Instant>, Option<N>),
521 InProgress,
523}
524
525enum RevalidationStrategy<N> {
526 Always,
527 Light(RevalidationStatus<N>),
528}
529
530struct RevalidationAction {
531 revalidate: bool,
532 resubmit: bool,
533}
534
535impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
536 pub fn clear(&mut self) {
537 if let Self::Light(status) = self {
538 status.clear()
539 }
540 }
541
542 pub fn next(
543 &mut self,
544 block: N,
545 revalidate_time_period: Option<std::time::Duration>,
546 revalidate_block_period: Option<N>,
547 ) -> RevalidationAction {
548 match self {
549 Self::Light(status) => RevalidationAction {
550 revalidate: status.next_required(
551 block,
552 revalidate_time_period,
553 revalidate_block_period,
554 ),
555 resubmit: false,
556 },
557 Self::Always => RevalidationAction { revalidate: true, resubmit: true },
558 }
559 }
560}
561
562impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
563 pub fn clear(&mut self) {
565 *self = Self::NotScheduled;
566 }
567
568 pub fn next_required(
570 &mut self,
571 block: N,
572 revalidate_time_period: Option<std::time::Duration>,
573 revalidate_block_period: Option<N>,
574 ) -> bool {
575 match *self {
576 Self::NotScheduled => {
577 *self = Self::Scheduled(
578 revalidate_time_period.map(|period| Instant::now() + period),
579 revalidate_block_period.map(|period| block + period),
580 );
581 false
582 },
583 Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
584 let is_required =
585 revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) ||
586 revalidate_at_block.map(|at| block >= at).unwrap_or(false);
587 if is_required {
588 *self = Self::InProgress;
589 }
590 is_required
591 },
592 Self::InProgress => false,
593 }
594 }
595}
596
597pub async fn prune_known_txs_for_block<
601 Block: BlockT,
602 Api: graph::ChainApi<Block = Block>,
603 L: EventHandler<Api>,
604>(
605 at: &HashAndNumber<Block>,
606 api: &Api,
607 pool: &graph::Pool<Api, L>,
608 extrinsics: Option<Vec<RawExtrinsicFor<Api>>>,
609 known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<Api>, Vec<Tag>>>>,
610) -> Vec<ExtrinsicHash<Api>> {
611 let extrinsics = match extrinsics {
612 Some(xts) => xts,
613 None => api
614 .block_body(at.hash)
615 .await
616 .unwrap_or_else(|error| {
617 warn!(target: LOG_TARGET, ?error, "Prune known transactions: error request.");
618 None
619 })
620 .unwrap_or_default(),
621 };
622
623 let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
624
625 let header = match api.block_header(at.hash) {
626 Ok(Some(h)) => h,
627 Ok(None) => {
628 trace!(target: LOG_TARGET, hash = ?at.hash, "Could not find header.");
629 return hashes
630 },
631 Err(error) => {
632 trace!(target: LOG_TARGET, hash = ?at.hash, ?error, "Error retrieving header.");
633 return hashes
634 },
635 };
636
637 log_xt_trace!(target: LOG_TARGET, &hashes, "Pruning transaction.");
638
639 pool.prune(at, *header.parent_hash(), &extrinsics, known_provides_tags).await;
640 hashes
641}
642
643impl<PoolApi, Block> BasicPool<PoolApi, Block>
644where
645 Block: BlockT,
646 PoolApi: 'static + graph::ChainApi<Block = Block>,
647{
648 async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
652 trace!(target: LOG_TARGET, ?tree_route, "handle_enactment tree_route.");
653 let pool = self.pool.clone();
654 let api = self.api.clone();
655
656 let hash_and_number = match tree_route.last() {
657 Some(hash_and_number) => hash_and_number,
658 None => {
659 warn!(target: LOG_TARGET, ?tree_route, "Skipping ChainEvent - no last block in tree route.");
660 return
661 },
662 };
663
664 let next_action = self.revalidation_strategy.lock().next(
665 hash_and_number.number,
666 Some(std::time::Duration::from_secs(60)),
667 Some(20u32.into()),
668 );
669
670 let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
673
674 for retracted in tree_route.retracted() {
680 pool.validated_pool().on_block_retracted(retracted.hash);
682 }
683
684 future::join_all(
685 tree_route
686 .enacted()
687 .iter()
688 .map(|h| prune_known_txs_for_block(h, &*api, &*pool, None, None)),
689 )
690 .await
691 .into_iter()
692 .for_each(|enacted_log| {
693 pruned_log.extend(enacted_log);
694 });
695
696 self.metrics
697 .report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
698
699 if next_action.resubmit {
700 let mut resubmit_transactions = Vec::new();
701
702 for retracted in tree_route.retracted() {
703 let hash = retracted.hash;
704
705 let block_transactions = api
706 .block_body(hash)
707 .await
708 .unwrap_or_else(|error| {
709 warn!(target: LOG_TARGET, ?error, "Failed to fetch block body.");
710 None
711 })
712 .unwrap_or_default()
713 .into_iter();
714
715 let mut resubmitted_to_report = 0;
716
717 resubmit_transactions.extend(
718 block_transactions.into_iter().map(Arc::from).filter_map(|tx| {
720 let tx_hash = pool.hash_of(&tx);
721 let contains = pruned_log.contains(&tx_hash);
722
723 resubmitted_to_report += 1;
725
726 if !contains {
727 trace!(target: LOG_TARGET, ?tx_hash, ?hash, "Resubmitting from retracted block.");
728 Some((
729 TimedTransactionSource::new_external(false),
732 tx,
733 ))
734 } else {
735 None
736 }
737 }),
738 );
739
740 self.metrics.report(|metrics| {
741 metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
742 });
743 }
744
745 pool.resubmit_at(
746 &hash_and_number,
747 resubmit_transactions,
748 ValidateTransactionPriority::Submitted,
749 )
750 .await;
751 }
752
753 let extra_pool = pool.clone();
754 self.ready_poll
757 .lock()
758 .trigger(hash_and_number.number, move || Box::new(extra_pool.validated_pool().ready()));
759
760 if next_action.revalidate {
761 let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
762 self.revalidation_queue.revalidate_later(hash_and_number.hash, hashes).await;
763
764 self.revalidation_strategy.lock().clear();
765 }
766 }
767}
768
769#[async_trait]
770impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
771where
772 Block: BlockT,
773 PoolApi: 'static + graph::ChainApi<Block = Block>,
774{
775 async fn maintain(&self, event: ChainEvent<Self::Block>) {
776 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
777 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
778 match self.api.tree_route(from, to) {
779 Ok(tree_route) => Ok(tree_route),
780 Err(e) =>
781 return Err(format!(
782 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
783 )),
784 }
785 };
786 let block_id_to_number =
787 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
788
789 let result =
790 self.enactment_state
791 .lock()
792 .update(&event, &compute_tree_route, &block_id_to_number);
793
794 match result {
795 Err(error) => {
796 trace!(target: LOG_TARGET, %error, "enactment state update");
797 self.enactment_state.lock().force_update(&event);
798 },
799 Ok(EnactmentAction::Skip) => return,
800 Ok(EnactmentAction::HandleFinalization) => {},
801 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
802 self.handle_enactment(tree_route).await;
803 },
804 };
805
806 if let ChainEvent::Finalized { hash, tree_route } = event {
807 trace!(
808 target: LOG_TARGET,
809 ?tree_route,
810 ?prev_finalized_block,
811 "on-finalized enacted"
812 );
813
814 for hash in tree_route.iter().chain(std::iter::once(&hash)) {
815 if let Err(error) = self.pool.validated_pool().on_block_finalized(*hash).await {
816 warn!(
817 target: LOG_TARGET,
818 ?hash,
819 ?error,
820 "Error occurred while attempting to notify watchers about finalization"
821 );
822 }
823 }
824 }
825 }
826}