sc_network_sync/service/
network.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use futures::{channel::oneshot, StreamExt};
20use sc_network_types::PeerId;
21
22use sc_network::{
23	request_responses::{IfDisconnected, RequestFailure},
24	types::ProtocolName,
25	NetworkPeers, NetworkRequest, ReputationChange,
26};
27use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
28
29use std::sync::Arc;
30
31/// Network-related services required by `sc-network-sync`
32pub trait Network: NetworkPeers + NetworkRequest {}
33
34impl<T> Network for T where T: NetworkPeers + NetworkRequest {}
35
36/// Network service provider for `ChainSync`
37///
38/// It runs as an asynchronous task and listens to commands coming from `ChainSync` and
39/// calls the `NetworkService` on its behalf.
40pub struct NetworkServiceProvider {
41	rx: TracingUnboundedReceiver<ToServiceCommand>,
42}
43
44/// Commands that `ChainSync` wishes to send to `NetworkService`
45pub enum ToServiceCommand {
46	/// Call `NetworkPeers::disconnect_peer()`
47	DisconnectPeer(PeerId, ProtocolName),
48
49	/// Call `NetworkPeers::report_peer()`
50	ReportPeer(PeerId, ReputationChange),
51
52	/// Call `NetworkRequest::start_request()`
53	StartRequest(
54		PeerId,
55		ProtocolName,
56		Vec<u8>,
57		oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
58		IfDisconnected,
59	),
60}
61
62/// Handle that is (temporarily) passed to `ChainSync` so it can
63/// communicate with `NetworkService` through `SyncingEngine`
64#[derive(Clone)]
65pub struct NetworkServiceHandle {
66	tx: TracingUnboundedSender<ToServiceCommand>,
67}
68
69impl NetworkServiceHandle {
70	/// Create new service handle
71	pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
72		Self { tx }
73	}
74
75	/// Report peer
76	pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
77		let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
78	}
79
80	/// Disconnect peer
81	pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
82		let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
83	}
84
85	/// Send request to peer
86	pub fn start_request(
87		&self,
88		who: PeerId,
89		protocol: ProtocolName,
90		request: Vec<u8>,
91		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
92		connect: IfDisconnected,
93	) {
94		let _ = self
95			.tx
96			.unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
97	}
98}
99
100impl NetworkServiceProvider {
101	/// Create new `NetworkServiceProvider`
102	pub fn new() -> (Self, NetworkServiceHandle) {
103		let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);
104
105		(Self { rx }, NetworkServiceHandle::new(tx))
106	}
107
108	/// Run the `NetworkServiceProvider`
109	pub async fn run(mut self, service: Arc<dyn Network + Send + Sync>) {
110		while let Some(inner) = self.rx.next().await {
111			match inner {
112				ToServiceCommand::DisconnectPeer(peer, protocol_name) =>
113					service.disconnect_peer(peer, protocol_name),
114				ToServiceCommand::ReportPeer(peer, reputation_change) =>
115					service.report_peer(peer, reputation_change),
116				ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) =>
117					service.start_request(peer, protocol, request, None, tx, connect),
118			}
119		}
120	}
121}
122
#[cfg(test)]
mod tests {
	use super::*;
	use crate::service::mock::MockNetwork;

	// Typical pattern in `Protocol` code where a peer is disconnected
	// and then reported.
	//
	// The test checks that both commands sent through the handle reach the
	// network service exactly once and with the arguments that were sent.
	#[tokio::test]
	async fn disconnect_and_report_peer() {
		let (provider, handle) = NetworkServiceProvider::new();

		let peer = PeerId::random();
		let proto = ProtocolName::from("test-protocol");
		// `proto` is moved into the matcher closure below, so keep a copy to send.
		let proto_clone = proto.clone();
		let change = sc_network::ReputationChange::new_fatal("test-change");

		let mut mock_network = MockNetwork::new();
		mock_network
			.expect_disconnect_peer()
			.withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
			.once()
			.returning(|_, _| ());
		mock_network
			.expect_report_peer()
			.withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
			.once()
			.returning(|_, _| ());

		// Keep the join handle so the test can wait for the provider to finish.
		let provider_task = tokio::spawn(async move {
			provider.run(Arc::new(mock_network)).await;
		});

		handle.disconnect_peer(peer, proto_clone);
		handle.report_peer(peer, change);

		// Dropping the only handle closes the command channel, which lets `run()`
		// return after it has drained the two queued commands. Awaiting the task
		// then guarantees the commands were actually processed and turns any panic
		// raised inside the task (e.g. by the mock) into a test failure, instead of
		// letting the test return before the provider was ever polled.
		drop(handle);
		provider_task.await.expect("provider task should finish without panicking");
	}
}