sc_rpc/chain/
chain_full.rs1use super::{client_err, ChainBackend, Error};
22use crate::{
23 utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
24 SubscriptionTaskExecutor,
25};
26use std::{marker::PhantomData, sync::Arc};
27
28use futures::{
29 future::{self},
30 stream::{self, Stream, StreamExt},
31};
32use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
33use sc_client_api::{BlockBackend, BlockchainEvents};
34use sp_blockchain::HeaderBackend;
35use sp_runtime::{generic::SignedBlock, traits::Block as BlockT};
36
37pub struct FullChain<Block: BlockT, Client> {
39 client: Arc<Client>,
41 _phantom: PhantomData<Block>,
43 executor: SubscriptionTaskExecutor,
45}
46
47impl<Block: BlockT, Client> FullChain<Block, Client> {
48 pub fn new(client: Arc<Client>, executor: SubscriptionTaskExecutor) -> Self {
50 Self { client, executor, _phantom: PhantomData }
51 }
52}
53
54#[async_trait]
55impl<Block, Client> ChainBackend<Client, Block> for FullChain<Block, Client>
56where
57 Block: BlockT + 'static,
58 Block::Header: Unpin,
59 Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
60{
61 fn client(&self) -> &Arc<Client> {
62 &self.client
63 }
64
65 fn header(&self, hash: Option<Block::Hash>) -> Result<Option<Block::Header>, Error> {
66 self.client.header(self.unwrap_or_best(hash)).map_err(client_err)
67 }
68
69 fn block(&self, hash: Option<Block::Hash>) -> Result<Option<SignedBlock<Block>>, Error> {
70 self.client.block(self.unwrap_or_best(hash)).map_err(client_err)
71 }
72
73 fn subscribe_all_heads(&self, pending: PendingSubscriptionSink) {
74 subscribe_headers(
75 &self.client,
76 &self.executor,
77 pending,
78 || self.client().info().best_hash,
79 || {
80 self.client()
81 .import_notification_stream()
82 .map(|notification| notification.header)
83 },
84 )
85 }
86
87 fn subscribe_new_heads(&self, pending: PendingSubscriptionSink) {
88 subscribe_headers(
89 &self.client,
90 &self.executor,
91 pending,
92 || self.client().info().best_hash,
93 || {
94 self.client()
95 .import_notification_stream()
96 .filter(|notification| future::ready(notification.is_new_best))
97 .map(|notification| notification.header)
98 },
99 )
100 }
101
102 fn subscribe_finalized_heads(&self, pending: PendingSubscriptionSink) {
103 subscribe_headers(
104 &self.client,
105 &self.executor,
106 pending,
107 || self.client().info().finalized_hash,
108 || {
109 self.client()
110 .finality_notification_stream()
111 .map(|notification| notification.header)
112 },
113 )
114 }
115}
116
117fn subscribe_headers<Block, Client, F, G, S>(
119 client: &Arc<Client>,
120 executor: &SubscriptionTaskExecutor,
121 pending: PendingSubscriptionSink,
122 best_block_hash: G,
123 stream: F,
124) where
125 Block: BlockT + 'static,
126 Block::Header: Unpin,
127 Client: HeaderBackend<Block> + 'static,
128 F: FnOnce() -> S,
129 G: FnOnce() -> Block::Hash,
130 S: Stream<Item = Block::Header> + Send + Unpin + 'static,
131{
132 let maybe_header = client
134 .header(best_block_hash())
135 .map_err(client_err)
136 .and_then(|header| header.ok_or_else(|| Error::Other("Best header missing.".into())))
137 .map_err(|e| log::warn!("Best header error {:?}", e))
138 .ok();
139
140 let stream = stream::iter(maybe_header).chain(stream());
145 spawn_subscription_task(
146 executor,
147 PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
148 );
149}