1use std::{
20 marker::{PhantomData, Unpin},
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25
26use finality_grandpa::{voter, voter_set::VoterSet, BlockNumberOps, Error as GrandpaError};
27use futures::prelude::*;
28use log::{debug, info, warn};
29
30use sc_client_api::backend::Backend;
31use sc_network::NotificationService;
32use sc_telemetry::TelemetryHandle;
33use sc_utils::mpsc::TracingUnboundedReceiver;
34use sp_blockchain::HeaderMetadata;
35use sp_consensus::SelectChain;
36use sp_consensus_grandpa::AuthorityId;
37use sp_keystore::KeystorePtr;
38use sp_runtime::traits::{Block as BlockT, NumberFor};
39
40use crate::{
41 authorities::SharedAuthoritySet,
42 aux_schema::PersistentData,
43 communication::{Network as NetworkT, NetworkBridge, Syncing as SyncingT},
44 environment, global_communication,
45 notification::GrandpaJustificationSender,
46 ClientForGrandpa, CommandOrError, CommunicationIn, Config, Error, LinkHalf, VoterCommand,
47 VoterSetState, LOG_TARGET,
48};
49
50struct ObserverChain<'a, Block: BlockT, Client> {
51 client: &'a Arc<Client>,
52 _phantom: PhantomData<Block>,
53}
54
55impl<'a, Block, Client> finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
56 for ObserverChain<'a, Block, Client>
57where
58 Block: BlockT,
59 Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
60 NumberFor<Block>: BlockNumberOps,
61{
62 fn ancestry(
63 &self,
64 base: Block::Hash,
65 block: Block::Hash,
66 ) -> Result<Vec<Block::Hash>, GrandpaError> {
67 environment::ancestry(self.client, base, block)
68 }
69}
70
71fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
72 client: &Arc<Client>,
73 authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
74 voters: &Arc<VoterSet<AuthorityId>>,
75 justification_sender: &Option<GrandpaJustificationSender<Block>>,
76 last_finalized_number: NumberFor<Block>,
77 commits: S,
78 note_round: F,
79 telemetry: Option<TelemetryHandle>,
80) -> impl Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>>
81where
82 NumberFor<Block>: BlockNumberOps,
83 S: Stream<Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>>,
84 F: Fn(u64),
85 BE: Backend<Block>,
86 Client: ClientForGrandpa<Block, BE>,
87{
88 let authority_set = authority_set.clone();
89 let client = client.clone();
90 let voters = voters.clone();
91 let justification_sender = justification_sender.clone();
92
93 let observer = commits.try_fold(last_finalized_number, move |last_finalized_number, global| {
94 let (round, commit, callback) = match global {
95 voter::CommunicationIn::Commit(round, commit, callback) => {
96 let commit = finality_grandpa::Commit::from(commit);
97 (round, commit, callback)
98 },
99 voter::CommunicationIn::CatchUp(..) => {
100 return future::ok(last_finalized_number)
102 },
103 };
104
105 if commit.target_number <= last_finalized_number {
108 return future::ok(last_finalized_number)
109 }
110
111 let validation_result = match finality_grandpa::validate_commit(
112 &commit,
113 &voters,
114 &ObserverChain { client: &client, _phantom: PhantomData },
115 ) {
116 Ok(r) => r,
117 Err(e) => return future::err(e.into()),
118 };
119
120 if validation_result.is_valid() {
121 let finalized_hash = commit.target_hash;
122 let finalized_number = commit.target_number;
123
124 match environment::finalize_block(
126 client.clone(),
127 &authority_set,
128 None,
129 finalized_hash,
130 finalized_number,
131 (round, commit).into(),
132 false,
133 justification_sender.as_ref(),
134 telemetry.clone(),
135 ) {
136 Ok(_) => {},
137 Err(e) => return future::err(e),
138 };
139
140 note_round(round + 1);
143
144 finality_grandpa::process_commit_validation_result(validation_result, callback);
145
146 future::ok(finalized_number)
148 } else {
149 debug!(target: LOG_TARGET, "Received invalid commit: ({:?}, {:?})", round, commit);
150
151 finality_grandpa::process_commit_validation_result(validation_result, callback);
152
153 future::ok(last_finalized_number)
155 }
156 });
157
158 observer.map_ok(|_| ())
159}
160
161pub fn run_grandpa_observer<BE, Block: BlockT, Client, N, S, SC>(
168 config: Config,
169 link: LinkHalf<Block, Client, SC>,
170 network: N,
171 sync: S,
172 notification_service: Box<dyn NotificationService>,
173) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
174where
175 BE: Backend<Block> + Unpin + 'static,
176 N: NetworkT<Block>,
177 S: SyncingT<Block>,
178 SC: SelectChain<Block>,
179 NumberFor<Block>: BlockNumberOps,
180 Client: ClientForGrandpa<Block, BE> + 'static,
181{
182 let LinkHalf {
183 client,
184 persistent_data,
185 voter_commands_rx,
186 justification_sender,
187 telemetry,
188 ..
189 } = link;
190
191 let network = NetworkBridge::new(
192 network,
193 sync,
194 notification_service,
195 config.clone(),
196 persistent_data.set_state.clone(),
197 None,
198 telemetry.clone(),
199 );
200
201 let observer_work = ObserverWork::new(
202 client,
203 network,
204 persistent_data,
205 config.keystore,
206 voter_commands_rx,
207 Some(justification_sender),
208 telemetry,
209 );
210
211 let observer_work = observer_work.map_ok(|_| ()).map_err(|e| {
212 warn!("GRANDPA Observer failed: {}", e);
213 });
214
215 Ok(observer_work.map(drop))
216}
217
218#[must_use]
220struct ObserverWork<B: BlockT, BE, Client, N: NetworkT<B>, S: SyncingT<B>> {
221 observer:
222 Pin<Box<dyn Future<Output = Result<(), CommandOrError<B::Hash, NumberFor<B>>>> + Send>>,
223 client: Arc<Client>,
224 network: NetworkBridge<B, N, S>,
225 persistent_data: PersistentData<B>,
226 keystore: Option<KeystorePtr>,
227 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
228 justification_sender: Option<GrandpaJustificationSender<B>>,
229 telemetry: Option<TelemetryHandle>,
230 _phantom: PhantomData<BE>,
231}
232
233impl<B, BE, Client, Network, Syncing> ObserverWork<B, BE, Client, Network, Syncing>
234where
235 B: BlockT,
236 BE: Backend<B> + 'static,
237 Client: ClientForGrandpa<B, BE> + 'static,
238 Network: NetworkT<B>,
239 Syncing: SyncingT<B>,
240 NumberFor<B>: BlockNumberOps,
241{
242 fn new(
243 client: Arc<Client>,
244 network: NetworkBridge<B, Network, Syncing>,
245 persistent_data: PersistentData<B>,
246 keystore: Option<KeystorePtr>,
247 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
248 justification_sender: Option<GrandpaJustificationSender<B>>,
249 telemetry: Option<TelemetryHandle>,
250 ) -> Self {
251 let mut work = ObserverWork {
252 observer: Box::pin(future::pending()) as Pin<Box<_>>,
255 client,
256 network,
257 persistent_data,
258 keystore: keystore.clone(),
259 voter_commands_rx,
260 justification_sender,
261 telemetry,
262 _phantom: PhantomData,
263 };
264 work.rebuild_observer();
265 work
266 }
267
268 fn rebuild_observer(&mut self) {
272 let set_id = self.persistent_data.authority_set.set_id();
273 let voters = Arc::new(self.persistent_data.authority_set.current_authorities());
274
275 let (global_in, _) = global_communication(
277 set_id,
278 &voters,
279 self.client.clone(),
280 &self.network,
281 self.keystore.as_ref(),
282 None,
283 );
284
285 let last_finalized_number = self.client.info().finalized_number;
286
287 let note_round = {
292 let network = self.network.clone();
293 let voters = voters.clone();
294
295 move |round| {
296 network.note_round(
297 crate::communication::Round(round),
298 crate::communication::SetId(set_id),
299 &voters,
300 )
301 }
302 };
303
304 let observer = grandpa_observer(
306 &self.client,
307 &self.persistent_data.authority_set,
308 &voters,
309 &self.justification_sender,
310 last_finalized_number,
311 global_in,
312 note_round,
313 self.telemetry.clone(),
314 );
315
316 self.observer = Box::pin(observer);
317 }
318
319 fn handle_voter_command(
320 &mut self,
321 command: VoterCommand<B::Hash, NumberFor<B>>,
322 ) -> Result<(), Error> {
323 self.persistent_data.set_state = match command {
326 VoterCommand::Pause(reason) => {
327 info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
328
329 let completed_rounds = self.persistent_data.set_state.read().completed_rounds();
330 let set_state = VoterSetState::Paused { completed_rounds };
331
332 crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
333
334 set_state
335 },
336 VoterCommand::ChangeAuthorities(new) => {
337 let set_state = VoterSetState::live(
340 new.set_id,
341 &*self.persistent_data.authority_set.inner(),
342 (new.canon_hash, new.canon_number),
343 );
344
345 crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
346
347 set_state
348 },
349 }
350 .into();
351
352 self.rebuild_observer();
353 Ok(())
354 }
355}
356
357impl<B, BE, C, N, S> Future for ObserverWork<B, BE, C, N, S>
358where
359 B: BlockT,
360 BE: Backend<B> + Unpin + 'static,
361 C: ClientForGrandpa<B, BE> + 'static,
362 N: NetworkT<B>,
363 S: SyncingT<B>,
364 NumberFor<B>: BlockNumberOps,
365{
366 type Output = Result<(), Error>;
367
368 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
369 match Future::poll(Pin::new(&mut self.observer), cx) {
370 Poll::Pending => {},
371 Poll::Ready(Ok(())) => {
372 return Poll::Ready(Ok(()))
375 },
376 Poll::Ready(Err(CommandOrError::Error(e))) => {
377 return Poll::Ready(Err(e))
379 },
380 Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
381 self.handle_voter_command(command)?;
383 cx.waker().wake_by_ref();
384 },
385 }
386
387 match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
388 Poll::Pending => {},
389 Poll::Ready(None) => {
390 return Poll::Ready(Ok(()))
392 },
393 Poll::Ready(Some(command)) => {
394 self.handle_voter_command(command)?;
396 cx.waker().wake_by_ref();
397 },
398 }
399
400 Future::poll(Pin::new(&mut self.network), cx)
401 }
402}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407
408 use crate::{
409 aux_schema,
410 communication::tests::{make_test_network, Event},
411 };
412 use assert_matches::assert_matches;
413 use sc_network_types::PeerId;
414 use sc_utils::mpsc::tracing_unbounded;
415 use sp_blockchain::HeaderBackend as _;
416 use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
417
418 use futures::executor;
419
420 #[test]
429 fn observer_work_polls_underlying_network_bridge() {
430 let (tester_fut, _network) = make_test_network();
432 let mut tester = executor::block_on(tester_fut);
433
434 let (client, backend) = {
436 let builder = TestClientBuilder::with_default_backend();
437 let backend = builder.backend();
438 let (client, _) = builder.build_with_longest_chain();
439 (Arc::new(client), backend)
440 };
441
442 let voters = vec![(sp_keyring::Ed25519Keyring::Alice.public().into(), 1)];
443
444 let persistent_data =
445 aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters))
446 .unwrap();
447
448 let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000);
449
450 let observer = ObserverWork::new(
451 client,
452 tester.net_handle.clone(),
453 persistent_data,
454 None,
455 voter_command_rx,
456 None,
457 None,
458 );
459
460 let peer_id = PeerId::random();
462 tester.trigger_gossip_validator_reputation_change(&peer_id);
463
464 executor::block_on(async move {
465 assert!(observer.now_or_never().is_none());
468
469 assert_matches!(tester.events.next().now_or_never(), Some(Some(Event::Report(_, _))));
470 });
471 }
472}