referrerpolicy=no-referrer-when-downgrade

sc_network/
protocol.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	config, error,
21	peer_store::PeerStoreProvider,
22	protocol_controller::{self, SetId},
23	service::{metrics::NotificationMetrics, traits::Direction},
24	types::ProtocolName,
25};
26
27use codec::Encode;
28use libp2p::{
29	core::{transport::PortUse, Endpoint},
30	swarm::{
31		behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, THandler,
32		THandlerInEvent, THandlerOutEvent, ToSwarm,
33	},
34	Multiaddr, PeerId,
35};
36use log::{debug, warn};
37
38use codec::DecodeAll;
39use sc_network_common::{role::Roles, types::ReputationChange};
40use sc_utils::mpsc::TracingUnboundedReceiver;
41use sp_runtime::traits::Block as BlockT;
42
43use std::{collections::HashSet, iter, sync::Arc, task::Poll};
44
45use notifications::{Notifications, NotificationsOut};
46
47pub(crate) use notifications::ProtocolHandle;
48
49pub use notifications::{notification_service, NotificationsSink, ProtocolHandlePair, Ready};
50
51mod notifications;
52
53pub mod message;
54
55/// Log target used by the `debug!`/`warn!` calls in this file.
56const LOG_TARGET: &str = "sub-libp2p";
57
58/// Identifier of the peerset for the block announces protocol.
///
/// `Protocol::poll` compares the set of every notification event against this value and
/// forwards matching events to `Protocol::sync_handle` instead of emitting them to the swarm.
59const HARDCODED_PEERSETS_SYNC: SetId = SetId::from(0);
60
61/// Network behaviour wrapping [`Notifications`].
///
/// Notification-stream events of the block announces set (`HARDCODED_PEERSETS_SYNC`) are
/// forwarded to `sync_handle`; those of every other set are surfaced to the swarm as
/// [`CustomMessageOutcome`].
//
// NOTE(review): this comment used to read "Lock must always be taken in order declared here",
// but no field visible in this struct is a lock — confirm that remark is obsolete.
62pub struct Protocol<B: BlockT> {
63	/// Handles opening the unique substream and sending and receiving raw messages.
64	behaviour: Notifications,
65	/// List of notifications protocols that have been registered.
	///
	/// The position of a name in this list is used as the protocol's `SetId`
	/// (see `disconnect_peer`); `new` puts the block announces protocol first.
66	notification_protocols: Vec<ProtocolName>,
67	/// Handle to `PeerStore`.
68	peer_store_handle: Arc<dyn PeerStoreProvider>,
69	/// Peers whose stream was opened with a handshake from which no role could be determined
	/// (see `role_available`). `poll` drops the events of these peers on non-sync sets until
	/// a stream-closed event removes the peer again.
	//
	// NOTE(review): the set is keyed by peer only, not by (peer, set), so an entry created
	// for one set also affects that peer's events on other non-sync sets — confirm intended.
70	bad_handshake_streams: HashSet<PeerId>,
	/// Handle of the block announces protocol (the first handle built in `new`); receives
	/// every notification-stream event of `HARDCODED_PEERSETS_SYNC`.
71	sync_handle: ProtocolHandle,
	/// Ties the block type `B` to the struct; no value of type `B` is stored.
72	_marker: std::marker::PhantomData<B>,
73}
74
75impl<B: BlockT> Protocol<B> {
76	/// Create a new instance.
77	pub(crate) fn new(
78		roles: Roles,
79		notification_metrics: NotificationMetrics,
80		notification_protocols: Vec<config::NonDefaultSetConfig>,
81		block_announces_protocol: config::NonDefaultSetConfig,
82		peer_store_handle: Arc<dyn PeerStoreProvider>,
83		protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
84		from_protocol_controllers: TracingUnboundedReceiver<protocol_controller::Message>,
85	) -> error::Result<(Self, Vec<ProtocolHandle>)> {
86		let (behaviour, notification_protocols, handles) = {
87			let installed_protocols = iter::once(block_announces_protocol.protocol_name().clone())
88				.chain(notification_protocols.iter().map(|p| p.protocol_name().clone()))
89				.collect::<Vec<_>>();
90
91			// NOTE: Block announcement protocol is still very much hardcoded into
92			// `Protocol`. 	This protocol must be the first notification protocol given to
93			// `Notifications`
94			let (protocol_configs, mut handles): (Vec<_>, Vec<_>) = iter::once({
95				let config = notifications::ProtocolConfig {
96					name: block_announces_protocol.protocol_name().clone(),
97					fallback_names: block_announces_protocol.fallback_names().cloned().collect(),
98					handshake: block_announces_protocol.handshake().as_ref().unwrap().to_vec(),
99					max_notification_size: block_announces_protocol.max_notification_size(),
100				};
101
102				let (handle, command_stream) =
103					block_announces_protocol.take_protocol_handle().split();
104
105				((config, handle.clone(), command_stream), handle)
106			})
107			.chain(notification_protocols.into_iter().map(|s| {
108				let config = notifications::ProtocolConfig {
109					name: s.protocol_name().clone(),
110					fallback_names: s.fallback_names().cloned().collect(),
111					handshake: s.handshake().as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
112					max_notification_size: s.max_notification_size(),
113				};
114
115				let (handle, command_stream) = s.take_protocol_handle().split();
116
117				((config, handle.clone(), command_stream), handle)
118			}))
119			.unzip();
120
121			handles.iter_mut().for_each(|handle| {
122				handle.set_metrics(notification_metrics.clone());
123			});
124
125			protocol_configs.iter().enumerate().for_each(|(i, (p, _, _))| {
126				debug!(target: LOG_TARGET, "Notifications protocol {:?}: {}", SetId::from(i), p.name);
127			});
128
129			(
130				Notifications::new(
131					protocol_controller_handles,
132					from_protocol_controllers,
133					notification_metrics,
134					protocol_configs.into_iter(),
135				),
136				installed_protocols,
137				handles,
138			)
139		};
140
141		let protocol = Self {
142			behaviour,
143			sync_handle: handles[0].clone(),
144			peer_store_handle,
145			notification_protocols,
146			bad_handshake_streams: HashSet::new(),
147			// TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol`
148			_marker: Default::default(),
149		};
150
151		Ok((protocol, handles))
152	}
153
154	pub fn num_sync_peers(&self) -> usize {
155		self.sync_handle.num_peers()
156	}
157
158	/// Returns the list of all the peers we have an open channel to.
159	pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
160		self.behaviour.open_peers()
161	}
162
163	/// Disconnects the given peer if we are connected to it.
164	pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
165		if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name)
166		{
167			self.behaviour.disconnect_peer(peer_id, SetId::from(position));
168		} else {
169			warn!(target: LOG_TARGET, "disconnect_peer() with invalid protocol name")
170		}
171	}
172
173	/// Check if role is available for `peer_id` by attempt to decode the handshake to roles and if
174	/// that fails, check if the role has been registered to `PeerStore`.
175	fn role_available(&self, peer_id: &PeerId, handshake: &Vec<u8>) -> bool {
176		match Roles::decode_all(&mut &handshake[..]) {
177			Ok(_) => true,
178			Err(_) => self.peer_store_handle.peer_role(&((*peer_id).into())).is_some(),
179		}
180	}
181}
182
183/// Outcome of an incoming custom message.
///
/// Emitted to the swarm by `Protocol::poll` for notification sets other than the block
/// announces set.
184#[derive(Debug)]
185#[must_use]
186pub enum CustomMessageOutcome {
187	/// Notification protocols have been opened with a remote.
188	NotificationStreamOpened {
		/// Peer ID.
189		remote: PeerId,
190		/// Set ID.
191		set_id: SetId,
192		/// Direction of the stream.
193		direction: Direction,
194		/// See [`crate::Event::NotificationStreamOpened::negotiated_fallback`].
195		negotiated_fallback: Option<ProtocolName>,
196		/// Received handshake.
197		received_handshake: Vec<u8>,
198		/// Notification sink.
199		notifications_sink: NotificationsSink,
200	},
201	/// The [`NotificationsSink`] of some notification protocols need an update.
202	NotificationStreamReplaced {
203		/// Peer ID.
204		remote: PeerId,
205		/// Set ID.
206		set_id: SetId,
207		/// New notification sink.
208		notifications_sink: NotificationsSink,
209	},
210	/// Notification protocols have been closed with a remote.
211	NotificationStreamClosed {
212		/// Peer ID.
213		remote: PeerId,
214		/// Set ID.
215		set_id: SetId,
216	},
217	/// Messages have been received on one or more notifications protocols.
218	NotificationsReceived {
219		/// Peer ID.
220		remote: PeerId,
221		/// Set ID.
222		set_id: SetId,
223		/// Received notification.
224		notification: Vec<u8>,
225	},
226}
227
228impl<B: BlockT> NetworkBehaviour for Protocol<B> {
229	type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
230	type ToSwarm = CustomMessageOutcome;
231
232	fn handle_established_inbound_connection(
233		&mut self,
234		connection_id: ConnectionId,
235		peer: PeerId,
236		local_addr: &Multiaddr,
237		remote_addr: &Multiaddr,
238	) -> Result<THandler<Self>, ConnectionDenied> {
239		self.behaviour.handle_established_inbound_connection(
240			connection_id,
241			peer,
242			local_addr,
243			remote_addr,
244		)
245	}
246
247	fn handle_established_outbound_connection(
248		&mut self,
249		connection_id: ConnectionId,
250		peer: PeerId,
251		addr: &Multiaddr,
252		role_override: Endpoint,
253		port_use: PortUse,
254	) -> Result<THandler<Self>, ConnectionDenied> {
255		self.behaviour.handle_established_outbound_connection(
256			connection_id,
257			peer,
258			addr,
259			role_override,
260			port_use,
261		)
262	}
263
264	fn handle_pending_outbound_connection(
265		&mut self,
266		_connection_id: ConnectionId,
267		_maybe_peer: Option<PeerId>,
268		_addresses: &[Multiaddr],
269		_effective_role: Endpoint,
270	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
271		// Only `Discovery::handle_pending_outbound_connection` must be returning addresses to
272		// ensure that we don't return unwanted addresses.
273		Ok(Vec::new())
274	}
275
276	fn on_swarm_event(&mut self, event: FromSwarm) {
277		self.behaviour.on_swarm_event(event);
278	}
279
280	fn on_connection_handler_event(
281		&mut self,
282		peer_id: PeerId,
283		connection_id: ConnectionId,
284		event: THandlerOutEvent<Self>,
285	) {
286		self.behaviour.on_connection_handler_event(peer_id, connection_id, event);
287	}
288
289	fn poll(
290		&mut self,
291		cx: &mut std::task::Context,
292	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
293		let event = match self.behaviour.poll(cx) {
294			Poll::Pending => return Poll::Pending,
295			Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev,
296			Poll::Ready(event) => {
297				return Poll::Ready(event.map_out(|_| {
298					unreachable!("`GenerateEvent` is handled in a branch above; qed")
299				}));
300			},
301		};
302
303		let outcome = match event {
304			NotificationsOut::CustomProtocolOpen {
305				peer_id,
306				set_id,
307				direction,
308				received_handshake,
309				notifications_sink,
310				negotiated_fallback,
311				..
312			} =>
313				if set_id == HARDCODED_PEERSETS_SYNC {
314					let _ = self.sync_handle.report_substream_opened(
315						peer_id,
316						direction,
317						received_handshake,
318						negotiated_fallback,
319						notifications_sink,
320					);
321					None
322				} else {
323					match self.role_available(&peer_id, &received_handshake) {
324						true => Some(CustomMessageOutcome::NotificationStreamOpened {
325							remote: peer_id,
326							set_id,
327							direction,
328							negotiated_fallback,
329							received_handshake,
330							notifications_sink,
331						}),
332						false => {
333							self.bad_handshake_streams.insert(peer_id);
334							None
335						},
336					}
337				},
338			NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
339				if set_id == HARDCODED_PEERSETS_SYNC {
340					let _ = self
341						.sync_handle
342						.report_notification_sink_replaced(peer_id, notifications_sink);
343					None
344				} else {
345					(!self.bad_handshake_streams.contains(&peer_id)).then_some(
346						CustomMessageOutcome::NotificationStreamReplaced {
347							remote: peer_id,
348							set_id,
349							notifications_sink,
350						},
351					)
352				},
353			NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
354				if set_id == HARDCODED_PEERSETS_SYNC {
355					let _ = self.sync_handle.report_substream_closed(peer_id);
356					None
357				} else {
358					(!self.bad_handshake_streams.remove(&peer_id)).then_some(
359						CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, set_id },
360					)
361				}
362			},
363			NotificationsOut::Notification { peer_id, set_id, message } => {
364				if set_id == HARDCODED_PEERSETS_SYNC {
365					let _ = self
366						.sync_handle
367						.report_notification_received(peer_id, message.freeze().into());
368					None
369				} else {
370					(!self.bad_handshake_streams.contains(&peer_id)).then_some(
371						CustomMessageOutcome::NotificationsReceived {
372							remote: peer_id,
373							set_id,
374							notification: message.freeze().into(),
375						},
376					)
377				}
378			},
379
380			NotificationsOut::ProtocolMisbehavior { peer_id, set_id } => {
381				let index: usize = set_id.into();
382				let protocol_name = self.notification_protocols.get(index);
383
384				debug!(
385					target: LOG_TARGET,
386					"Received unexpected data on outbound notification stream from peer {:?} on protocol {:?}",
387					peer_id,
388					protocol_name
389				);
390
391				self.peer_store_handle.report_peer(
392					peer_id.into(),
393					ReputationChange::new_fatal(
394						"Received unexpected data on outbound notification stream",
395					),
396				);
397
398				None
399			},
400		};
401
402		match outcome {
403			Some(event) => Poll::Ready(ToSwarm::GenerateEvent(event)),
404			None => {
405				cx.waker().wake_by_ref();
406				Poll::Pending
407			},
408		}
409	}
410}