// sc_consensus_beefy/communication/request_response/outgoing_requests_engine.rs
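
//! On-demand (outgoing) justification requests engine: asks peers for BEEFY finality
//! proofs of specific blocks over the request/response protocol.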

use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{
    request_responses::{IfDisconnected, RequestFailure},
    NetworkRequest, ProtocolName,
};
use sc_network_types::PeerId;
use sp_consensus_beefy::{AuthorityIdBound, ValidatorSet};
use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};

use crate::{
    communication::{
        benefit, cost,
        peers::PeerReport,
        request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
    },
    justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
    metric_inc, metric_set,
    metrics::{register_metrics, OnDemandOutgoingRequestsMetrics},
    KnownPeers,
};

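/// Raw response received from the network: encoded payload plus protocol name, or a failure.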
type Response = Result<(Vec<u8>, ProtocolName), RequestFailure>;
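/// Receiver end of the oneshot channel on which the network delivers the response.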
type ResponseReceiver = oneshot::Receiver<Response>;

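/// Block number and active validator set associated with an in-flight justification request.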
#[derive(Clone, Debug)]
struct RequestInfo<B: Block, AuthorityId: AuthorityIdBound> {
    block: NumberFor<B>,
    active_set: ValidatorSet<AuthorityId>,
}

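/// Engine state: either idle, or awaiting a response from a single peer.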
enum State<B: Block, AuthorityId: AuthorityIdBound> {
    Idle,
    AwaitingResponse(PeerId, RequestInfo<B, AuthorityId>, ResponseReceiver),
}

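/// Outcome reported by `next()` after polling the engine.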
pub(crate) enum ResponseInfo<B: Block, AuthorityId: AuthorityIdBound> {
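    /// Nothing to report: no request is in flight, or the failure carries no peer report.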
    Pending,
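    /// A valid finality proof was received, together with a benefit report for the sending peer.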
    ValidProof(BeefyVersionedFinalityProof<B, AuthorityId>, PeerReport),
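    /// Only a peer report (e.g. the peer refused the request or sent an invalid proof).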
    PeerReport(PeerReport),
}

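/// On-demand BEEFY justification requests engine.
///
/// Tracks a cache of candidate peers and keeps at most one justification request in flight.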
pub struct OnDemandJustificationsEngine<B: Block, AuthorityId: AuthorityIdBound> {
    network: Arc<dyn NetworkRequest + Send + Sync>,
    protocol_name: ProtocolName,

    live_peers: Arc<Mutex<KnownPeers<B>>>,
    peers_cache: VecDeque<PeerId>,

    state: State<B, AuthorityId>,
    metrics: Option<OnDemandOutgoingRequestsMetrics>,
}

impl<B: Block, AuthorityId: AuthorityIdBound> OnDemandJustificationsEngine<B, AuthorityId> {
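    /// Create a new engine, registering Prometheus metrics if a registry is provided.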
    pub fn new(
        network: Arc<dyn NetworkRequest + Send + Sync>,
        protocol_name: ProtocolName,
        live_peers: Arc<Mutex<KnownPeers<B>>>,
        prometheus_registry: Option<prometheus_endpoint::Registry>,
    ) -> Self {
        let metrics = register_metrics(prometheus_registry);
        Self {
            network,
            protocol_name,
            live_peers,
            peers_cache: VecDeque::new(),
            state: State::Idle,
            metrics,
        }
    }

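    /// Repopulate the peers cache with the live peers known to be at or past `block`.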
    fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
        self.peers_cache = self.live_peers.lock().further_than(block);
    }

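    /// Pop peers from the cache until one that is still live is found.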
    fn try_next_peer(&mut self) -> Option<PeerId> {
        let live = self.live_peers.lock();
        while let Some(peer) = self.peers_cache.pop_front() {
            if live.contains(&peer) {
                return Some(peer);
            }
        }
        None
    }

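    /// Send a justification request for `req_info.block` to `peer` and switch the engine to
    /// the `AwaitingResponse` state.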
    fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B, AuthorityId>) {
        debug!(
            target: BEEFY_SYNC_LOG_TARGET,
            "🥩 requesting justif #{:?} from peer {:?}", req_info.block, peer,
        );

        let payload = JustificationRequest::<B> { begin: req_info.block }.encode();

        let (tx, rx) = oneshot::channel();

        self.network.start_request(
            peer,
            self.protocol_name.clone(),
            payload,
            None,
            tx,
            IfDisconnected::ImmediateError,
        );

        self.state = State::AwaitingResponse(peer, req_info, rx);
    }

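    /// Start a new justification request for `block`, unless another request is already in
    /// flight. The given `active_set` is later used to verify any received finality proof.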
    pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
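        // Ignore new requests while another one is still in flight.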
        if matches!(self.state, State::AwaitingResponse(_, _, _)) {
            return;
        }
        self.reset_peers_cache_for_block(block);

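        // Kick off the engine: each unsuccessful response will trigger a new request to the
        // next peer in the cache, until the cache is exhausted.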
        if let Some(peer) = self.try_next_peer() {
            self.request_from_peer(peer, RequestInfo { block, active_set });
        } else {
            metric_inc!(self.metrics, beefy_on_demand_justification_no_peer_to_request_from);
            debug!(
                target: BEEFY_SYNC_LOG_TARGET,
                "🥩 no good peers to request justif #{:?} from", block
            );
        }
    }

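    /// Cancel any pending request for a justification at or below `block`.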
    pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
        match &self.state {
            State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
                debug!(
                    target: BEEFY_SYNC_LOG_TARGET,
                    "🥩 cancel pending request for justification #{:?}", req_info.block
                );
                self.state = State::Idle;
            },
            _ => (),
        }
    }

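    /// Check a received response: map channel and network failures to an `Error` (reporting
    /// the peer where appropriate), then decode and verify the finality proof against the
    /// requested block and the expected validator set.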
    fn process_response(
        &mut self,
        peer: &PeerId,
        req_info: &RequestInfo<B, AuthorityId>,
        response: Result<Response, Canceled>,
    ) -> Result<BeefyVersionedFinalityProof<B, AuthorityId>, Error> {
        response
            .map_err(|e| {
                debug!(
                    target: BEEFY_SYNC_LOG_TARGET,
                    "🥩 on-demand sc-network channel sender closed, err: {:?}", e
                );
                Error::ResponseError
            })?
            .map_err(|e| {
                debug!(
                    target: BEEFY_SYNC_LOG_TARGET,
                    "🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
                    req_info.block,
                    peer,
                    e
                );
                match e {
                    RequestFailure::Refused => {
                        metric_inc!(self.metrics, beefy_on_demand_justification_peer_refused);
                        let peer_report =
                            PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
                        Error::InvalidResponse(peer_report)
                    },
                    _ => {
                        metric_inc!(self.metrics, beefy_on_demand_justification_peer_error);
                        Error::ResponseError
                    },
                }
            })
            .and_then(|(encoded, _)| {
                decode_and_verify_finality_proof::<B, AuthorityId>(
                    &encoded[..],
                    req_info.block,
                    &req_info.active_set,
                )
                .map_err(|(err, signatures_checked)| {
                    metric_inc!(self.metrics, beefy_on_demand_justification_invalid_proof);
                    debug!(
                        target: BEEFY_SYNC_LOG_TARGET,
                        "🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
                        req_info.block, peer, err
                    );
                    let mut cost = cost::INVALID_PROOF;
                    cost.value +=
                        cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
                    Error::InvalidResponse(PeerReport { who: *peer, cost_benefit: cost })
                })
            })
    }

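    /// Wait for the in-flight response (or forever while idle), process it and return the
    /// outcome. On failure the same request is automatically retried with the next cached peer.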
    pub(crate) async fn next(&mut self) -> ResponseInfo<B, AuthorityId> {
        let (peer, req_info, resp) = match &mut self.state {
            State::Idle => {
                futures::future::pending::<()>().await;
                return ResponseInfo::Pending;
            },
            State::AwaitingResponse(peer, req_info, receiver) => {
                let resp = receiver.await;
                (*peer, req_info.clone(), resp)
            },
        };
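        // The awaited response has arrived (or the channel was dropped), so this request is
        // finished either way; reset the engine to `Idle`.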
        self.state = State::Idle;

        metric_set!(self.metrics, beefy_on_demand_live_peers, self.live_peers.lock().len() as u64);

        let block = req_info.block;
        match self.process_response(&peer, &req_info, resp) {
            Err(err) => {
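                // Retry the same request with the next peer from the cache, if there is one.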
                if let Some(peer) = self.try_next_peer() {
                    self.request_from_peer(peer, req_info);
                } else {
                    metric_inc!(
                        self.metrics,
                        beefy_on_demand_justification_no_peer_to_request_from
                    );

                    let num_cache = self.peers_cache.len();
                    let num_live = self.live_peers.lock().len();
                    warn!(
                        target: BEEFY_SYNC_LOG_TARGET,
                        "🥩 ran out of peers to request justif #{block:?} from num_cache={num_cache} num_live={num_live} err={err:?}",
                    );
                }
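                // Report the peer if the failure was attributable to it; otherwise there is
                // nothing to surface to the caller.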
                if let Error::InvalidResponse(peer_report) = err {
                    ResponseInfo::PeerReport(peer_report)
                } else {
                    ResponseInfo::Pending
                }
            },
            Ok(proof) => {
                metric_inc!(self.metrics, beefy_on_demand_justification_good_proof);
                debug!(
                    target: BEEFY_SYNC_LOG_TARGET,
                    "🥩 received valid on-demand justif #{block:?} from {peer:?}",
                );
                let peer_report = PeerReport { who: peer, cost_benefit: benefit::VALIDATED_PROOF };
                ResponseInfo::ValidProof(proof, peer_report)
            },
        }
    }
}