sc_consensus_beefy/communication/request_response/
incoming_requests_handler.rs1use codec::DecodeAll;
21use futures::{channel::oneshot, StreamExt};
22use log::{debug, trace};
23use sc_client_api::BlockBackend;
24use sc_network::{
25 config as netconfig, service::traits::RequestResponseConfig, types::ProtocolName,
26 NetworkBackend, ReputationChange,
27};
28use sc_network_types::PeerId;
29use sp_consensus_beefy::BEEFY_ENGINE_ID;
30use sp_runtime::traits::Block;
31use std::{marker::PhantomData, sync::Arc};
32
33use crate::{
34 communication::{
35 cost,
36 request_response::{
37 on_demand_justifications_protocol_config, Error, JustificationRequest,
38 BEEFY_SYNC_LOG_TARGET,
39 },
40 },
41 metric_inc,
42 metrics::{register_metrics, OnDemandIncomingRequestsMetrics},
43};
44
45#[derive(Debug)]
47pub(crate) struct IncomingRequest<B: Block> {
48 pub peer: PeerId,
50 pub payload: JustificationRequest<B>,
52 pub pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
54}
55
56impl<B: Block> IncomingRequest<B> {
57 pub fn new(
59 peer: PeerId,
60 payload: JustificationRequest<B>,
61 pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
62 ) -> Self {
63 Self { peer, payload, pending_response }
64 }
65
66 pub fn try_from_raw<F>(
75 raw: netconfig::IncomingRequest,
76 reputation_changes_on_err: F,
77 ) -> Result<Self, Error>
78 where
79 F: FnOnce(usize) -> Vec<ReputationChange>,
80 {
81 let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
82 let payload = match JustificationRequest::decode_all(&mut payload.as_ref()) {
83 Ok(payload) => payload,
84 Err(err) => {
85 let response = netconfig::OutgoingResponse {
86 result: Err(()),
87 reputation_changes: reputation_changes_on_err(payload.len()),
88 sent_feedback: None,
89 };
90 if let Err(_) = pending_response.send(response) {
91 return Err(Error::DecodingErrorNoReputationChange(peer, err));
92 }
93 return Err(Error::DecodingError(peer, err));
94 },
95 };
96 Ok(Self::new(peer, payload, pending_response))
97 }
98}
99
100pub(crate) struct IncomingRequestReceiver {
104 raw: async_channel::Receiver<netconfig::IncomingRequest>,
105}
106
107impl IncomingRequestReceiver {
108 pub fn new(inner: async_channel::Receiver<netconfig::IncomingRequest>) -> Self {
109 Self { raw: inner }
110 }
111
112 pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
117 where
118 B: Block,
119 F: FnOnce(usize) -> Vec<ReputationChange>,
120 {
121 let req = match self.raw.next().await {
122 None => return Err(Error::RequestChannelExhausted),
123 Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes)?,
124 };
125 Ok(req)
126 }
127}
128
129pub struct BeefyJustifsRequestHandler<B, Client> {
131 pub(crate) request_receiver: IncomingRequestReceiver,
132 pub(crate) justif_protocol_name: ProtocolName,
133 pub(crate) client: Arc<Client>,
134 pub(crate) metrics: Option<OnDemandIncomingRequestsMetrics>,
135 pub(crate) _block: PhantomData<B>,
136}
137
138impl<B, Client> BeefyJustifsRequestHandler<B, Client>
139where
140 B: Block,
141 Client: BlockBackend<B> + Send + Sync,
142{
143 pub fn new<Hash: AsRef<[u8]>, Network: NetworkBackend<B, <B as Block>::Hash>>(
145 genesis_hash: Hash,
146 fork_id: Option<&str>,
147 client: Arc<Client>,
148 prometheus_registry: Option<prometheus_endpoint::Registry>,
149 ) -> (Self, Network::RequestResponseProtocolConfig) {
150 let (request_receiver, config): (_, Network::RequestResponseProtocolConfig) =
151 on_demand_justifications_protocol_config::<_, _, Network>(genesis_hash, fork_id);
152 let justif_protocol_name = config.protocol_name().clone();
153 let metrics = register_metrics(prometheus_registry);
154 (
155 Self { request_receiver, justif_protocol_name, client, metrics, _block: PhantomData },
156 config,
157 )
158 }
159
160 pub fn protocol_name(&self) -> ProtocolName {
162 self.justif_protocol_name.clone()
163 }
164
165 fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
167 let mut reputation_changes = vec![];
168 let maybe_encoded_proof = self
169 .client
170 .block_hash(request.payload.begin)
171 .ok()
172 .flatten()
173 .and_then(|hash| self.client.justifications(hash).ok().flatten())
174 .and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
175 .ok_or_else(|| reputation_changes.push(cost::UNKNOWN_PROOF_REQUEST));
176 request
177 .pending_response
178 .send(netconfig::OutgoingResponse {
179 result: maybe_encoded_proof,
180 reputation_changes,
181 sent_feedback: None,
182 })
183 .map_err(|_| Error::SendResponse)
184 }
185
186 pub async fn run(&mut self) -> Error {
190 trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");
191
192 while let Ok(request) = self
193 .request_receiver
194 .recv(|bytes| {
195 let bytes = bytes.min(i32::MAX as usize) as i32;
196 vec![ReputationChange::new(
197 bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
198 "BEEFY: Bad request payload",
199 )]
200 })
201 .await
202 {
203 let peer = request.peer;
204 match self.handle_request(request) {
205 Ok(()) => {
206 metric_inc!(self.metrics, beefy_successful_justification_responses);
207 debug!(
208 target: BEEFY_SYNC_LOG_TARGET,
209 "🥩 Handled BEEFY justification request from {:?}.", peer
210 )
211 },
212 Err(e) => {
213 metric_inc!(self.metrics, beefy_failed_justification_responses);
215 debug!(
216 target: BEEFY_SYNC_LOG_TARGET,
217 "🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
218 )
219 },
220 }
221 }
222 Error::RequestsReceiverStreamClosed
223 }
224}