use crate::chain_head::{
chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
event::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
},
subscription::{SubscriptionManagement, SubscriptionManagementError},
};
use futures::{
channel::oneshot,
stream::{self, Stream, StreamExt},
};
use futures_util::future::Either;
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::Subscription;
use schnellru::{ByLength, LruMap};
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
SaturatedConversion, Saturating,
};
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};
use super::subscription::InsertedSubscriptionData;

/// The maximum number of finalized block hashes reported by the `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;
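
/// Generates the events of the `chainHead_follow` method, turning block import and finality
/// notifications into `Initialized`, `NewBlock`, `BestBlockChanged` and `Finalized` events.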
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
    /// Substrate client.
    client: Arc<Client>,
    /// Backend of the chain.
    backend: Arc<BE>,
    /// Subscriptions handle.
    sub_handle: SubscriptionManagement<Block, BE>,
    /// Whether the subscription reports runtime updates.
    with_runtime: bool,
    /// Subscription ID.
    sub_id: String,
    /// The best block reported by this subscription so far.
    current_best_block: Option<Block::Hash>,
    /// LRU cache of block hashes that must not be reported as pruned (again).
    pruned_blocks: LruMap<Block::Hash, ()>,
    /// The maximum allowed distance between a leaf and the finalized block
    /// accepted when the subscription starts.
    max_lagging_distance: usize,
}
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
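    /// Create a new [`ChainHeadFollower`].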
pub fn new(
client: Arc<Client>,
backend: Arc<BE>,
sub_handle: SubscriptionManagement<Block, BE>,
with_runtime: bool,
sub_id: String,
max_lagging_distance: usize,
) -> Self {
Self {
client,
backend,
sub_handle,
with_runtime,
sub_id,
current_best_block: None,
pruned_blocks: LruMap::new(ByLength::new(
MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
)),
max_lagging_distance,
}
}
}
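
/// The notification types handled by the follower's event loop: the initial events, block
/// import notifications, finality notifications and `chainHead` method responses.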
enum NotificationType<Block: BlockT> {
InitialEvents(Vec<FollowEvent<Block::Hash>>),
NewBlock(BlockImportNotification<Block>),
Finalized(FinalityNotification<Block>),
MethodResponse(FollowEvent<Block::Hash>),
}
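
/// The blocks collected when the subscription starts: the pinned descendants of the latest
/// finalized block as `(child, parent)` pairs, the most recent finalized block hashes, and the
/// forks that were pruned before the subscription began and must never be reported as pruned.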
#[derive(Clone, Debug)]
struct InitialBlocks<Block: BlockT> {
finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
finalized_block_hashes: VecDeque<Block::Hash>,
pruned_forks: HashSet<Block::Hash>,
}
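
/// A snapshot of the chain's best and finalized block at the moment the subscription starts.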
struct StartupPoint<Block: BlockT> {
pub best_hash: Block::Hash,
pub finalized_hash: Block::Hash,
pub finalized_number: NumberFor<Block>,
}
impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
fn from(info: Info<Block>) -> Self {
StartupPoint::<Block> {
best_hash: info.best_hash,
finalized_hash: info.finalized_hash,
finalized_number: info.finalized_number,
}
}
}
impl<BE, Block, Client> ChainHeadFollower<BE, Block, Client>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
Client: BlockBackend<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = BlockChainError>
+ BlockchainEvents<Block>
+ CallApiAt<Block>
+ 'static,
{
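    /// Build the runtime event for the given block, if runtime reporting is enabled.
    ///
    /// Returns `None` when the subscription was started without runtime updates, or when the
    /// block's runtime version is identical to its parent's.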
fn generate_runtime_event(
&self,
block: Block::Hash,
parent: Option<Block::Hash>,
) -> Option<RuntimeEvent> {
if !self.with_runtime {
return None
}
let block_rt = match self.client.runtime_version_at(block) {
Ok(rt) => rt,
Err(err) => return Some(err.into()),
};
let parent = match parent {
Some(parent) => parent,
None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() })),
};
let parent_rt = match self.client.runtime_version_at(parent) {
Ok(rt) => rt,
Err(err) => return Some(err.into()),
};
if block_rt != parent_rt {
Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt.into() }))
} else {
None
}
}
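    /// Check that the given block is at most `max_lagging_distance` blocks ahead of the
    /// finalized block, returning `BlockDistanceTooLarge` otherwise.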
    fn distance_within_reason(
&self,
block: Block::Hash,
finalized: Block::Hash,
) -> Result<(), SubscriptionManagementError> {
let Some(block_num) = self.client.number(block)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};
let Some(finalized_num) = self.client.number(finalized)? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
};
let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
if distance > self.max_lagging_distance {
return Err(SubscriptionManagementError::BlockDistanceTooLarge);
}
Ok(())
}
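    /// Collect the initial set of blocks reported to the subscription.
    ///
    /// Every leaf is checked against `max_lagging_distance`; leaves reachable from the finalized
    /// block without retracted blocks are pinned and recorded as `(child, parent)` descendants,
    /// while leaves on already-pruned forks are remembered so they are never reported as pruned.
    /// Up to `MAX_FINALIZED_BLOCKS` of the most recent finalized block hashes are pinned as well.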
fn get_init_blocks_with_forks(
&self,
finalized: Block::Hash,
) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let leaves = blockchain.leaves()?;
let mut pruned_forks = HashSet::new();
let mut finalized_block_descendants = Vec::new();
let mut unique_descendants = HashSet::new();
for leaf in &leaves {
            self.distance_within_reason(*leaf, finalized)?;
}
for leaf in leaves {
let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
let blocks = tree_route.enacted().iter().map(|block| block.hash);
if !tree_route.retracted().is_empty() {
pruned_forks.extend(blocks);
} else {
let mut parent = finalized;
for child in blocks {
let pair = (child, parent);
if unique_descendants.insert(pair) {
self.sub_handle.pin_block(&self.sub_id, child)?;
finalized_block_descendants.push(pair);
}
parent = child;
}
}
}
let mut current_block = finalized;
let Some(header) = blockchain.header(current_block)? else {
return Err(SubscriptionManagementError::BlockHeaderAbsent);
};
let mut finalized_block_hashes = VecDeque::with_capacity(MAX_FINALIZED_BLOCKS);
self.sub_handle.pin_block(&self.sub_id, current_block)?;
finalized_block_hashes.push_front(current_block);
current_block = *header.parent_hash();
for _ in 0..MAX_FINALIZED_BLOCKS - 1 {
let Ok(Some(header)) = blockchain.header(current_block) else { break };
if self.sub_handle.pin_block(&self.sub_id, current_block).is_err() {
break
};
finalized_block_hashes.push_front(current_block);
current_block = *header.parent_hash();
}
Ok(InitialBlocks { finalized_block_descendants, finalized_block_hashes, pruned_forks })
}
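    /// Generate the initial events of the subscription: the `Initialized` event, a `NewBlock`
    /// event for every pinned descendant of the finalized block and, when the best block differs
    /// from the finalized block, a final `BestBlockChanged` event.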
fn generate_init_events(
&mut self,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
let initial_blocks = init.finalized_block_descendants;
let finalized_block_hashes = init.finalized_block_hashes;
for pruned in init.pruned_forks {
self.pruned_blocks.insert(pruned, ());
}
let finalized_block_hash = startup_point.finalized_hash;
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
let initialized_event = FollowEvent::Initialized(Initialized {
finalized_block_hashes: finalized_block_hashes.into(),
finalized_block_runtime,
with_runtime: self.with_runtime,
});
let mut finalized_block_descendants = Vec::with_capacity(initial_blocks.len() + 1);
finalized_block_descendants.push(initialized_event);
for (child, parent) in initial_blocks.into_iter() {
let new_runtime = self.generate_runtime_event(child, Some(parent));
let event = FollowEvent::NewBlock(NewBlock {
block_hash: child,
parent_block_hash: parent,
new_runtime,
with_runtime: self.with_runtime,
});
finalized_block_descendants.push(event);
}
let best_block_hash = startup_point.best_hash;
if best_block_hash != finalized_block_hash {
let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
self.current_best_block = Some(best_block_hash);
finalized_block_descendants.push(best_block);
};
Ok(finalized_block_descendants)
}
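    /// Generate the `NewBlock` event for an imported block, followed by a `BestBlockChanged`
    /// event when the block becomes the new best block.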
fn generate_import_events(
&mut self,
block_hash: Block::Hash,
parent_block_hash: Block::Hash,
is_best_block: bool,
) -> Vec<FollowEvent<Block::Hash>> {
let new_runtime = self.generate_runtime_event(block_hash, Some(parent_block_hash));
let new_block = FollowEvent::NewBlock(NewBlock {
block_hash,
parent_block_hash,
new_runtime,
with_runtime: self.with_runtime,
});
if !is_best_block {
return vec![new_block]
}
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
match self.current_best_block {
Some(block_cache) => {
if block_cache != block_hash {
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
} else {
vec![new_block]
}
},
None => {
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
},
}
}
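    /// Handle a block import notification by pinning the block and generating the corresponding
    /// events. Already pinned blocks and blocks below the finalized block observed at startup
    /// are ignored.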
fn handle_import_blocks(
&mut self,
notification: BlockImportNotification<Block>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? {
return Ok(Default::default())
}
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}
Ok(self.generate_import_events(
notification.hash,
*notification.header.parent_hash(),
notification.is_new_best,
))
}
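    /// Generate import events for finalized blocks that have not been announced yet, ensuring
    /// every hash of the upcoming `Finalized` event was first reported through a `NewBlock`
    /// event.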
fn generate_finalized_events(
&mut self,
finalized_block_hashes: &[Block::Hash],
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let mut events = Vec::new();
        let Some(first_hash) = finalized_block_hashes.first() else { return Ok(Default::default()) };
let Some(first_header) = self.client.header(*first_hash)? else {
return Err(SubscriptionManagementError::BlockHeaderAbsent)
};
let parents =
std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
if !self.sub_handle.pin_block(&self.sub_id, *hash)? {
continue
}
let is_last = i + 1 == finalized_block_hashes.len();
if !is_last {
events.extend(self.generate_import_events(*hash, *parent, false));
continue;
}
if let Some(best_block_hash) = self.current_best_block {
let ancestor =
sp_blockchain::lowest_common_ancestor(&*self.client, *hash, best_block_hash)?;
if ancestor.hash == *hash {
return Err(SubscriptionManagementError::Custom(
"A descendent of the finalized block was already reported".into(),
))
}
}
events.extend(self.generate_import_events(*hash, *parent, true))
}
Ok(events)
}
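    /// Collect the block hashes pruned by this finalization from the given stale heads, skipping
    /// blocks that were already reported (or marked) as pruned.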
fn get_pruned_hashes(
&mut self,
stale_heads: &[Block::Hash],
last_finalized: Block::Hash,
) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let mut pruned = Vec::new();
for stale_head in stale_heads {
let tree_route = sp_blockchain::tree_route(blockchain, last_finalized, *stale_head)?;
pruned.extend(tree_route.enacted().iter().filter_map(|block| {
if self.pruned_blocks.get(&block.hash).is_some() {
return None
}
self.pruned_blocks.insert(block.hash, ());
Some(block.hash)
}))
}
Ok(pruned)
}
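    /// Handle a finality notification: announce any finalized blocks not yet reported, compute
    /// the pruned forks, switch the best block to the finalized head if the previous best block
    /// was pruned or is no longer a descendant of it, and emit the `Finalized` event.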
fn handle_finalized_blocks(
&mut self,
notification: FinalityNotification<Block>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let last_finalized = notification.hash;
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}
let mut finalized_block_hashes = notification.tree_route.to_vec();
finalized_block_hashes.push(last_finalized);
let mut events = self.generate_finalized_events(&finalized_block_hashes)?;
let pruned_block_hashes =
            self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;
let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
});
if let Some(current_best_block) = self.current_best_block {
let is_in_pruned_list =
pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
if is_in_pruned_list {
self.current_best_block = Some(last_finalized);
events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
best_block_hash: last_finalized,
}));
} else {
let ancestor = sp_blockchain::lowest_common_ancestor(
&*self.client,
last_finalized,
current_best_block,
)?;
let is_descendant = ancestor.hash == last_finalized;
if !is_descendant {
self.current_best_block = Some(last_finalized);
events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
best_block_hash: last_finalized,
}));
}
}
}
events.push(finalized_event);
Ok(events)
}
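    /// Drive the subscription: forward the events produced for each stream notification to the
    /// sink until the stream ends, the subscriber disconnects or the stop signal fires, emitting
    /// a `Stop` event on termination or failure.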
async fn submit_events<EventStream>(
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
sink: Subscription,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
{
let mut stream_item = stream.next();
let connection_closed = sink.closed();
tokio::pin!(connection_closed);
let mut stop_event = futures_util::future::select(rx_stop, connection_closed);
while let Either::Left((Some(event), next_stop_event)) =
futures_util::future::select(stream_item, stop_event).await
{
let events = match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};
let events = match events {
Ok(events) => events,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to handle stream notification {:?}",
self.sub_id,
err
);
                    let _ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
for event in events {
if let Err(err) = sink.send(&event).await {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
);
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return Ok(())
}
}
stream_item = stream.next();
stop_event = next_stop_event;
}
let _ = sink.send(&FollowEvent::<String>::Stop).await;
Ok(())
}
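    /// Generate the block events for the `chainHead_follow` method.
    ///
    /// Merges the block import, finality and method response streams with the initial events and
    /// submits everything to the subscriber until the subscription is stopped.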
pub async fn generate_events(
&mut self,
sink: Subscription,
sub_data: InsertedSubscriptionData<Block>,
) -> Result<(), SubscriptionManagementError> {
let stream_import = self
.client
.import_notification_stream()
.map(|notification| NotificationType::NewBlock(notification));
let stream_finalized = self
.client
.finality_notification_stream()
.map(|notification| NotificationType::Finalized(notification));
let stream_responses = sub_data
.response_receiver
.map(|response| NotificationType::MethodResponse(response));
let startup_point = StartupPoint::from(self.client.info());
let initial_events = match self.generate_init_events(&startup_point) {
Ok(blocks) => blocks,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to generate the initial events {:?}",
self.sub_id,
err
);
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
let initial = NotificationType::InitialEvents(initial_events);
let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
let stream = stream::once(futures::future::ready(initial)).chain(merged);
self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
}
}