sc_consensus_grandpa/communication/
periodic.rs1use futures::{future::FutureExt as _, prelude::*, ready, stream::Stream};
22use futures_timer::Delay;
23use log::debug;
24use std::{
25 pin::Pin,
26 task::{Context, Poll},
27 time::Duration,
28};
29
30use sc_network_types::PeerId;
31use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
32use sp_runtime::traits::{Block as BlockT, NumberFor};
33
34use super::gossip::{GossipMessage, NeighborPacket};
35use crate::LOG_TARGET;
36
37#[derive(Clone)]
39pub(super) struct NeighborPacketSender<B: BlockT>(
40 TracingUnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
41);
42
43impl<B: BlockT> NeighborPacketSender<B> {
44 pub fn send(
46 &self,
47 who: Vec<sc_network_types::PeerId>,
48 neighbor_packet: NeighborPacket<NumberFor<B>>,
49 ) {
50 if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) {
51 debug!(target: LOG_TARGET, "Failed to send neighbor packet: {:?}", err);
52 }
53 }
54}
55
56pub(super) struct NeighborPacketWorker<B: BlockT> {
61 last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
62 rebroadcast_period: Duration,
63 delay: Delay,
64 rx: TracingUnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
65}
66
67impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}
68
69impl<B: BlockT> NeighborPacketWorker<B> {
70 pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender<B>) {
71 let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
72 "mpsc_grandpa_neighbor_packet_worker",
73 100_000,
74 );
75 let delay = Delay::new(rebroadcast_period);
76
77 (
78 NeighborPacketWorker { last: None, rebroadcast_period, delay, rx },
79 NeighborPacketSender(tx),
80 )
81 }
82}
83
84impl<B: BlockT> Stream for NeighborPacketWorker<B> {
85 type Item = (Vec<PeerId>, GossipMessage<B>);
86
87 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
88 let this = &mut *self;
89 match this.rx.poll_next_unpin(cx) {
90 Poll::Ready(None) => return Poll::Ready(None),
91 Poll::Ready(Some((to, packet))) => {
92 this.delay.reset(this.rebroadcast_period);
93 this.last = Some((to.clone(), packet.clone()));
94
95 return Poll::Ready(Some((to, GossipMessage::<B>::from(packet))))
96 },
97 Poll::Pending => {},
99 };
100
101 ready!(this.delay.poll_unpin(cx));
102
103 this.delay.reset(this.rebroadcast_period);
106
107 while this.delay.poll_unpin(cx).is_ready() {}
112
113 if let Some((ref to, ref packet)) = this.last {
114 return Poll::Ready(Some((to.clone(), GossipMessage::<B>::from(packet.clone()))))
115 }
116
117 Poll::Pending
118 }
119}