referrerpolicy=no-referrer-when-downgrade

sc_rpc/chain/
chain_full.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Blockchain API backend for full nodes.
20
21use super::{client_err, ChainBackend, Error};
22use crate::{
23	utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
24	SubscriptionTaskExecutor,
25};
26use std::{marker::PhantomData, sync::Arc};
27
28use futures::{
29	future::{self},
30	stream::{self, Stream, StreamExt},
31};
32use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
33use sc_client_api::{BlockBackend, BlockchainEvents};
34use sp_blockchain::HeaderBackend;
35use sp_runtime::{generic::SignedBlock, traits::Block as BlockT};
36
37/// Blockchain API backend for full nodes. Reads all the data from local database.
38pub struct FullChain<Block: BlockT, Client> {
39	/// Substrate client.
40	client: Arc<Client>,
41	/// phantom member to pin the block type
42	_phantom: PhantomData<Block>,
43	/// Subscription executor.
44	executor: SubscriptionTaskExecutor,
45}
46
47impl<Block: BlockT, Client> FullChain<Block, Client> {
48	/// Create new Chain API RPC handler.
49	pub fn new(client: Arc<Client>, executor: SubscriptionTaskExecutor) -> Self {
50		Self { client, executor, _phantom: PhantomData }
51	}
52}
53
54#[async_trait]
55impl<Block, Client> ChainBackend<Client, Block> for FullChain<Block, Client>
56where
57	Block: BlockT + 'static,
58	Block::Header: Unpin,
59	Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
60{
61	fn client(&self) -> &Arc<Client> {
62		&self.client
63	}
64
65	fn header(&self, hash: Option<Block::Hash>) -> Result<Option<Block::Header>, Error> {
66		self.client.header(self.unwrap_or_best(hash)).map_err(client_err)
67	}
68
69	fn block(&self, hash: Option<Block::Hash>) -> Result<Option<SignedBlock<Block>>, Error> {
70		self.client.block(self.unwrap_or_best(hash)).map_err(client_err)
71	}
72
73	fn subscribe_all_heads(&self, pending: PendingSubscriptionSink) {
74		subscribe_headers(
75			&self.client,
76			&self.executor,
77			pending,
78			|| self.client().info().best_hash,
79			|| {
80				self.client()
81					.import_notification_stream()
82					.map(|notification| notification.header)
83			},
84		)
85	}
86
87	fn subscribe_new_heads(&self, pending: PendingSubscriptionSink) {
88		subscribe_headers(
89			&self.client,
90			&self.executor,
91			pending,
92			|| self.client().info().best_hash,
93			|| {
94				self.client()
95					.import_notification_stream()
96					.filter(|notification| future::ready(notification.is_new_best))
97					.map(|notification| notification.header)
98			},
99		)
100	}
101
102	fn subscribe_finalized_heads(&self, pending: PendingSubscriptionSink) {
103		subscribe_headers(
104			&self.client,
105			&self.executor,
106			pending,
107			|| self.client().info().finalized_hash,
108			|| {
109				self.client()
110					.finality_notification_stream()
111					.map(|notification| notification.header)
112			},
113		)
114	}
115}
116
117/// Subscribe to new headers.
118fn subscribe_headers<Block, Client, F, G, S>(
119	client: &Arc<Client>,
120	executor: &SubscriptionTaskExecutor,
121	pending: PendingSubscriptionSink,
122	best_block_hash: G,
123	stream: F,
124) where
125	Block: BlockT + 'static,
126	Block::Header: Unpin,
127	Client: HeaderBackend<Block> + 'static,
128	F: FnOnce() -> S,
129	G: FnOnce() -> Block::Hash,
130	S: Stream<Item = Block::Header> + Send + Unpin + 'static,
131{
132	// send current head right at the start.
133	let maybe_header = client
134		.header(best_block_hash())
135		.map_err(client_err)
136		.and_then(|header| header.ok_or_else(|| Error::Other("Best header missing.".into())))
137		.map_err(|e| log::warn!("Best header error {:?}", e))
138		.ok();
139
140	// NOTE: by the time we set up the stream there might be a new best block and so there is a risk
141	// that the stream has a hole in it. The alternative would be to look up the best block *after*
142	// we set up the stream and chain it to the stream. Consuming code would need to handle
143	// duplicates at the beginning of the stream though.
144	let stream = stream::iter(maybe_header).chain(stream());
145	spawn_subscription_task(
146		executor,
147		PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
148	);
149}