sc_network_sync/service/
network.rs1use futures::{channel::oneshot, StreamExt};
20use sc_network_types::PeerId;
21
22use sc_network::{
23 request_responses::{IfDisconnected, RequestFailure},
24 types::ProtocolName,
25 NetworkPeers, NetworkRequest, ReputationChange,
26};
27use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
28
29use std::sync::Arc;
30
31pub trait Network: NetworkPeers + NetworkRequest {}
33
34impl<T> Network for T where T: NetworkPeers + NetworkRequest {}
35
36pub struct NetworkServiceProvider {
41 rx: TracingUnboundedReceiver<ToServiceCommand>,
42}
43
44pub enum ToServiceCommand {
46 DisconnectPeer(PeerId, ProtocolName),
48
49 ReportPeer(PeerId, ReputationChange),
51
52 StartRequest(
54 PeerId,
55 ProtocolName,
56 Vec<u8>,
57 oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
58 IfDisconnected,
59 ),
60}
61
62#[derive(Clone)]
65pub struct NetworkServiceHandle {
66 tx: TracingUnboundedSender<ToServiceCommand>,
67}
68
69impl NetworkServiceHandle {
70 pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
72 Self { tx }
73 }
74
75 pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
77 let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
78 }
79
80 pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
82 let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
83 }
84
85 pub fn start_request(
87 &self,
88 who: PeerId,
89 protocol: ProtocolName,
90 request: Vec<u8>,
91 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
92 connect: IfDisconnected,
93 ) {
94 let _ = self
95 .tx
96 .unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
97 }
98}
99
100impl NetworkServiceProvider {
101 pub fn new() -> (Self, NetworkServiceHandle) {
103 let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);
104
105 (Self { rx }, NetworkServiceHandle::new(tx))
106 }
107
108 pub async fn run(mut self, service: Arc<dyn Network + Send + Sync>) {
110 while let Some(inner) = self.rx.next().await {
111 match inner {
112 ToServiceCommand::DisconnectPeer(peer, protocol_name) =>
113 service.disconnect_peer(peer, protocol_name),
114 ToServiceCommand::ReportPeer(peer, reputation_change) =>
115 service.report_peer(peer, reputation_change),
116 ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) =>
117 service.start_request(peer, protocol, request, None, tx, connect),
118 }
119 }
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::service::mock::MockNetwork;

    /// Sends a disconnect followed by a report for the same peer through the handle and
    /// checks that the provider forwards each command to the network exactly once.
    #[tokio::test]
    async fn disconnect_and_report_peer() {
        let (provider, handle) = NetworkServiceProvider::new();

        let peer = PeerId::random();
        let proto = ProtocolName::from("test-protocol");
        let proto_clone = proto.clone();
        let change = sc_network::ReputationChange::new_fatal("test-change");

        let mut mock_network = MockNetwork::new();
        mock_network
            .expect_disconnect_peer()
            .withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
            .once()
            .returning(|_, _| ());
        mock_network
            .expect_report_peer()
            .withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
            .once()
            .returning(|_, _| ());

        let provider_task = tokio::spawn(async move {
            provider.run(Arc::new(mock_network)).await;
        });

        handle.disconnect_peer(peer, proto_clone);
        handle.report_peer(peer, change);

        // Without awaiting the spawned task the test would return before the provider is
        // ever polled, so the expectations above could never fail the test. Dropping the
        // only sender ends the command stream, which lets `run` return after it has drained
        // both commands; the mock is then dropped inside the task and a violated expectation
        // surfaces here as a join error.
        drop(handle);
        provider_task
            .await
            .expect("provider task panicked: mock network expectations were not satisfied");
    }
}