// sc_consensus_grandpa/observer.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
19use std::{
20	marker::{PhantomData, Unpin},
21	pin::Pin,
22	sync::Arc,
23	task::{Context, Poll},
24};
25
26use finality_grandpa::{voter, voter_set::VoterSet, BlockNumberOps, Error as GrandpaError};
27use futures::prelude::*;
28use log::{debug, info, warn};
29
30use sc_client_api::backend::Backend;
31use sc_network::NotificationService;
32use sc_telemetry::TelemetryHandle;
33use sc_utils::mpsc::TracingUnboundedReceiver;
34use sp_blockchain::HeaderMetadata;
35use sp_consensus::SelectChain;
36use sp_consensus_grandpa::AuthorityId;
37use sp_keystore::KeystorePtr;
38use sp_runtime::traits::{Block as BlockT, NumberFor};
39
40use crate::{
41	authorities::SharedAuthoritySet,
42	aux_schema::PersistentData,
43	communication::{Network as NetworkT, NetworkBridge, Syncing as SyncingT},
44	environment, global_communication,
45	notification::GrandpaJustificationSender,
46	ClientForGrandpa, CommandOrError, CommunicationIn, Config, Error, LinkHalf, VoterCommand,
47	VoterSetState, LOG_TARGET,
48};
49
/// Adapter that exposes a client's header metadata as the
/// `finality_grandpa::Chain` interface needed for commit validation.
struct ObserverChain<'a, Block: BlockT, Client> {
	// Borrowed shared handle to the backing client; used for ancestry lookups.
	client: &'a Arc<Client>,
	// Ties the `Block` type parameter to the struct without storing any block data.
	_phantom: PhantomData<Block>,
}
54
impl<'a, Block, Client> finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
	for ObserverChain<'a, Block, Client>
where
	Block: BlockT,
	Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
	NumberFor<Block>: BlockNumberOps,
{
	/// Computes the ancestry route from `base` to `block` by delegating to the
	/// shared `environment::ancestry` helper used by the full voter as well.
	fn ancestry(
		&self,
		base: Block::Hash,
		block: Block::Hash,
	) -> Result<Vec<Block::Hash>, GrandpaError> {
		environment::ancestry(self.client, base, block)
	}
}
70
/// Builds the observer future: a fold over the global communication stream
/// (`commits`) that validates each incoming commit against the current voter
/// set and, when valid, finalizes the block it targets. Catch-up messages and
/// commits at or below the last finalized number are ignored. The future
/// resolves when the stream yields an error or a voter command (both carried
/// as `CommandOrError`).
fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
	client: &Arc<Client>,
	authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
	voters: &Arc<VoterSet<AuthorityId>>,
	justification_sender: &Option<GrandpaJustificationSender<Block>>,
	last_finalized_number: NumberFor<Block>,
	commits: S,
	note_round: F,
	telemetry: Option<TelemetryHandle>,
) -> impl Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>>
where
	NumberFor<Block>: BlockNumberOps,
	S: Stream<Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>>,
	F: Fn(u64),
	BE: Backend<Block>,
	Client: ClientForGrandpa<Block, BE>,
{
	// Clone shared handles so the fold closure below can own them.
	let authority_set = authority_set.clone();
	let client = client.clone();
	let voters = voters.clone();
	let justification_sender = justification_sender.clone();

	// Fold over the stream, threading through the highest block number we have
	// finalized so far as the accumulator.
	let observer = commits.try_fold(last_finalized_number, move |last_finalized_number, global| {
		let (round, commit, callback) = match global {
			voter::CommunicationIn::Commit(round, commit, callback) => {
				let commit = finality_grandpa::Commit::from(commit);
				(round, commit, callback)
			},
			voter::CommunicationIn::CatchUp(..) => {
				// ignore catch up messages
				return future::ok(last_finalized_number)
			},
		};

		// if the commit we've received targets a block lower or equal to the last
		// finalized, ignore it and continue with the current state
		if commit.target_number <= last_finalized_number {
			return future::ok(last_finalized_number)
		}

		// Check the commit against the current voter set, using `ObserverChain`
		// for the ancestry lookups the validation needs.
		let validation_result = match finality_grandpa::validate_commit(
			&commit,
			&voters,
			&ObserverChain { client: &client, _phantom: PhantomData },
		) {
			Ok(r) => r,
			Err(e) => return future::err(e.into()),
		};

		if validation_result.is_valid() {
			let finalized_hash = commit.target_hash;
			let finalized_number = commit.target_number;

			// commit is valid, finalize the block it targets
			match environment::finalize_block(
				client.clone(),
				&authority_set,
				None,
				finalized_hash,
				finalized_number,
				(round, commit).into(),
				false,
				justification_sender.as_ref(),
				telemetry.clone(),
			) {
				Ok(_) => {},
				Err(e) => return future::err(e),
			};

			// note that we've observed completion of this round through the commit,
			// and that implies that the next round has started.
			note_round(round + 1);

			finality_grandpa::process_commit_validation_result(validation_result, callback);

			// proceed processing with new finalized block number
			future::ok(finalized_number)
		} else {
			debug!(target: LOG_TARGET, "Received invalid commit: ({:?}, {:?})", round, commit);

			// Report the (negative) validation outcome back to the sender.
			finality_grandpa::process_commit_validation_result(validation_result, callback);

			// commit is invalid, continue processing commits with the current state
			future::ok(last_finalized_number)
		}
	});

	// The final accumulator (a block number) is irrelevant to callers; discard it.
	observer.map_ok(|_| ())
}
160
161/// Run a GRANDPA observer as a task, the observer will finalize blocks only by
162/// listening for and validating GRANDPA commits instead of following the full
163/// protocol. Provide configuration and a link to a block import worker that has
164/// already been instantiated with `block_import`.
165/// NOTE: this is currently not part of the crate's public API since we don't consider
166/// it stable enough to use on a live network.
167pub fn run_grandpa_observer<BE, Block: BlockT, Client, N, S, SC>(
168	config: Config,
169	link: LinkHalf<Block, Client, SC>,
170	network: N,
171	sync: S,
172	notification_service: Box<dyn NotificationService>,
173) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
174where
175	BE: Backend<Block> + Unpin + 'static,
176	N: NetworkT<Block>,
177	S: SyncingT<Block>,
178	SC: SelectChain<Block>,
179	NumberFor<Block>: BlockNumberOps,
180	Client: ClientForGrandpa<Block, BE> + 'static,
181{
182	let LinkHalf {
183		client,
184		persistent_data,
185		voter_commands_rx,
186		justification_sender,
187		telemetry,
188		..
189	} = link;
190
191	let network = NetworkBridge::new(
192		network,
193		sync,
194		notification_service,
195		config.clone(),
196		persistent_data.set_state.clone(),
197		None,
198		telemetry.clone(),
199	);
200
201	let observer_work = ObserverWork::new(
202		client,
203		network,
204		persistent_data,
205		config.keystore,
206		voter_commands_rx,
207		Some(justification_sender),
208		telemetry,
209	);
210
211	let observer_work = observer_work.map_ok(|_| ()).map_err(|e| {
212		warn!("GRANDPA Observer failed: {}", e);
213	});
214
215	Ok(observer_work.map(drop))
216}
217
/// Future that powers the observer.
#[must_use]
struct ObserverWork<B: BlockT, BE, Client, N: NetworkT<B>, S: SyncingT<B>> {
	// The currently running observer future; rebuilt by `rebuild_observer`
	// whenever the authority set changes.
	observer:
		Pin<Box<dyn Future<Output = Result<(), CommandOrError<B::Hash, NumberFor<B>>>> + Send>>,
	// Client used for finalization and for reading chain info.
	client: Arc<Client>,
	// Gossip bridge; must be polled to forward network events (see `poll`).
	network: NetworkBridge<B, N, S>,
	// Persisted authority-set and voter-set state, kept up to date on disk.
	persistent_data: PersistentData<B>,
	// Optional keystore handle, forwarded to `global_communication`.
	keystore: Option<KeystorePtr>,
	// Channel of externally issued voter commands (e.g. authority-set changes).
	voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
	// Optional sink for notifying subscribers about new justifications.
	justification_sender: Option<GrandpaJustificationSender<B>>,
	telemetry: Option<TelemetryHandle>,
	// Pins the backend type parameter used only in trait bounds.
	_phantom: PhantomData<BE>,
}
232
233impl<B, BE, Client, Network, Syncing> ObserverWork<B, BE, Client, Network, Syncing>
234where
235	B: BlockT,
236	BE: Backend<B> + 'static,
237	Client: ClientForGrandpa<B, BE> + 'static,
238	Network: NetworkT<B>,
239	Syncing: SyncingT<B>,
240	NumberFor<B>: BlockNumberOps,
241{
242	fn new(
243		client: Arc<Client>,
244		network: NetworkBridge<B, Network, Syncing>,
245		persistent_data: PersistentData<B>,
246		keystore: Option<KeystorePtr>,
247		voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
248		justification_sender: Option<GrandpaJustificationSender<B>>,
249		telemetry: Option<TelemetryHandle>,
250	) -> Self {
251		let mut work = ObserverWork {
252			// `observer` is set to a temporary value and replaced below when
253			// calling `rebuild_observer`.
254			observer: Box::pin(future::pending()) as Pin<Box<_>>,
255			client,
256			network,
257			persistent_data,
258			keystore: keystore.clone(),
259			voter_commands_rx,
260			justification_sender,
261			telemetry,
262			_phantom: PhantomData,
263		};
264		work.rebuild_observer();
265		work
266	}
267
268	/// Rebuilds the `self.observer` field using the current authority set
269	/// state. This method should be called when we know that the authority set
270	/// has changed (e.g. as signalled by a voter command).
271	fn rebuild_observer(&mut self) {
272		let set_id = self.persistent_data.authority_set.set_id();
273		let voters = Arc::new(self.persistent_data.authority_set.current_authorities());
274
275		// start global communication stream for the current set
276		let (global_in, _) = global_communication(
277			set_id,
278			&voters,
279			self.client.clone(),
280			&self.network,
281			self.keystore.as_ref(),
282			None,
283		);
284
285		let last_finalized_number = self.client.info().finalized_number;
286
287		// NOTE: since we are not using `round_communication` we have to
288		// manually note the round with the gossip validator, otherwise we won't
289		// relay round messages. we want all full nodes to contribute to vote
290		// availability.
291		let note_round = {
292			let network = self.network.clone();
293			let voters = voters.clone();
294
295			move |round| {
296				network.note_round(
297					crate::communication::Round(round),
298					crate::communication::SetId(set_id),
299					&voters,
300				)
301			}
302		};
303
304		// create observer for the current set
305		let observer = grandpa_observer(
306			&self.client,
307			&self.persistent_data.authority_set,
308			&voters,
309			&self.justification_sender,
310			last_finalized_number,
311			global_in,
312			note_round,
313			self.telemetry.clone(),
314		);
315
316		self.observer = Box::pin(observer);
317	}
318
319	fn handle_voter_command(
320		&mut self,
321		command: VoterCommand<B::Hash, NumberFor<B>>,
322	) -> Result<(), Error> {
323		// the observer doesn't use the voter set state, but we need to
324		// update it on-disk in case we restart as validator in the future.
325		self.persistent_data.set_state = match command {
326			VoterCommand::Pause(reason) => {
327				info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
328
329				let completed_rounds = self.persistent_data.set_state.read().completed_rounds();
330				let set_state = VoterSetState::Paused { completed_rounds };
331
332				crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
333
334				set_state
335			},
336			VoterCommand::ChangeAuthorities(new) => {
337				// start the new authority set using the block where the
338				// set changed (not where the signal happened!) as the base.
339				let set_state = VoterSetState::live(
340					new.set_id,
341					&*self.persistent_data.authority_set.inner(),
342					(new.canon_hash, new.canon_number),
343				);
344
345				crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
346
347				set_state
348			},
349		}
350		.into();
351
352		self.rebuild_observer();
353		Ok(())
354	}
355}
356
impl<B, BE, C, N, S> Future for ObserverWork<B, BE, C, N, S>
where
	B: BlockT,
	BE: Backend<B> + Unpin + 'static,
	C: ClientForGrandpa<B, BE> + 'static,
	N: NetworkT<B>,
	S: SyncingT<B>,
	NumberFor<B>: BlockNumberOps,
{
	type Output = Result<(), Error>;

	// Drives, in order: (1) the inner observer future, (2) the external voter
	// command channel, and (3) the network bridge. A voter command rebuilds
	// the observer, so the waker is pinged to guarantee a prompt re-poll of
	// the new future.
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		match Future::poll(Pin::new(&mut self.observer), cx) {
			Poll::Pending => {},
			Poll::Ready(Ok(())) => {
				// observer commit stream doesn't conclude naturally; this could reasonably be an
				// error.
				return Poll::Ready(Ok(()))
			},
			Poll::Ready(Err(CommandOrError::Error(e))) => {
				// return inner observer error
				return Poll::Ready(Err(e))
			},
			Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
				// some command issued internally
				self.handle_voter_command(command)?;
				// the observer was just replaced; ensure it gets polled again soon.
				cx.waker().wake_by_ref();
			},
		}

		match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
			Poll::Pending => {},
			Poll::Ready(None) => {
				// the `voter_commands_rx` stream should never conclude since it's never closed.
				return Poll::Ready(Ok(()))
			},
			Poll::Ready(Some(command)) => {
				// some command issued externally
				self.handle_voter_command(command)?;
				cx.waker().wake_by_ref();
			},
		}

		// Poll the network bridge last so gossip and reputation processing make
		// progress on every poll, even while the observer itself is pending.
		Future::poll(Pin::new(&mut self.network), cx)
	}
}
403
#[cfg(test)]
mod tests {
	use super::*;

	use crate::{
		aux_schema,
		communication::tests::{make_test_network, Event},
	};
	use assert_matches::assert_matches;
	use sc_network_types::PeerId;
	use sc_utils::mpsc::tracing_unbounded;
	use sp_blockchain::HeaderBackend as _;
	use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};

	use futures::executor;

	/// Ensure `Future` implementation of `ObserverWork` is polling its `NetworkBridge`.
	/// Regression test for bug introduced in d4fbb897c and fixed in b7af8b339.
	///
	/// When polled, `NetworkBridge` forwards reputation change requests from the
	/// `GossipValidator` to the underlying `dyn Network`. This test triggers a reputation change
	/// by calling `GossipValidator::validate` with an invalid gossip message. After polling the
	/// `ObserverWork` which should poll the `NetworkBridge`, the reputation change should be
	/// forwarded to the test network.
	#[test]
	fn observer_work_polls_underlying_network_bridge() {
		// Create a test network.
		let (tester_fut, _network) = make_test_network();
		let mut tester = executor::block_on(tester_fut);

		// Create an observer backed by the substrate test runtime client.
		let (client, backend) = {
			let builder = TestClientBuilder::with_default_backend();
			let backend = builder.backend();
			let (client, _) = builder.build_with_longest_chain();
			(Arc::new(client), backend)
		};

		// Single-authority voter set (Alice with weight 1).
		let voters = vec![(sp_keyring::Ed25519Keyring::Alice.public().into(), 1)];

		let persistent_data =
			aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters))
				.unwrap();

		// Keep `_tx` alive so the command stream doesn't conclude during the test.
		let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000);

		let observer = ObserverWork::new(
			client,
			tester.net_handle.clone(),
			persistent_data,
			None,
			voter_command_rx,
			None,
			None,
		);

		// Trigger a reputation change through the gossip validator.
		let peer_id = PeerId::random();
		tester.trigger_gossip_validator_reputation_change(&peer_id);

		executor::block_on(async move {
			// Poll the observer once and have it forward the reputation change from the gossip
			// validator to the test network. It must stay pending (nothing concludes here).
			assert!(observer.now_or_never().is_none());

			assert_matches!(tester.events.next().now_or_never(), Some(Some(Event::Report(_, _))));
		});
	}
}