#![deny(unused_crate_dependencies, unused_results)]
#![warn(missing_docs)]
use std::sync::Arc;
use futures::prelude::*;
use sc_client_api::AuxStore;
use futures::stream::StreamExt;
use polkadot_node_subsystem::{
messages::ChainApiMessage, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem,
SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_types::ChainApiBackend;
mod metrics;
use self::metrics::Metrics;
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::chain-api";
pub struct ChainApiSubsystem<Client> {
client: Arc<Client>,
metrics: Metrics,
}
impl<Client> ChainApiSubsystem<Client> {
pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
ChainApiSubsystem { client, metrics }
}
}
#[overseer::subsystem(ChainApi, error = SubsystemError, prefix = self::overseer)]
impl<Client, Context> ChainApiSubsystem<Client>
where
Client: ChainApiBackend + AuxStore + 'static,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run::<Client, Context>(ctx, self)
.map_err(|e| SubsystemError::with_origin("chain-api", e))
.boxed();
SpawnedSubsystem { future, name: "chain-api-subsystem" }
}
}
#[overseer::contextbounds(ChainApi, prefix = self::overseer)]
async fn run<Client, Context>(
mut ctx: Context,
subsystem: ChainApiSubsystem<Client>,
) -> SubsystemResult<()>
where
Client: ChainApiBackend + AuxStore,
{
loop {
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Communication { msg } => match msg {
ChainApiMessage::BlockNumber(hash, response_channel) => {
let _timer = subsystem.metrics.time_block_number();
let result =
subsystem.client.number(hash).await.map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::BlockHeader(hash, response_channel) => {
let _timer = subsystem.metrics.time_block_header();
let result =
subsystem.client.header(hash).await.map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::BlockWeight(hash, response_channel) => {
let _timer = subsystem.metrics.time_block_weight();
let result = sc_consensus_babe::block_weight(&*subsystem.client, hash)
.map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::FinalizedBlockHash(number, response_channel) => {
let _timer = subsystem.metrics.time_finalized_block_hash();
let result =
subsystem.client.hash(number).await.map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::FinalizedBlockNumber(response_channel) => {
let _timer = subsystem.metrics.time_finalized_block_number();
let result = subsystem
.client
.info()
.await
.map_err(|e| e.to_string().into())
.map(|info| info.finalized_number);
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::Ancestors { hash, k, response_channel } => {
let _timer = subsystem.metrics.time_ancestors();
gum::trace!(target: LOG_TARGET, hash=%hash, k=k, "ChainApiMessage::Ancestors");
let next_parent_stream = futures::stream::unfold(
(hash, subsystem.client.clone()),
|(hash, client)| async move {
let maybe_header = client.header(hash).await;
match maybe_header {
Err(e) => {
let e = e.to_string().into();
Some((Err(e), (hash, client)))
},
Ok(None) => None,
Ok(Some(header)) => {
if header.number == 0 {
None
} else {
Some((Ok(header.parent_hash), (header.parent_hash, client)))
}
},
}
},
);
let result = next_parent_stream.take(k).try_collect().await;
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
},
}
}
}