referrerpolicy=no-referrer-when-downgrade

sc_network/
behaviour.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
21	event::DhtEvent,
22	peer_info,
23	peer_store::PeerStoreProvider,
24	protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
25	protocol_controller::SetId,
26	request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
27	service::traits::Direction,
28	types::ProtocolName,
29	ReputationChange,
30};
31
32use futures::channel::oneshot;
33use libp2p::{
34	connection_limits::ConnectionLimits,
35	core::Multiaddr,
36	identify::Info as IdentifyInfo,
37	identity::PublicKey,
38	kad::{Record, RecordKey},
39	swarm::NetworkBehaviour,
40	PeerId, StreamProtocol,
41};
42
43use parking_lot::Mutex;
44use sp_runtime::traits::Block as BlockT;
45use std::{
46	collections::HashSet,
47	sync::Arc,
48	time::{Duration, Instant},
49};
50
51pub use crate::request_responses::{InboundFailure, OutboundFailure, ResponseFailure};
52
53/// General behaviour of the network. Combines all protocols together.
54#[derive(NetworkBehaviour)]
55#[behaviour(to_swarm = "BehaviourOut")]
56pub struct Behaviour<B: BlockT> {
57	/// Connection limits.
58	connection_limits: libp2p::connection_limits::Behaviour,
59	/// All the substrate-specific protocols.
60	substrate: Protocol<B>,
61	/// Periodically pings and identifies the nodes we are connected to, and store information in a
62	/// cache.
63	peer_info: peer_info::PeerInfoBehaviour,
64	/// Discovers nodes of the network.
65	discovery: DiscoveryBehaviour,
66	/// Generic request-response protocols.
67	request_responses: request_responses::RequestResponsesBehaviour,
68}
69
70/// Event generated by `Behaviour`.
71#[derive(Debug)]
72pub enum BehaviourOut {
73	/// Started a random iterative Kademlia discovery query.
74	RandomKademliaStarted,
75
76	/// We have received a request from a peer and answered it.
77	///
78	/// This event is generated for statistics purposes.
79	InboundRequest {
80		/// Protocol name of the request.
81		protocol: ProtocolName,
82		/// If `Ok`, contains the time elapsed between when we received the request and when we
83		/// sent back the response. If `Err`, the error that happened.
84		result: Result<Duration, ResponseFailure>,
85	},
86
87	/// A request has succeeded or failed.
88	///
89	/// This event is generated for statistics purposes.
90	RequestFinished {
91		/// Name of the protocol in question.
92		protocol: ProtocolName,
93		/// Duration the request took.
94		duration: Duration,
95		/// Result of the request.
96		result: Result<(), RequestFailure>,
97	},
98
99	/// A request protocol handler issued reputation changes for the given peer.
100	ReputationChanges { peer: PeerId, changes: Vec<ReputationChange> },
101
102	/// Opened a substream with the given node with the given notifications protocol.
103	///
104	/// The protocol is always one of the notification protocols that have been registered.
105	NotificationStreamOpened {
106		/// Node we opened the substream with.
107		remote: PeerId,
108		/// Set ID.
109		set_id: SetId,
110		/// Direction of the stream.
111		direction: Direction,
112		/// If the negotiation didn't use the main name of the protocol (the one in
113		/// `notifications_protocol`), then this field contains which name has actually been
114		/// used.
115		/// See also [`crate::Event::NotificationStreamOpened`].
116		negotiated_fallback: Option<ProtocolName>,
117		/// Object that permits sending notifications to the peer.
118		notifications_sink: NotificationsSink,
119		/// Received handshake.
120		received_handshake: Vec<u8>,
121	},
122
123	/// The [`NotificationsSink`] object used to send notifications with the given peer must be
124	/// replaced with a new one.
125	///
126	/// This event is typically emitted when a transport-level connection is closed and we fall
127	/// back to a secondary connection.
128	NotificationStreamReplaced {
129		/// Id of the peer we are connected to.
130		remote: PeerId,
131		/// Set ID.
132		set_id: SetId,
133		/// Replacement for the previous [`NotificationsSink`].
134		notifications_sink: NotificationsSink,
135	},
136
137	/// Closed a substream with the given node. Always matches a corresponding previous
138	/// `NotificationStreamOpened` message.
139	NotificationStreamClosed {
140		/// Node we closed the substream with.
141		remote: PeerId,
142		/// Set ID.
143		set_id: SetId,
144	},
145
146	/// Received one or more messages from the given node using the given protocol.
147	NotificationsReceived {
148		/// Node we received the message from.
149		remote: PeerId,
150		/// Set ID.
151		set_id: SetId,
152		/// Concerned protocol and associated message.
153		notification: Vec<u8>,
154	},
155
156	/// We have obtained identity information from a peer, including the addresses it is listening
157	/// on.
158	PeerIdentify {
159		/// Id of the peer that has been identified.
160		peer_id: PeerId,
161		/// Information about the peer.
162		info: IdentifyInfo,
163	},
164
165	/// We have learned about the existence of a node on the default set.
166	Discovered(PeerId),
167
168	/// Events generated by a DHT as a response to get_value or put_value requests with the
169	/// request duration. Or events generated by the DHT as a consequnce of receiving a record
170	/// to store from peers.
171	Dht(DhtEvent, Option<Duration>),
172
173	/// Ignored event generated by lower layers.
174	None,
175}
176
177impl<B: BlockT> Behaviour<B> {
178	/// Builds a new `Behaviour`.
179	pub fn new(
180		substrate: Protocol<B>,
181		user_agent: String,
182		local_public_key: PublicKey,
183		disco_config: DiscoveryConfig,
184		request_response_protocols: Vec<ProtocolConfig>,
185		peer_store_handle: Arc<dyn PeerStoreProvider>,
186		external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
187		public_addresses: Vec<Multiaddr>,
188		connection_limits: ConnectionLimits,
189	) -> Result<Self, request_responses::RegisterError> {
190		Ok(Self {
191			substrate,
192			peer_info: peer_info::PeerInfoBehaviour::new(
193				user_agent,
194				local_public_key,
195				external_addresses,
196				public_addresses,
197			),
198			discovery: disco_config.finish(),
199			request_responses: request_responses::RequestResponsesBehaviour::new(
200				request_response_protocols.into_iter(),
201				peer_store_handle,
202			)?,
203			connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits),
204		})
205	}
206
207	/// Returns the list of nodes that we know exist in the network.
208	pub fn known_peers(&mut self) -> HashSet<PeerId> {
209		self.discovery.known_peers()
210	}
211
212	/// Adds a hard-coded address for the given peer, that never expires.
213	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
214		self.discovery.add_known_address(peer_id, addr)
215	}
216
217	/// Returns the number of nodes in each Kademlia kbucket.
218	///
219	/// Identifies kbuckets by the base 2 logarithm of their lower bound.
220	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
221		self.discovery.num_entries_per_kbucket()
222	}
223
224	/// Returns the number of records in the Kademlia record stores.
225	pub fn num_kademlia_records(&mut self) -> Option<usize> {
226		self.discovery.num_kademlia_records()
227	}
228
229	/// Returns the total size in bytes of all the records in the Kademlia record stores.
230	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
231		self.discovery.kademlia_records_total_size()
232	}
233
234	/// Borrows `self` and returns a struct giving access to the information about a node.
235	///
236	/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
237	/// we're connected to, meaning that if `None` is returned then we're not connected to that
238	/// node.
239	pub fn node(&self, peer_id: &PeerId) -> Option<peer_info::Node> {
240		self.peer_info.node(peer_id)
241	}
242
243	/// Initiates sending a request.
244	pub fn send_request(
245		&mut self,
246		target: &PeerId,
247		protocol: ProtocolName,
248		request: Vec<u8>,
249		fallback_request: Option<(Vec<u8>, ProtocolName)>,
250		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
251		connect: IfDisconnected,
252	) {
253		self.request_responses.send_request(
254			target,
255			protocol,
256			request,
257			fallback_request,
258			pending_response,
259			connect,
260		)
261	}
262
263	/// Returns a shared reference to the user protocol.
264	pub fn user_protocol(&self) -> &Protocol<B> {
265		&self.substrate
266	}
267
268	/// Returns a mutable reference to the user protocol.
269	pub fn user_protocol_mut(&mut self) -> &mut Protocol<B> {
270		&mut self.substrate
271	}
272
273	/// Add a self-reported address of a remote peer to the k-buckets of the supported
274	/// DHTs (`supported_protocols`).
275	pub fn add_self_reported_address_to_dht(
276		&mut self,
277		peer_id: &PeerId,
278		supported_protocols: &[StreamProtocol],
279		addr: Multiaddr,
280	) {
281		self.discovery.add_self_reported_address(peer_id, supported_protocols, addr);
282	}
283
284	/// Start finding closest peerst to the target `PeerId`. Will later produce either a
285	/// `ClosestPeersFound` or `ClosestPeersNotFound` event.
286	pub fn find_closest_peers(&mut self, target: PeerId) {
287		self.discovery.find_closest_peers(target);
288	}
289
290	/// Start querying a record from the DHT. Will later produce either a `ValueFound` or a
291	/// `ValueNotFound` event.
292	pub fn get_value(&mut self, key: RecordKey) {
293		self.discovery.get_value(key);
294	}
295
296	/// Starts putting a record into DHT. Will later produce either a `ValuePut` or a
297	/// `ValuePutFailed` event.
298	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
299		self.discovery.put_value(key, value);
300	}
301
302	/// Puts a record into DHT, on the provided Peers
303	pub fn put_record_to(
304		&mut self,
305		record: Record,
306		peers: HashSet<sc_network_types::PeerId>,
307		update_local_storage: bool,
308	) {
309		self.discovery.put_record_to(record, peers, update_local_storage);
310	}
311
312	/// Stores value in DHT
313	pub fn store_record(
314		&mut self,
315		record_key: RecordKey,
316		record_value: Vec<u8>,
317		publisher: Option<PeerId>,
318		expires: Option<Instant>,
319	) {
320		self.discovery.store_record(record_key, record_value, publisher, expires);
321	}
322
323	/// Start providing `key` on the DHT.
324	pub fn start_providing(&mut self, key: RecordKey) {
325		self.discovery.start_providing(key)
326	}
327
328	/// Stop providing `key` on the DHT.
329	pub fn stop_providing(&mut self, key: &RecordKey) {
330		self.discovery.stop_providing(key)
331	}
332
333	/// Start searching for providers on the DHT. Will later produce either a `ProvidersFound`
334	/// or `ProvidersNotFound` event.
335	pub fn get_providers(&mut self, key: RecordKey) {
336		self.discovery.get_providers(key)
337	}
338}
339
340impl From<CustomMessageOutcome> for BehaviourOut {
341	fn from(event: CustomMessageOutcome) -> Self {
342		match event {
343			CustomMessageOutcome::NotificationStreamOpened {
344				remote,
345				set_id,
346				direction,
347				negotiated_fallback,
348				received_handshake,
349				notifications_sink,
350			} => BehaviourOut::NotificationStreamOpened {
351				remote,
352				set_id,
353				direction,
354				negotiated_fallback,
355				received_handshake,
356				notifications_sink,
357			},
358			CustomMessageOutcome::NotificationStreamReplaced {
359				remote,
360				set_id,
361				notifications_sink,
362			} => BehaviourOut::NotificationStreamReplaced { remote, set_id, notifications_sink },
363			CustomMessageOutcome::NotificationStreamClosed { remote, set_id } =>
364				BehaviourOut::NotificationStreamClosed { remote, set_id },
365			CustomMessageOutcome::NotificationsReceived { remote, set_id, notification } =>
366				BehaviourOut::NotificationsReceived { remote, set_id, notification },
367		}
368	}
369}
370
371impl From<request_responses::Event> for BehaviourOut {
372	fn from(event: request_responses::Event) -> Self {
373		match event {
374			request_responses::Event::InboundRequest { protocol, result, .. } =>
375				BehaviourOut::InboundRequest { protocol, result },
376			request_responses::Event::RequestFinished { protocol, duration, result, .. } =>
377				BehaviourOut::RequestFinished { protocol, duration, result },
378			request_responses::Event::ReputationChanges { peer, changes } =>
379				BehaviourOut::ReputationChanges { peer, changes },
380		}
381	}
382}
383
384impl From<peer_info::PeerInfoEvent> for BehaviourOut {
385	fn from(event: peer_info::PeerInfoEvent) -> Self {
386		let peer_info::PeerInfoEvent::Identified { peer_id, info } = event;
387		BehaviourOut::PeerIdentify { peer_id, info }
388	}
389}
390
391impl From<DiscoveryOut> for BehaviourOut {
392	fn from(event: DiscoveryOut) -> Self {
393		match event {
394			DiscoveryOut::UnroutablePeer(_peer_id) => {
395				// Obtaining and reporting listen addresses for unroutable peers back
396				// to Kademlia is handled by the `Identify` protocol, part of the
397				// `PeerInfoBehaviour`. See the `From<peer_info::PeerInfoEvent>`
398				// implementation.
399				BehaviourOut::None
400			},
401			DiscoveryOut::Discovered(peer_id) => BehaviourOut::Discovered(peer_id),
402			DiscoveryOut::ClosestPeersFound(target, peers, duration) => BehaviourOut::Dht(
403				DhtEvent::ClosestPeersFound(
404					target.into(),
405					peers
406						.into_iter()
407						.map(|(p, addrs)| (p.into(), addrs.into_iter().map(Into::into).collect()))
408						.collect(),
409				),
410				Some(duration),
411			),
412			DiscoveryOut::ClosestPeersNotFound(target, duration) =>
413				BehaviourOut::Dht(DhtEvent::ClosestPeersNotFound(target.into()), Some(duration)),
414			DiscoveryOut::ValueFound(results, duration) =>
415				BehaviourOut::Dht(DhtEvent::ValueFound(results.into()), Some(duration)),
416			DiscoveryOut::ValueNotFound(key, duration) =>
417				BehaviourOut::Dht(DhtEvent::ValueNotFound(key.into()), Some(duration)),
418			DiscoveryOut::ValuePut(key, duration) =>
419				BehaviourOut::Dht(DhtEvent::ValuePut(key.into()), Some(duration)),
420			DiscoveryOut::PutRecordRequest(record_key, record_value, publisher, expires) =>
421				BehaviourOut::Dht(
422					DhtEvent::PutRecordRequest(record_key.into(), record_value, publisher, expires),
423					None,
424				),
425			DiscoveryOut::ValuePutFailed(key, duration) =>
426				BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)),
427			DiscoveryOut::StartedProviding(key, duration) =>
428				BehaviourOut::Dht(DhtEvent::StartedProviding(key.into()), Some(duration)),
429			DiscoveryOut::StartProvidingFailed(key, duration) =>
430				BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), Some(duration)),
431			DiscoveryOut::ProvidersFound(key, providers, duration) => BehaviourOut::Dht(
432				DhtEvent::ProvidersFound(
433					key.into(),
434					providers.into_iter().map(Into::into).collect(),
435				),
436				Some(duration),
437			),
438			DiscoveryOut::NoMoreProviders(key, duration) =>
439				BehaviourOut::Dht(DhtEvent::NoMoreProviders(key.into()), Some(duration)),
440			DiscoveryOut::ProvidersNotFound(key, duration) =>
441				BehaviourOut::Dht(DhtEvent::ProvidersNotFound(key.into()), Some(duration)),
442			DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
443		}
444	}
445}
446
447impl From<void::Void> for BehaviourOut {
448	fn from(e: void::Void) -> Self {
449		void::unreachable(e)
450	}
451}