sc_network/
protocol.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	config, error,
21	peer_store::PeerStoreProvider,
22	protocol_controller::{self, SetId},
23	service::{metrics::NotificationMetrics, traits::Direction},
24	types::ProtocolName,
25	MAX_RESPONSE_SIZE,
26};
27
28use codec::Encode;
29use libp2p::{
30	core::Endpoint,
31	swarm::{
32		behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters,
33		THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
34	},
35	Multiaddr, PeerId,
36};
37use log::warn;
38
39use codec::DecodeAll;
40use sc_network_common::role::Roles;
41use sc_utils::mpsc::TracingUnboundedReceiver;
42use sp_runtime::traits::Block as BlockT;
43
44use std::{collections::HashSet, iter, sync::Arc, task::Poll};
45
46use notifications::{Notifications, NotificationsOut};
47
48pub(crate) use notifications::ProtocolHandle;
49
50pub use notifications::{
51	notification_service, NotificationsSink, NotifsHandlerError, ProtocolHandlePair, Ready,
52};
53
54mod notifications;
55
56pub mod message;
57
58/// Maximum size used for notifications in the block announce and transaction protocols.
59// Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`.
60pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = MAX_RESPONSE_SIZE;
61
62/// Identifier of the peerset for the block announces protocol.
63const HARDCODED_PEERSETS_SYNC: SetId = SetId::from(0);
64
65// Lock must always be taken in order declared here.
66pub struct Protocol<B: BlockT> {
67	/// Handles opening the unique substream and sending and receiving raw messages.
68	behaviour: Notifications,
69	/// List of notifications protocols that have been registered.
70	notification_protocols: Vec<ProtocolName>,
71	/// Handle to `PeerStore`.
72	peer_store_handle: Arc<dyn PeerStoreProvider>,
73	/// Streams for peers whose handshake couldn't be determined.
74	bad_handshake_streams: HashSet<PeerId>,
75	sync_handle: ProtocolHandle,
76	_marker: std::marker::PhantomData<B>,
77}
78
79impl<B: BlockT> Protocol<B> {
80	/// Create a new instance.
81	pub(crate) fn new(
82		roles: Roles,
83		notification_metrics: NotificationMetrics,
84		notification_protocols: Vec<config::NonDefaultSetConfig>,
85		block_announces_protocol: config::NonDefaultSetConfig,
86		peer_store_handle: Arc<dyn PeerStoreProvider>,
87		protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
88		from_protocol_controllers: TracingUnboundedReceiver<protocol_controller::Message>,
89	) -> error::Result<(Self, Vec<ProtocolHandle>)> {
90		let (behaviour, notification_protocols, handles) = {
91			let installed_protocols = iter::once(block_announces_protocol.protocol_name().clone())
92				.chain(notification_protocols.iter().map(|p| p.protocol_name().clone()))
93				.collect::<Vec<_>>();
94
95			// NOTE: Block announcement protocol is still very much hardcoded into
96			// `Protocol`. 	This protocol must be the first notification protocol given to
97			// `Notifications`
98			let (protocol_configs, mut handles): (Vec<_>, Vec<_>) = iter::once({
99				let config = notifications::ProtocolConfig {
100					name: block_announces_protocol.protocol_name().clone(),
101					fallback_names: block_announces_protocol.fallback_names().cloned().collect(),
102					handshake: block_announces_protocol.handshake().as_ref().unwrap().to_vec(),
103					max_notification_size: block_announces_protocol.max_notification_size(),
104				};
105
106				let (handle, command_stream) =
107					block_announces_protocol.take_protocol_handle().split();
108
109				((config, handle.clone(), command_stream), handle)
110			})
111			.chain(notification_protocols.into_iter().map(|s| {
112				let config = notifications::ProtocolConfig {
113					name: s.protocol_name().clone(),
114					fallback_names: s.fallback_names().cloned().collect(),
115					handshake: s.handshake().as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
116					max_notification_size: s.max_notification_size(),
117				};
118
119				let (handle, command_stream) = s.take_protocol_handle().split();
120
121				((config, handle.clone(), command_stream), handle)
122			}))
123			.unzip();
124
125			handles.iter_mut().for_each(|handle| {
126				handle.set_metrics(notification_metrics.clone());
127			});
128
129			(
130				Notifications::new(
131					protocol_controller_handles,
132					from_protocol_controllers,
133					notification_metrics,
134					protocol_configs.into_iter(),
135				),
136				installed_protocols,
137				handles,
138			)
139		};
140
141		let protocol = Self {
142			behaviour,
143			sync_handle: handles[0].clone(),
144			peer_store_handle,
145			notification_protocols,
146			bad_handshake_streams: HashSet::new(),
147			// TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol`
148			_marker: Default::default(),
149		};
150
151		Ok((protocol, handles))
152	}
153
154	pub fn num_sync_peers(&self) -> usize {
155		self.sync_handle.num_peers()
156	}
157
158	/// Returns the list of all the peers we have an open channel to.
159	pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
160		self.behaviour.open_peers()
161	}
162
163	/// Disconnects the given peer if we are connected to it.
164	pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
165		if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name)
166		{
167			self.behaviour.disconnect_peer(peer_id, SetId::from(position));
168		} else {
169			warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name")
170		}
171	}
172
173	/// Check if role is available for `peer_id` by attempt to decode the handshake to roles and if
174	/// that fails, check if the role has been registered to `PeerStore`.
175	fn role_available(&self, peer_id: &PeerId, handshake: &Vec<u8>) -> bool {
176		match Roles::decode_all(&mut &handshake[..]) {
177			Ok(_) => true,
178			Err(_) => self.peer_store_handle.peer_role(&((*peer_id).into())).is_some(),
179		}
180	}
181}
182
183/// Outcome of an incoming custom message.
184#[derive(Debug)]
185#[must_use]
186pub enum CustomMessageOutcome {
187	/// Notification protocols have been opened with a remote.
188	NotificationStreamOpened {
189		remote: PeerId,
190		// protocol: ProtocolName,
191		set_id: SetId,
192		/// Direction of the stream.
193		direction: Direction,
194		/// See [`crate::Event::NotificationStreamOpened::negotiated_fallback`].
195		negotiated_fallback: Option<ProtocolName>,
196		/// Received handshake.
197		received_handshake: Vec<u8>,
198		/// Notification sink.
199		notifications_sink: NotificationsSink,
200	},
201	/// The [`NotificationsSink`] of some notification protocols need an update.
202	NotificationStreamReplaced {
203		// Peer ID.
204		remote: PeerId,
205		/// Set ID.
206		set_id: SetId,
207		/// New notification sink.
208		notifications_sink: NotificationsSink,
209	},
210	/// Notification protocols have been closed with a remote.
211	NotificationStreamClosed {
212		// Peer ID.
213		remote: PeerId,
214		/// Set ID.
215		set_id: SetId,
216	},
217	/// Messages have been received on one or more notifications protocols.
218	NotificationsReceived {
219		// Peer ID.
220		remote: PeerId,
221		/// Set ID.
222		set_id: SetId,
223		/// Received notification.
224		notification: Vec<u8>,
225	},
226}
227
228impl<B: BlockT> NetworkBehaviour for Protocol<B> {
229	type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
230	type ToSwarm = CustomMessageOutcome;
231
232	fn handle_established_inbound_connection(
233		&mut self,
234		connection_id: ConnectionId,
235		peer: PeerId,
236		local_addr: &Multiaddr,
237		remote_addr: &Multiaddr,
238	) -> Result<THandler<Self>, ConnectionDenied> {
239		self.behaviour.handle_established_inbound_connection(
240			connection_id,
241			peer,
242			local_addr,
243			remote_addr,
244		)
245	}
246
247	fn handle_established_outbound_connection(
248		&mut self,
249		connection_id: ConnectionId,
250		peer: PeerId,
251		addr: &Multiaddr,
252		role_override: Endpoint,
253	) -> Result<THandler<Self>, ConnectionDenied> {
254		self.behaviour.handle_established_outbound_connection(
255			connection_id,
256			peer,
257			addr,
258			role_override,
259		)
260	}
261
262	fn handle_pending_outbound_connection(
263		&mut self,
264		_connection_id: ConnectionId,
265		_maybe_peer: Option<PeerId>,
266		_addresses: &[Multiaddr],
267		_effective_role: Endpoint,
268	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
269		// Only `Discovery::handle_pending_outbound_connection` must be returning addresses to
270		// ensure that we don't return unwanted addresses.
271		Ok(Vec::new())
272	}
273
274	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
275		self.behaviour.on_swarm_event(event);
276	}
277
278	fn on_connection_handler_event(
279		&mut self,
280		peer_id: PeerId,
281		connection_id: ConnectionId,
282		event: THandlerOutEvent<Self>,
283	) {
284		self.behaviour.on_connection_handler_event(peer_id, connection_id, event);
285	}
286
287	fn poll(
288		&mut self,
289		cx: &mut std::task::Context,
290		params: &mut impl PollParameters,
291	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
292		let event = match self.behaviour.poll(cx, params) {
293			Poll::Pending => return Poll::Pending,
294			Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev,
295			Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
296			Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) =>
297				return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
298			Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) =>
299				return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
300			Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) =>
301				return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
302			Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) =>
303				return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
304			Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) =>
305				return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
306			Poll::Ready(ToSwarm::ListenOn { opts }) =>
307				return Poll::Ready(ToSwarm::ListenOn { opts }),
308			Poll::Ready(ToSwarm::RemoveListener { id }) =>
309				return Poll::Ready(ToSwarm::RemoveListener { id }),
310		};
311
312		let outcome = match event {
313			NotificationsOut::CustomProtocolOpen {
314				peer_id,
315				set_id,
316				direction,
317				received_handshake,
318				notifications_sink,
319				negotiated_fallback,
320				..
321			} =>
322				if set_id == HARDCODED_PEERSETS_SYNC {
323					let _ = self.sync_handle.report_substream_opened(
324						peer_id,
325						direction,
326						received_handshake,
327						negotiated_fallback,
328						notifications_sink,
329					);
330					None
331				} else {
332					match self.role_available(&peer_id, &received_handshake) {
333						true => Some(CustomMessageOutcome::NotificationStreamOpened {
334							remote: peer_id,
335							set_id,
336							direction,
337							negotiated_fallback,
338							received_handshake,
339							notifications_sink,
340						}),
341						false => {
342							self.bad_handshake_streams.insert(peer_id);
343							None
344						},
345					}
346				},
347			NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
348				if set_id == HARDCODED_PEERSETS_SYNC {
349					let _ = self
350						.sync_handle
351						.report_notification_sink_replaced(peer_id, notifications_sink);
352					None
353				} else {
354					(!self.bad_handshake_streams.contains(&peer_id)).then_some(
355						CustomMessageOutcome::NotificationStreamReplaced {
356							remote: peer_id,
357							set_id,
358							notifications_sink,
359						},
360					)
361				},
362			NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
363				if set_id == HARDCODED_PEERSETS_SYNC {
364					let _ = self.sync_handle.report_substream_closed(peer_id);
365					None
366				} else {
367					(!self.bad_handshake_streams.remove(&peer_id)).then_some(
368						CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, set_id },
369					)
370				}
371			},
372			NotificationsOut::Notification { peer_id, set_id, message } => {
373				if set_id == HARDCODED_PEERSETS_SYNC {
374					let _ = self
375						.sync_handle
376						.report_notification_received(peer_id, message.freeze().into());
377					None
378				} else {
379					(!self.bad_handshake_streams.contains(&peer_id)).then_some(
380						CustomMessageOutcome::NotificationsReceived {
381							remote: peer_id,
382							set_id,
383							notification: message.freeze().into(),
384						},
385					)
386				}
387			},
388		};
389
390		match outcome {
391			Some(event) => Poll::Ready(ToSwarm::GenerateEvent(event)),
392			None => {
393				cx.waker().wake_by_ref();
394				Poll::Pending
395			},
396		}
397	}
398}