
sc_consensus_beefy/communication/request_response/outgoing_requests_engine.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Outgoing request logic for the request/response protocol used to sync BEEFY justifications.
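//!
//! A minimal usage sketch (assuming the engine is driven from the BEEFY worker loop; the
//! `engine`, `block` and `active_set` bindings below are illustrative, not part of this module):
//!
//! ```ignore
//! // Ask for a justification for `block`, verified against `active_set`.
//! engine.request(block, active_set);
//! // Poll the engine; unsuccessful responses make it retry other known peers internally.
//! match engine.next().await {
//! 	ResponseInfo::ValidProof(proof, peer_report) => { /* import proof, apply peer report */ },
//! 	ResponseInfo::PeerReport(peer_report) => { /* apply reputation change, keep polling */ },
//! 	ResponseInfo::Pending => { /* nothing to report yet */ },
//! }
//! ```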

use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{
	request_responses::{IfDisconnected, RequestFailure},
	NetworkRequest, ProtocolName,
};
use sc_network_types::PeerId;
use sp_consensus_beefy::{AuthorityIdBound, ValidatorSet};
use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};

use crate::{
	communication::{
		benefit, cost,
		peers::PeerReport,
		request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
	},
	justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
	metric_inc, metric_set,
	metrics::{register_metrics, OnDemandOutgoingRequestsMetrics},
	KnownPeers,
};

/// Response type received from network.
type Response = Result<(Vec<u8>, ProtocolName), RequestFailure>;
/// Used to receive a response from the network.
type ResponseReceiver = oneshot::Receiver<Response>;

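/// Data tied to an in-flight request: the requested block number and the validator set used to
/// verify the response.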
#[derive(Clone, Debug)]
struct RequestInfo<B: Block, AuthorityId: AuthorityIdBound> {
	block: NumberFor<B>,
	active_set: ValidatorSet<AuthorityId>,
}

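/// Engine state: either idle, or waiting for a response to a request sent to a specific peer.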
enum State<B: Block, AuthorityId: AuthorityIdBound> {
	Idle,
	AwaitingResponse(PeerId, RequestInfo<B, AuthorityId>, ResponseReceiver),
}

/// Possible engine responses.
pub(crate) enum ResponseInfo<B: Block, AuthorityId: AuthorityIdBound> {
	/// No peer response available yet.
	Pending,
	/// Valid justification provided alongside peer reputation changes.
	ValidProof(BeefyVersionedFinalityProof<B, AuthorityId>, PeerReport),
	/// No justification yet, only peer reputation changes.
	PeerReport(PeerReport),
}

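/// On-demand BEEFY justifications engine: sends justification requests to remote peers and
/// processes their responses, producing peer reputation reports based on the outcome.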
pub struct OnDemandJustificationsEngine<B: Block, AuthorityId: AuthorityIdBound> {
	network: Arc<dyn NetworkRequest + Send + Sync>,
	protocol_name: ProtocolName,

	live_peers: Arc<Mutex<KnownPeers<B>>>,
	peers_cache: VecDeque<PeerId>,

	state: State<B, AuthorityId>,
	metrics: Option<OnDemandOutgoingRequestsMetrics>,
}

impl<B: Block, AuthorityId: AuthorityIdBound> OnDemandJustificationsEngine<B, AuthorityId> {
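	/// Create a new engine for the given request/response `protocol_name`, wired to the network
	/// service, the shared set of known live peers and an optional Prometheus registry.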
	pub fn new(
		network: Arc<dyn NetworkRequest + Send + Sync>,
		protocol_name: ProtocolName,
		live_peers: Arc<Mutex<KnownPeers<B>>>,
		prometheus_registry: Option<prometheus_endpoint::Registry>,
	) -> Self {
		let metrics = register_metrics(prometheus_registry);
		Self {
			network,
			protocol_name,
			live_peers,
			peers_cache: VecDeque::new(),
			state: State::Idle,
			metrics,
		}
	}

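	/// Refill the peers cache with the live peers considered able to serve a justification for
	/// `block`.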
	fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
		self.peers_cache = self.live_peers.lock().further_than(block);
	}

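	/// Pop peers off the cache until one that is still live is found; returns `None` once the
	/// cache is exhausted.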
	fn try_next_peer(&mut self) -> Option<PeerId> {
		let live = self.live_peers.lock();
		while let Some(peer) = self.peers_cache.pop_front() {
			if live.contains(&peer) {
				return Some(peer);
			}
		}
		None
	}

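	/// Send a justification request for `req_info.block` to `peer` and move to
	/// `State::AwaitingResponse`.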
	fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B, AuthorityId>) {
		debug!(
			target: BEEFY_SYNC_LOG_TARGET,
			"🥩 requesting justif #{:?} from peer {:?}", req_info.block, peer,
		);

		let payload = JustificationRequest::<B> { begin: req_info.block }.encode();

		let (tx, rx) = oneshot::channel();

		self.network.start_request(
			peer,
			self.protocol_name.clone(),
			payload,
			None,
			tx,
			IfDisconnected::ImmediateError,
		);

		self.state = State::AwaitingResponse(peer, req_info, rx);
	}

	/// Start a new justification request for `block`, if no other request is in progress.
	///
	/// `active_set` will be used to verify validity of potential responses.
	pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
		// Ignore new requests while there's already one pending.
		if matches!(self.state, State::AwaitingResponse(_, _, _)) {
			return;
		}
		self.reset_peers_cache_for_block(block);

		// Start the requests engine; each unsuccessful response automatically triggers a new
		// request to the next peer in the `peers_cache` until there are none left.
		if let Some(peer) = self.try_next_peer() {
			self.request_from_peer(peer, RequestInfo { block, active_set });
		} else {
			metric_inc!(self.metrics, beefy_on_demand_justification_no_peer_to_request_from);
			debug!(
				target: BEEFY_SYNC_LOG_TARGET,
				"🥩 no good peers to request justif #{:?} from", block
			);
		}
	}

	/// Cancel any pending request for block numbers smaller than or equal to `block`.
	pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
		match &self.state {
			State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
				debug!(
					target: BEEFY_SYNC_LOG_TARGET,
					"🥩 cancel pending request for justification #{:?}", req_info.block
				);
				self.state = State::Idle;
			},
			_ => (),
		}
	}

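	/// Check the network-level result, then decode and verify the returned proof against
	/// `req_info`. Errors carry the peer reputation change to apply, where applicable.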
	fn process_response(
		&mut self,
		peer: &PeerId,
		req_info: &RequestInfo<B, AuthorityId>,
		response: Result<Response, Canceled>,
	) -> Result<BeefyVersionedFinalityProof<B, AuthorityId>, Error> {
		response
			.map_err(|e| {
				debug!(
					target: BEEFY_SYNC_LOG_TARGET,
					"🥩 on-demand sc-network channel sender closed, err: {:?}", e
				);
				Error::ResponseError
			})?
			.map_err(|e| {
				debug!(
					target: BEEFY_SYNC_LOG_TARGET,
					"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
					req_info.block,
					peer,
					e
				);
				match e {
					RequestFailure::Refused => {
						metric_inc!(self.metrics, beefy_on_demand_justification_peer_refused);
						let peer_report =
							PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
						Error::InvalidResponse(peer_report)
					},
					_ => {
						metric_inc!(self.metrics, beefy_on_demand_justification_peer_error);
						Error::ResponseError
					},
				}
			})
			.and_then(|(encoded, _)| {
				decode_and_verify_finality_proof::<B, AuthorityId>(
					&encoded[..],
					req_info.block,
					&req_info.active_set,
				)
				.map_err(|(err, signatures_checked)| {
					metric_inc!(self.metrics, beefy_on_demand_justification_invalid_proof);
					debug!(
						target: BEEFY_SYNC_LOG_TARGET,
						"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
						req_info.block, peer, err
					);
					let mut cost = cost::INVALID_PROOF;
					cost.value +=
						cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
					Error::InvalidResponse(PeerReport { who: *peer, cost_benefit: cost })
				})
			})
	}

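	/// Drive the engine: await the pending response (if any), process it, retry with the next
	/// cached peer on failure, and report the outcome as a [`ResponseInfo`].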
	pub(crate) async fn next(&mut self) -> ResponseInfo<B, AuthorityId> {
		let (peer, req_info, resp) = match &mut self.state {
			State::Idle => {
				futures::future::pending::<()>().await;
				return ResponseInfo::Pending;
			},
			State::AwaitingResponse(peer, req_info, receiver) => {
				let resp = receiver.await;
				(*peer, req_info.clone(), resp)
			},
		};
		// We received the awaited response. Our 'receiver' will never generate any other response,
		// meaning we're done with the current state. Move the engine to `State::Idle`.
		self.state = State::Idle;

		metric_set!(self.metrics, beefy_on_demand_live_peers, self.live_peers.lock().len() as u64);

		let block = req_info.block;
		match self.process_response(&peer, &req_info, resp) {
			Err(err) => {
				// No valid justification received, try the next peer in our set.
				if let Some(peer) = self.try_next_peer() {
					self.request_from_peer(peer, req_info);
				} else {
					metric_inc!(
						self.metrics,
						beefy_on_demand_justification_no_peer_to_request_from
					);

					let num_cache = self.peers_cache.len();
					let num_live = self.live_peers.lock().len();
					warn!(
						target: BEEFY_SYNC_LOG_TARGET,
						"🥩 ran out of peers to request justif #{block:?} from num_cache={num_cache} num_live={num_live} err={err:?}",
					);
				}
				// Report the peer based on the error type.
				if let Error::InvalidResponse(peer_report) = err {
					ResponseInfo::PeerReport(peer_report)
				} else {
					ResponseInfo::Pending
				}
			},
			Ok(proof) => {
				metric_inc!(self.metrics, beefy_on_demand_justification_good_proof);
				debug!(
					target: BEEFY_SYNC_LOG_TARGET,
					"🥩 received valid on-demand justif #{block:?} from {peer:?}",
				);
				let peer_report = PeerReport { who: peer, cost_benefit: benefit::VALIDATED_PROOF };
				ResponseInfo::ValidProof(proof, peer_report)
			},
		}
	}
}