polkadot_availability_distribution/
lib.rs1use futures::{future::Either, FutureExt, StreamExt, TryFutureExt};
18
19use sp_keystore::KeystorePtr;
20
21use polkadot_node_network_protocol::request_response::{
22 v1, v2, IncomingRequestReceiver, ReqProtocolNames,
23};
24use polkadot_node_subsystem::{
25 messages::AvailabilityDistributionMessage, overseer, FromOrchestra, OverseerSignal,
26 SpawnedSubsystem, SubsystemError,
27};
28
29mod error;
31use error::{log_error, FatalError, Result};
32
33use polkadot_node_subsystem_util::runtime::RuntimeInfo;
34
35mod requester;
37use requester::Requester;
38
39mod pov_requester;
41
42mod responder;
44use responder::{run_chunk_receivers, run_pov_receiver};
45
46mod metrics;
47pub use metrics::Metrics;
49
50#[cfg(test)]
51mod tests;
52
/// Log target string for this crate.
///
/// `const` items are implicitly `'static`, so the lifetime is not spelled out.
const LOG_TARGET: &str = "parachain::availability-distribution";
54
55pub struct AvailabilityDistributionSubsystem {
57 runtime: RuntimeInfo,
59 recvs: IncomingRequestReceivers,
61 req_protocol_names: ReqProtocolNames,
63 metrics: Metrics,
65}
66
67pub struct IncomingRequestReceivers {
69 pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
71 pub chunk_req_v1_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
73 pub chunk_req_v2_receiver: IncomingRequestReceiver<v2::ChunkFetchingRequest>,
75}
76
77#[overseer::subsystem(AvailabilityDistribution, error=SubsystemError, prefix=self::overseer)]
78impl<Context> AvailabilityDistributionSubsystem {
79 fn start(self, ctx: Context) -> SpawnedSubsystem {
80 let future = self
81 .run(ctx)
82 .map_err(|e| SubsystemError::with_origin("availability-distribution", e))
83 .boxed();
84
85 SpawnedSubsystem { name: "availability-distribution-subsystem", future }
86 }
87}
88
89#[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)]
90impl AvailabilityDistributionSubsystem {
91 pub fn new(
93 keystore: KeystorePtr,
94 recvs: IncomingRequestReceivers,
95 req_protocol_names: ReqProtocolNames,
96 metrics: Metrics,
97 ) -> Self {
98 let runtime = RuntimeInfo::new(Some(keystore));
99 Self { runtime, recvs, req_protocol_names, metrics }
100 }
101
102 async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
104 let Self { mut runtime, recvs, metrics, req_protocol_names } = self;
105
106 let IncomingRequestReceivers {
107 pov_req_receiver,
108 chunk_req_v1_receiver,
109 chunk_req_v2_receiver,
110 } = recvs;
111 let mut requester = Requester::new(req_protocol_names, metrics.clone()).fuse();
112 let mut warn_freq = gum::Freq::new();
113
114 {
115 let sender = ctx.sender().clone();
116 ctx.spawn(
117 "pov-receiver",
118 run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(),
119 )
120 .map_err(FatalError::SpawnTask)?;
121
122 ctx.spawn(
123 "chunk-receiver",
124 run_chunk_receivers(
125 sender,
126 chunk_req_v1_receiver,
127 chunk_req_v2_receiver,
128 metrics.clone(),
129 )
130 .boxed(),
131 )
132 .map_err(FatalError::SpawnTask)?;
133 }
134
135 loop {
136 let action = {
137 let mut subsystem_next = ctx.recv().fuse();
138 futures::select! {
139 subsystem_msg = subsystem_next => Either::Left(subsystem_msg),
140 from_task = requester.next() => Either::Right(from_task),
141 }
142 };
143
144 let message = match action {
146 Either::Left(subsystem_msg) =>
147 subsystem_msg.map_err(|e| FatalError::IncomingMessageChannel(e))?,
148 Either::Right(from_task) => {
149 let from_task = from_task.ok_or(FatalError::RequesterExhausted)?;
150 ctx.send_message(from_task).await;
151 continue
152 },
153 };
154 match message {
155 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
156 log_error(
157 requester
158 .get_mut()
159 .update_fetching_heads(&mut ctx, &mut runtime, update)
160 .await,
161 "Error in Requester::update_fetching_heads",
162 &mut warn_freq,
163 )?;
164 },
165 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _finalized_number)) => {
166 },
167 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
168 FromOrchestra::Communication {
169 msg:
170 AvailabilityDistributionMessage::FetchPoV {
171 relay_parent,
172 from_validator,
173 para_id,
174 candidate_hash,
175 pov_hash,
176 tx,
177 },
178 } => {
179 log_error(
180 pov_requester::fetch_pov(
181 &mut ctx,
182 &mut runtime,
183 relay_parent,
184 from_validator,
185 para_id,
186 candidate_hash,
187 pov_hash,
188 tx,
189 metrics.clone(),
190 )
191 .await,
192 "pov_requester::fetch_pov",
193 &mut warn_freq,
194 )?;
195 },
196 }
197 }
198 }
199}