referrerpolicy=no-referrer-when-downgrade

sc_consensus_grandpa/communication/
periodic.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Periodic rebroadcast of neighbor packets.
20
21use futures::{future::FutureExt as _, prelude::*, ready, stream::Stream};
22use futures_timer::Delay;
23use log::debug;
24use std::{
25	pin::Pin,
26	task::{Context, Poll},
27	time::Duration,
28};
29
30use sc_network_types::PeerId;
31use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
32use sp_runtime::traits::{Block as BlockT, NumberFor};
33
34use super::gossip::{GossipMessage, NeighborPacket};
35use crate::LOG_TARGET;
36
/// A sender used to send neighbor packets to a background job.
///
/// Newtype around the sending half of a tracing unbounded channel. Every message on the
/// channel is a pair of the peers the packet is addressed to and the neighbor packet itself.
/// Instances are handed out by `NeighborPacketWorker::new`, which keeps the receiving half.
#[derive(Clone)]
pub(super) struct NeighborPacketSender<B: BlockT>(
	// Sending half of the channel; the item type is `(recipients, packet)`.
	TracingUnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
);
42
43impl<B: BlockT> NeighborPacketSender<B> {
44	/// Send a neighbor packet for the background worker to gossip to peers.
45	pub fn send(
46		&self,
47		who: Vec<sc_network_types::PeerId>,
48		neighbor_packet: NeighborPacket<NumberFor<B>>,
49	) {
50		if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) {
51			debug!(target: LOG_TARGET, "Failed to send neighbor packet: {:?}", err);
52		}
53	}
54}
55
/// NeighborPacketWorker is listening on a channel for new neighbor packets being produced by
/// components within `finality-grandpa` and forwards those packets to the underlying
/// `NetworkEngine` through the `NetworkBridge` that it is being polled by (see `Stream`
/// implementation). Periodically it sends out the last packet in cases where no new ones arrive.
pub(super) struct NeighborPacketWorker<B: BlockT> {
	/// The most recently forwarded `(recipients, packet)` pair, kept so it can be emitted again
	/// when the rebroadcast timer fires. `None` until the first packet has been received.
	last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
	/// The duration `delay` is (re-)armed with.
	rebroadcast_period: Duration,
	/// Rebroadcast timer. It is reset both when a fresh packet is forwarded and after it has
	/// fired.
	delay: Delay,
	/// Receiving half of the channel whose sending half is wrapped by `NeighborPacketSender`.
	rx: TracingUnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
}
66
// Explicitly mark the worker as `Unpin` for every block type. The `Stream` implementation
// depends on this: it reborrows `Pin<&mut Self>` as a plain `&mut Self`, which is only
// possible for `Unpin` types.
impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}
68
69impl<B: BlockT> NeighborPacketWorker<B> {
70	pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender<B>) {
71		let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
72			"mpsc_grandpa_neighbor_packet_worker",
73			100_000,
74		);
75		let delay = Delay::new(rebroadcast_period);
76
77		(
78			NeighborPacketWorker { last: None, rebroadcast_period, delay, rx },
79			NeighborPacketSender(tx),
80		)
81	}
82}
83
84impl<B: BlockT> Stream for NeighborPacketWorker<B> {
85	type Item = (Vec<PeerId>, GossipMessage<B>);
86
87	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
88		let this = &mut *self;
89		match this.rx.poll_next_unpin(cx) {
90			Poll::Ready(None) => return Poll::Ready(None),
91			Poll::Ready(Some((to, packet))) => {
92				this.delay.reset(this.rebroadcast_period);
93				this.last = Some((to.clone(), packet.clone()));
94
95				return Poll::Ready(Some((to, GossipMessage::<B>::from(packet))))
96			},
97			// Don't return yet, maybe the timer fired.
98			Poll::Pending => {},
99		};
100
101		ready!(this.delay.poll_unpin(cx));
102
103		// Getting this far here implies that the timer fired.
104
105		this.delay.reset(this.rebroadcast_period);
106
107		// Make sure the underlying task is scheduled for wake-up.
108		//
109		// Note: In case poll_unpin is called after the reset delay fires again, this
110		// will drop one tick. Deemed as very unlikely and also not critical.
111		while this.delay.poll_unpin(cx).is_ready() {}
112
113		if let Some((ref to, ref packet)) = this.last {
114			return Poll::Ready(Some((to.clone(), GossipMessage::<B>::from(packet.clone()))))
115		}
116
117		Poll::Pending
118	}
119}