referrerpolicy=no-referrer-when-downgrade

sc_consensus_beefy/communication/request_response/
incoming_requests_handler.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Substrate is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Substrate is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Substrate. If not, see <https://www.gnu.org/licenses/>.
17
18//! Helper for handling (i.e. answering) BEEFY justifications requests from a remote peer.
19
20use codec::DecodeAll;
21use futures::{channel::oneshot, StreamExt};
22use log::{debug, trace};
23use sc_client_api::BlockBackend;
24use sc_network::{
25	config as netconfig, service::traits::RequestResponseConfig, types::ProtocolName,
26	NetworkBackend, ReputationChange,
27};
28use sc_network_types::PeerId;
29use sp_consensus_beefy::BEEFY_ENGINE_ID;
30use sp_runtime::traits::Block;
31use std::{marker::PhantomData, sync::Arc};
32
33use crate::{
34	communication::{
35		cost,
36		request_response::{
37			on_demand_justifications_protocol_config, Error, JustificationRequest,
38			BEEFY_SYNC_LOG_TARGET,
39		},
40	},
41	metric_inc,
42	metrics::{register_metrics, OnDemandIncomingRequestsMetrics},
43};
44
45/// A request coming in, including a sender for sending responses.
46#[derive(Debug)]
47pub(crate) struct IncomingRequest<B: Block> {
48	/// `PeerId` of sending peer.
49	pub peer: PeerId,
50	/// The sent request.
51	pub payload: JustificationRequest<B>,
52	/// Sender for sending response back.
53	pub pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
54}
55
56impl<B: Block> IncomingRequest<B> {
57	/// Create new `IncomingRequest`.
58	pub fn new(
59		peer: PeerId,
60		payload: JustificationRequest<B>,
61		pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
62	) -> Self {
63		Self { peer, payload, pending_response }
64	}
65
66	/// Try building from raw network request.
67	///
68	/// This function will fail if the request cannot be decoded and will apply passed in
69	/// reputation changes in that case.
70	///
71	/// Params:
72	/// 	- The raw request to decode
73	/// 	- Reputation changes to apply for the peer in case decoding fails.
74	pub fn try_from_raw<F>(
75		raw: netconfig::IncomingRequest,
76		reputation_changes_on_err: F,
77	) -> Result<Self, Error>
78	where
79		F: FnOnce(usize) -> Vec<ReputationChange>,
80	{
81		let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
82		let payload = match JustificationRequest::decode_all(&mut payload.as_ref()) {
83			Ok(payload) => payload,
84			Err(err) => {
85				let response = netconfig::OutgoingResponse {
86					result: Err(()),
87					reputation_changes: reputation_changes_on_err(payload.len()),
88					sent_feedback: None,
89				};
90				if let Err(_) = pending_response.send(response) {
91					return Err(Error::DecodingErrorNoReputationChange(peer, err));
92				}
93				return Err(Error::DecodingError(peer, err));
94			},
95		};
96		Ok(Self::new(peer, payload, pending_response))
97	}
98}
99
100/// Receiver for incoming BEEFY justifications requests.
101///
102/// Takes care of decoding and handling of invalid encoded requests.
103pub(crate) struct IncomingRequestReceiver {
104	raw: async_channel::Receiver<netconfig::IncomingRequest>,
105}
106
107impl IncomingRequestReceiver {
108	pub fn new(inner: async_channel::Receiver<netconfig::IncomingRequest>) -> Self {
109		Self { raw: inner }
110	}
111
112	/// Try to receive the next incoming request.
113	///
114	/// Any received request will be decoded, on decoding errors the provided reputation changes
115	/// will be applied and an error will be reported.
116	pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
117	where
118		B: Block,
119		F: FnOnce(usize) -> Vec<ReputationChange>,
120	{
121		let req = match self.raw.next().await {
122			None => return Err(Error::RequestChannelExhausted),
123			Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes)?,
124		};
125		Ok(req)
126	}
127}
128
129/// Handler for incoming BEEFY justifications requests from a remote peer.
130pub struct BeefyJustifsRequestHandler<B, Client> {
131	pub(crate) request_receiver: IncomingRequestReceiver,
132	pub(crate) justif_protocol_name: ProtocolName,
133	pub(crate) client: Arc<Client>,
134	pub(crate) metrics: Option<OnDemandIncomingRequestsMetrics>,
135	pub(crate) _block: PhantomData<B>,
136}
137
138impl<B, Client> BeefyJustifsRequestHandler<B, Client>
139where
140	B: Block,
141	Client: BlockBackend<B> + Send + Sync,
142{
143	/// Create a new [`BeefyJustifsRequestHandler`].
144	pub fn new<Hash: AsRef<[u8]>, Network: NetworkBackend<B, <B as Block>::Hash>>(
145		genesis_hash: Hash,
146		fork_id: Option<&str>,
147		client: Arc<Client>,
148		prometheus_registry: Option<prometheus_endpoint::Registry>,
149	) -> (Self, Network::RequestResponseProtocolConfig) {
150		let (request_receiver, config): (_, Network::RequestResponseProtocolConfig) =
151			on_demand_justifications_protocol_config::<_, _, Network>(genesis_hash, fork_id);
152		let justif_protocol_name = config.protocol_name().clone();
153		let metrics = register_metrics(prometheus_registry);
154		(
155			Self { request_receiver, justif_protocol_name, client, metrics, _block: PhantomData },
156			config,
157		)
158	}
159
160	/// Network request-response protocol name used by this handler.
161	pub fn protocol_name(&self) -> ProtocolName {
162		self.justif_protocol_name.clone()
163	}
164
165	// Sends back justification response if justification found in client backend.
166	fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
167		let mut reputation_changes = vec![];
168		let maybe_encoded_proof = self
169			.client
170			.block_hash(request.payload.begin)
171			.ok()
172			.flatten()
173			.and_then(|hash| self.client.justifications(hash).ok().flatten())
174			.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
175			.ok_or_else(|| reputation_changes.push(cost::UNKNOWN_PROOF_REQUEST));
176		request
177			.pending_response
178			.send(netconfig::OutgoingResponse {
179				result: maybe_encoded_proof,
180				reputation_changes,
181				sent_feedback: None,
182			})
183			.map_err(|_| Error::SendResponse)
184	}
185
186	/// Run [`BeefyJustifsRequestHandler`].
187	///
188	/// Should never end, returns `Error` otherwise.
189	pub async fn run(&mut self) -> Error {
190		trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");
191
192		while let Ok(request) = self
193			.request_receiver
194			.recv(|bytes| {
195				let bytes = bytes.min(i32::MAX as usize) as i32;
196				vec![ReputationChange::new(
197					bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
198					"BEEFY: Bad request payload",
199				)]
200			})
201			.await
202		{
203			let peer = request.peer;
204			match self.handle_request(request) {
205				Ok(()) => {
206					metric_inc!(self.metrics, beefy_successful_justification_responses);
207					debug!(
208						target: BEEFY_SYNC_LOG_TARGET,
209						"🥩 Handled BEEFY justification request from {:?}.", peer
210					)
211				},
212				Err(e) => {
213					// peer reputation changes already applied in `self.handle_request()`
214					metric_inc!(self.metrics, beefy_failed_justification_responses);
215					debug!(
216						target: BEEFY_SYNC_LOG_TARGET,
217						"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
218					)
219				},
220			}
221		}
222		Error::RequestsReceiverStreamClosed
223	}
224}