sc_network/
behaviour.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
21	event::DhtEvent,
22	peer_info,
23	peer_store::PeerStoreProvider,
24	protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
25	protocol_controller::SetId,
26	request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
27	service::traits::Direction,
28	types::ProtocolName,
29	ReputationChange,
30};
31
32use futures::channel::oneshot;
33use libp2p::{
34	connection_limits::ConnectionLimits,
35	core::Multiaddr,
36	identify::Info as IdentifyInfo,
37	identity::PublicKey,
38	kad::{Record, RecordKey},
39	swarm::NetworkBehaviour,
40	PeerId, StreamProtocol,
41};
42
43use parking_lot::Mutex;
44use sp_runtime::traits::Block as BlockT;
45use std::{
46	collections::HashSet,
47	sync::Arc,
48	time::{Duration, Instant},
49};
50
51pub use crate::request_responses::{InboundFailure, OutboundFailure, ResponseFailure};
52
53/// General behaviour of the network. Combines all protocols together.
54#[derive(NetworkBehaviour)]
55#[behaviour(to_swarm = "BehaviourOut")]
56pub struct Behaviour<B: BlockT> {
57	/// Connection limits.
58	connection_limits: libp2p::connection_limits::Behaviour,
59	/// All the substrate-specific protocols.
60	substrate: Protocol<B>,
61	/// Periodically pings and identifies the nodes we are connected to, and store information in a
62	/// cache.
63	peer_info: peer_info::PeerInfoBehaviour,
64	/// Discovers nodes of the network.
65	discovery: DiscoveryBehaviour,
66	/// Generic request-response protocols.
67	request_responses: request_responses::RequestResponsesBehaviour,
68}
69
70/// Event generated by `Behaviour`.
71pub enum BehaviourOut {
72	/// Started a random iterative Kademlia discovery query.
73	RandomKademliaStarted,
74
75	/// We have received a request from a peer and answered it.
76	///
77	/// This event is generated for statistics purposes.
78	InboundRequest {
79		/// Protocol name of the request.
80		protocol: ProtocolName,
81		/// If `Ok`, contains the time elapsed between when we received the request and when we
82		/// sent back the response. If `Err`, the error that happened.
83		result: Result<Duration, ResponseFailure>,
84	},
85
86	/// A request has succeeded or failed.
87	///
88	/// This event is generated for statistics purposes.
89	RequestFinished {
90		/// Name of the protocol in question.
91		protocol: ProtocolName,
92		/// Duration the request took.
93		duration: Duration,
94		/// Result of the request.
95		result: Result<(), RequestFailure>,
96	},
97
98	/// A request protocol handler issued reputation changes for the given peer.
99	ReputationChanges { peer: PeerId, changes: Vec<ReputationChange> },
100
101	/// Opened a substream with the given node with the given notifications protocol.
102	///
103	/// The protocol is always one of the notification protocols that have been registered.
104	NotificationStreamOpened {
105		/// Node we opened the substream with.
106		remote: PeerId,
107		/// Set ID.
108		set_id: SetId,
109		/// Direction of the stream.
110		direction: Direction,
111		/// If the negotiation didn't use the main name of the protocol (the one in
112		/// `notifications_protocol`), then this field contains which name has actually been
113		/// used.
114		/// See also [`crate::Event::NotificationStreamOpened`].
115		negotiated_fallback: Option<ProtocolName>,
116		/// Object that permits sending notifications to the peer.
117		notifications_sink: NotificationsSink,
118		/// Received handshake.
119		received_handshake: Vec<u8>,
120	},
121
122	/// The [`NotificationsSink`] object used to send notifications with the given peer must be
123	/// replaced with a new one.
124	///
125	/// This event is typically emitted when a transport-level connection is closed and we fall
126	/// back to a secondary connection.
127	NotificationStreamReplaced {
128		/// Id of the peer we are connected to.
129		remote: PeerId,
130		/// Set ID.
131		set_id: SetId,
132		/// Replacement for the previous [`NotificationsSink`].
133		notifications_sink: NotificationsSink,
134	},
135
136	/// Closed a substream with the given node. Always matches a corresponding previous
137	/// `NotificationStreamOpened` message.
138	NotificationStreamClosed {
139		/// Node we closed the substream with.
140		remote: PeerId,
141		/// Set ID.
142		set_id: SetId,
143	},
144
145	/// Received one or more messages from the given node using the given protocol.
146	NotificationsReceived {
147		/// Node we received the message from.
148		remote: PeerId,
149		/// Set ID.
150		set_id: SetId,
151		/// Concerned protocol and associated message.
152		notification: Vec<u8>,
153	},
154
155	/// We have obtained identity information from a peer, including the addresses it is listening
156	/// on.
157	PeerIdentify {
158		/// Id of the peer that has been identified.
159		peer_id: PeerId,
160		/// Information about the peer.
161		info: IdentifyInfo,
162	},
163
164	/// We have learned about the existence of a node on the default set.
165	Discovered(PeerId),
166
167	/// Events generated by a DHT as a response to get_value or put_value requests with the
168	/// request duration. Or events generated by the DHT as a consequnce of receiving a record
169	/// to store from peers.
170	Dht(DhtEvent, Option<Duration>),
171
172	/// Ignored event generated by lower layers.
173	None,
174}
175
176impl<B: BlockT> Behaviour<B> {
177	/// Builds a new `Behaviour`.
178	pub fn new(
179		substrate: Protocol<B>,
180		user_agent: String,
181		local_public_key: PublicKey,
182		disco_config: DiscoveryConfig,
183		request_response_protocols: Vec<ProtocolConfig>,
184		peer_store_handle: Arc<dyn PeerStoreProvider>,
185		external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
186		connection_limits: ConnectionLimits,
187	) -> Result<Self, request_responses::RegisterError> {
188		Ok(Self {
189			substrate,
190			peer_info: peer_info::PeerInfoBehaviour::new(
191				user_agent,
192				local_public_key,
193				external_addresses,
194			),
195			discovery: disco_config.finish(),
196			request_responses: request_responses::RequestResponsesBehaviour::new(
197				request_response_protocols.into_iter(),
198				peer_store_handle,
199			)?,
200			connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits),
201		})
202	}
203
204	/// Returns the list of nodes that we know exist in the network.
205	pub fn known_peers(&mut self) -> HashSet<PeerId> {
206		self.discovery.known_peers()
207	}
208
209	/// Adds a hard-coded address for the given peer, that never expires.
210	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
211		self.discovery.add_known_address(peer_id, addr)
212	}
213
214	/// Returns the number of nodes in each Kademlia kbucket.
215	///
216	/// Identifies kbuckets by the base 2 logarithm of their lower bound.
217	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
218		self.discovery.num_entries_per_kbucket()
219	}
220
221	/// Returns the number of records in the Kademlia record stores.
222	pub fn num_kademlia_records(&mut self) -> Option<usize> {
223		self.discovery.num_kademlia_records()
224	}
225
226	/// Returns the total size in bytes of all the records in the Kademlia record stores.
227	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
228		self.discovery.kademlia_records_total_size()
229	}
230
231	/// Borrows `self` and returns a struct giving access to the information about a node.
232	///
233	/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
234	/// we're connected to, meaning that if `None` is returned then we're not connected to that
235	/// node.
236	pub fn node(&self, peer_id: &PeerId) -> Option<peer_info::Node> {
237		self.peer_info.node(peer_id)
238	}
239
240	/// Initiates sending a request.
241	pub fn send_request(
242		&mut self,
243		target: &PeerId,
244		protocol: ProtocolName,
245		request: Vec<u8>,
246		fallback_request: Option<(Vec<u8>, ProtocolName)>,
247		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
248		connect: IfDisconnected,
249	) {
250		self.request_responses.send_request(
251			target,
252			protocol,
253			request,
254			fallback_request,
255			pending_response,
256			connect,
257		)
258	}
259
260	/// Returns a shared reference to the user protocol.
261	pub fn user_protocol(&self) -> &Protocol<B> {
262		&self.substrate
263	}
264
265	/// Returns a mutable reference to the user protocol.
266	pub fn user_protocol_mut(&mut self) -> &mut Protocol<B> {
267		&mut self.substrate
268	}
269
270	/// Add a self-reported address of a remote peer to the k-buckets of the supported
271	/// DHTs (`supported_protocols`).
272	pub fn add_self_reported_address_to_dht(
273		&mut self,
274		peer_id: &PeerId,
275		supported_protocols: &[StreamProtocol],
276		addr: Multiaddr,
277	) {
278		self.discovery.add_self_reported_address(peer_id, supported_protocols, addr);
279	}
280
281	/// Start querying a record from the DHT. Will later produce either a `ValueFound` or a
282	/// `ValueNotFound` event.
283	pub fn get_value(&mut self, key: RecordKey) {
284		self.discovery.get_value(key);
285	}
286
287	/// Starts putting a record into DHT. Will later produce either a `ValuePut` or a
288	/// `ValuePutFailed` event.
289	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
290		self.discovery.put_value(key, value);
291	}
292
293	/// Puts a record into DHT, on the provided Peers
294	pub fn put_record_to(
295		&mut self,
296		record: Record,
297		peers: HashSet<sc_network_types::PeerId>,
298		update_local_storage: bool,
299	) {
300		self.discovery.put_record_to(record, peers, update_local_storage);
301	}
302
303	/// Stores value in DHT
304	pub fn store_record(
305		&mut self,
306		record_key: RecordKey,
307		record_value: Vec<u8>,
308		publisher: Option<PeerId>,
309		expires: Option<Instant>,
310	) {
311		self.discovery.store_record(record_key, record_value, publisher, expires);
312	}
313}
314
315impl From<CustomMessageOutcome> for BehaviourOut {
316	fn from(event: CustomMessageOutcome) -> Self {
317		match event {
318			CustomMessageOutcome::NotificationStreamOpened {
319				remote,
320				set_id,
321				direction,
322				negotiated_fallback,
323				received_handshake,
324				notifications_sink,
325			} => BehaviourOut::NotificationStreamOpened {
326				remote,
327				set_id,
328				direction,
329				negotiated_fallback,
330				received_handshake,
331				notifications_sink,
332			},
333			CustomMessageOutcome::NotificationStreamReplaced {
334				remote,
335				set_id,
336				notifications_sink,
337			} => BehaviourOut::NotificationStreamReplaced { remote, set_id, notifications_sink },
338			CustomMessageOutcome::NotificationStreamClosed { remote, set_id } =>
339				BehaviourOut::NotificationStreamClosed { remote, set_id },
340			CustomMessageOutcome::NotificationsReceived { remote, set_id, notification } =>
341				BehaviourOut::NotificationsReceived { remote, set_id, notification },
342		}
343	}
344}
345
346impl From<request_responses::Event> for BehaviourOut {
347	fn from(event: request_responses::Event) -> Self {
348		match event {
349			request_responses::Event::InboundRequest { protocol, result, .. } =>
350				BehaviourOut::InboundRequest { protocol, result },
351			request_responses::Event::RequestFinished { protocol, duration, result, .. } =>
352				BehaviourOut::RequestFinished { protocol, duration, result },
353			request_responses::Event::ReputationChanges { peer, changes } =>
354				BehaviourOut::ReputationChanges { peer, changes },
355		}
356	}
357}
358
359impl From<peer_info::PeerInfoEvent> for BehaviourOut {
360	fn from(event: peer_info::PeerInfoEvent) -> Self {
361		let peer_info::PeerInfoEvent::Identified { peer_id, info } = event;
362		BehaviourOut::PeerIdentify { peer_id, info }
363	}
364}
365
366impl From<DiscoveryOut> for BehaviourOut {
367	fn from(event: DiscoveryOut) -> Self {
368		match event {
369			DiscoveryOut::UnroutablePeer(_peer_id) => {
370				// Obtaining and reporting listen addresses for unroutable peers back
371				// to Kademlia is handled by the `Identify` protocol, part of the
372				// `PeerInfoBehaviour`. See the `From<peer_info::PeerInfoEvent>`
373				// implementation.
374				BehaviourOut::None
375			},
376			DiscoveryOut::Discovered(peer_id) => BehaviourOut::Discovered(peer_id),
377			DiscoveryOut::ValueFound(results, duration) =>
378				BehaviourOut::Dht(DhtEvent::ValueFound(results), Some(duration)),
379			DiscoveryOut::ValueNotFound(key, duration) =>
380				BehaviourOut::Dht(DhtEvent::ValueNotFound(key), Some(duration)),
381			DiscoveryOut::ValuePut(key, duration) =>
382				BehaviourOut::Dht(DhtEvent::ValuePut(key), Some(duration)),
383			DiscoveryOut::PutRecordRequest(record_key, record_value, publisher, expires) =>
384				BehaviourOut::Dht(
385					DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires),
386					None,
387				),
388			DiscoveryOut::ValuePutFailed(key, duration) =>
389				BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), Some(duration)),
390			DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
391		}
392	}
393}
394
395impl From<void::Void> for BehaviourOut {
396	fn from(e: void::Void) -> Self {
397		void::unreachable(e)
398	}
399}