1use crate::{
20 config, error,
21 peer_store::PeerStoreProvider,
22 protocol_controller::{self, SetId},
23 service::{metrics::NotificationMetrics, traits::Direction},
24 types::ProtocolName,
25};
26
27use codec::Encode;
28use libp2p::{
29 core::{transport::PortUse, Endpoint},
30 swarm::{
31 behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, THandler,
32 THandlerInEvent, THandlerOutEvent, ToSwarm,
33 },
34 Multiaddr, PeerId,
35};
36use log::{debug, warn};
37
38use codec::DecodeAll;
39use sc_network_common::{role::Roles, types::ReputationChange};
40use sc_utils::mpsc::TracingUnboundedReceiver;
41use sp_runtime::traits::Block as BlockT;
42
43use std::{collections::HashSet, iter, sync::Arc, task::Poll};
44
45use notifications::{Notifications, NotificationsOut};
46
47pub(crate) use notifications::ProtocolHandle;
48
49pub use notifications::{notification_service, NotificationsSink, ProtocolHandlePair, Ready};
50
51mod notifications;
52
53pub mod message;
54
/// Log target passed to the `debug!` / `warn!` invocations in this module.
const LOG_TARGET: &str = "sub-libp2p";
57
58const HARDCODED_PEERSETS_SYNC: SetId = SetId::from(0);
60
61pub struct Protocol<B: BlockT> {
63 behaviour: Notifications,
65 notification_protocols: Vec<ProtocolName>,
67 peer_store_handle: Arc<dyn PeerStoreProvider>,
69 bad_handshake_streams: HashSet<PeerId>,
71 sync_handle: ProtocolHandle,
72 _marker: std::marker::PhantomData<B>,
73}
74
75impl<B: BlockT> Protocol<B> {
76 pub(crate) fn new(
78 roles: Roles,
79 notification_metrics: NotificationMetrics,
80 notification_protocols: Vec<config::NonDefaultSetConfig>,
81 block_announces_protocol: config::NonDefaultSetConfig,
82 peer_store_handle: Arc<dyn PeerStoreProvider>,
83 protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
84 from_protocol_controllers: TracingUnboundedReceiver<protocol_controller::Message>,
85 ) -> error::Result<(Self, Vec<ProtocolHandle>)> {
86 let (behaviour, notification_protocols, handles) = {
87 let installed_protocols = iter::once(block_announces_protocol.protocol_name().clone())
88 .chain(notification_protocols.iter().map(|p| p.protocol_name().clone()))
89 .collect::<Vec<_>>();
90
91 let (protocol_configs, mut handles): (Vec<_>, Vec<_>) = iter::once({
95 let config = notifications::ProtocolConfig {
96 name: block_announces_protocol.protocol_name().clone(),
97 fallback_names: block_announces_protocol.fallback_names().cloned().collect(),
98 handshake: block_announces_protocol.handshake().as_ref().unwrap().to_vec(),
99 max_notification_size: block_announces_protocol.max_notification_size(),
100 };
101
102 let (handle, command_stream) =
103 block_announces_protocol.take_protocol_handle().split();
104
105 ((config, handle.clone(), command_stream), handle)
106 })
107 .chain(notification_protocols.into_iter().map(|s| {
108 let config = notifications::ProtocolConfig {
109 name: s.protocol_name().clone(),
110 fallback_names: s.fallback_names().cloned().collect(),
111 handshake: s.handshake().as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
112 max_notification_size: s.max_notification_size(),
113 };
114
115 let (handle, command_stream) = s.take_protocol_handle().split();
116
117 ((config, handle.clone(), command_stream), handle)
118 }))
119 .unzip();
120
121 handles.iter_mut().for_each(|handle| {
122 handle.set_metrics(notification_metrics.clone());
123 });
124
125 protocol_configs.iter().enumerate().for_each(|(i, (p, _, _))| {
126 debug!(target: LOG_TARGET, "Notifications protocol {:?}: {}", SetId::from(i), p.name);
127 });
128
129 (
130 Notifications::new(
131 protocol_controller_handles,
132 from_protocol_controllers,
133 notification_metrics,
134 protocol_configs.into_iter(),
135 ),
136 installed_protocols,
137 handles,
138 )
139 };
140
141 let protocol = Self {
142 behaviour,
143 sync_handle: handles[0].clone(),
144 peer_store_handle,
145 notification_protocols,
146 bad_handshake_streams: HashSet::new(),
147 _marker: Default::default(),
149 };
150
151 Ok((protocol, handles))
152 }
153
154 pub fn num_sync_peers(&self) -> usize {
155 self.sync_handle.num_peers()
156 }
157
158 pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
160 self.behaviour.open_peers()
161 }
162
163 pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
165 if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name)
166 {
167 self.behaviour.disconnect_peer(peer_id, SetId::from(position));
168 } else {
169 warn!(target: LOG_TARGET, "disconnect_peer() with invalid protocol name")
170 }
171 }
172
173 fn role_available(&self, peer_id: &PeerId, handshake: &Vec<u8>) -> bool {
176 match Roles::decode_all(&mut &handshake[..]) {
177 Ok(_) => true,
178 Err(_) => self.peer_store_handle.peer_role(&((*peer_id).into())).is_some(),
179 }
180 }
181}
182
183#[derive(Debug)]
185#[must_use]
186pub enum CustomMessageOutcome {
187 NotificationStreamOpened {
189 remote: PeerId,
190 set_id: SetId,
192 direction: Direction,
194 negotiated_fallback: Option<ProtocolName>,
196 received_handshake: Vec<u8>,
198 notifications_sink: NotificationsSink,
200 },
201 NotificationStreamReplaced {
203 remote: PeerId,
205 set_id: SetId,
207 notifications_sink: NotificationsSink,
209 },
210 NotificationStreamClosed {
212 remote: PeerId,
214 set_id: SetId,
216 },
217 NotificationsReceived {
219 remote: PeerId,
221 set_id: SetId,
223 notification: Vec<u8>,
225 },
226}
227
228impl<B: BlockT> NetworkBehaviour for Protocol<B> {
229 type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
230 type ToSwarm = CustomMessageOutcome;
231
232 fn handle_established_inbound_connection(
233 &mut self,
234 connection_id: ConnectionId,
235 peer: PeerId,
236 local_addr: &Multiaddr,
237 remote_addr: &Multiaddr,
238 ) -> Result<THandler<Self>, ConnectionDenied> {
239 self.behaviour.handle_established_inbound_connection(
240 connection_id,
241 peer,
242 local_addr,
243 remote_addr,
244 )
245 }
246
247 fn handle_established_outbound_connection(
248 &mut self,
249 connection_id: ConnectionId,
250 peer: PeerId,
251 addr: &Multiaddr,
252 role_override: Endpoint,
253 port_use: PortUse,
254 ) -> Result<THandler<Self>, ConnectionDenied> {
255 self.behaviour.handle_established_outbound_connection(
256 connection_id,
257 peer,
258 addr,
259 role_override,
260 port_use,
261 )
262 }
263
264 fn handle_pending_outbound_connection(
265 &mut self,
266 _connection_id: ConnectionId,
267 _maybe_peer: Option<PeerId>,
268 _addresses: &[Multiaddr],
269 _effective_role: Endpoint,
270 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
271 Ok(Vec::new())
274 }
275
276 fn on_swarm_event(&mut self, event: FromSwarm) {
277 self.behaviour.on_swarm_event(event);
278 }
279
280 fn on_connection_handler_event(
281 &mut self,
282 peer_id: PeerId,
283 connection_id: ConnectionId,
284 event: THandlerOutEvent<Self>,
285 ) {
286 self.behaviour.on_connection_handler_event(peer_id, connection_id, event);
287 }
288
289 fn poll(
290 &mut self,
291 cx: &mut std::task::Context,
292 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
293 let event = match self.behaviour.poll(cx) {
294 Poll::Pending => return Poll::Pending,
295 Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev,
296 Poll::Ready(event) => {
297 return Poll::Ready(event.map_out(|_| {
298 unreachable!("`GenerateEvent` is handled in a branch above; qed")
299 }));
300 },
301 };
302
303 let outcome = match event {
304 NotificationsOut::CustomProtocolOpen {
305 peer_id,
306 set_id,
307 direction,
308 received_handshake,
309 notifications_sink,
310 negotiated_fallback,
311 ..
312 } =>
313 if set_id == HARDCODED_PEERSETS_SYNC {
314 let _ = self.sync_handle.report_substream_opened(
315 peer_id,
316 direction,
317 received_handshake,
318 negotiated_fallback,
319 notifications_sink,
320 );
321 None
322 } else {
323 match self.role_available(&peer_id, &received_handshake) {
324 true => Some(CustomMessageOutcome::NotificationStreamOpened {
325 remote: peer_id,
326 set_id,
327 direction,
328 negotiated_fallback,
329 received_handshake,
330 notifications_sink,
331 }),
332 false => {
333 self.bad_handshake_streams.insert(peer_id);
334 None
335 },
336 }
337 },
338 NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
339 if set_id == HARDCODED_PEERSETS_SYNC {
340 let _ = self
341 .sync_handle
342 .report_notification_sink_replaced(peer_id, notifications_sink);
343 None
344 } else {
345 (!self.bad_handshake_streams.contains(&peer_id)).then_some(
346 CustomMessageOutcome::NotificationStreamReplaced {
347 remote: peer_id,
348 set_id,
349 notifications_sink,
350 },
351 )
352 },
353 NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
354 if set_id == HARDCODED_PEERSETS_SYNC {
355 let _ = self.sync_handle.report_substream_closed(peer_id);
356 None
357 } else {
358 (!self.bad_handshake_streams.remove(&peer_id)).then_some(
359 CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, set_id },
360 )
361 }
362 },
363 NotificationsOut::Notification { peer_id, set_id, message } => {
364 if set_id == HARDCODED_PEERSETS_SYNC {
365 let _ = self
366 .sync_handle
367 .report_notification_received(peer_id, message.freeze().into());
368 None
369 } else {
370 (!self.bad_handshake_streams.contains(&peer_id)).then_some(
371 CustomMessageOutcome::NotificationsReceived {
372 remote: peer_id,
373 set_id,
374 notification: message.freeze().into(),
375 },
376 )
377 }
378 },
379
380 NotificationsOut::ProtocolMisbehavior { peer_id, set_id } => {
381 let index: usize = set_id.into();
382 let protocol_name = self.notification_protocols.get(index);
383
384 debug!(
385 target: LOG_TARGET,
386 "Received unexpected data on outbound notification stream from peer {:?} on protocol {:?}",
387 peer_id,
388 protocol_name
389 );
390
391 self.peer_store_handle.report_peer(
392 peer_id.into(),
393 ReputationChange::new_fatal(
394 "Received unexpected data on outbound notification stream",
395 ),
396 );
397
398 None
399 },
400 };
401
402 match outcome {
403 Some(event) => Poll::Ready(ToSwarm::GenerateEvent(event)),
404 None => {
405 cx.waker().wake_by_ref();
406 Poll::Pending
407 },
408 }
409 }
410}