1use crate::{
20 config, error,
21 peer_store::PeerStoreProvider,
22 protocol_controller::{self, SetId},
23 service::{metrics::NotificationMetrics, traits::Direction},
24 types::ProtocolName,
25 MAX_RESPONSE_SIZE,
26};
27
28use codec::Encode;
29use libp2p::{
30 core::Endpoint,
31 swarm::{
32 behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters,
33 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
34 },
35 Multiaddr, PeerId,
36};
37use log::warn;
38
39use codec::DecodeAll;
40use sc_network_common::role::Roles;
41use sc_utils::mpsc::TracingUnboundedReceiver;
42use sp_runtime::traits::Block as BlockT;
43
44use std::{collections::HashSet, iter, sync::Arc, task::Poll};
45
46use notifications::{Notifications, NotificationsOut};
47
48pub(crate) use notifications::ProtocolHandle;
49
50pub use notifications::{
51 notification_service, NotificationsSink, NotifsHandlerError, ProtocolHandlePair, Ready,
52};
53
54mod notifications;
55
56pub mod message;
57
58pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = MAX_RESPONSE_SIZE;
61
62const HARDCODED_PEERSETS_SYNC: SetId = SetId::from(0);
64
65pub struct Protocol<B: BlockT> {
67 behaviour: Notifications,
69 notification_protocols: Vec<ProtocolName>,
71 peer_store_handle: Arc<dyn PeerStoreProvider>,
73 bad_handshake_streams: HashSet<PeerId>,
75 sync_handle: ProtocolHandle,
76 _marker: std::marker::PhantomData<B>,
77}
78
79impl<B: BlockT> Protocol<B> {
80 pub(crate) fn new(
82 roles: Roles,
83 notification_metrics: NotificationMetrics,
84 notification_protocols: Vec<config::NonDefaultSetConfig>,
85 block_announces_protocol: config::NonDefaultSetConfig,
86 peer_store_handle: Arc<dyn PeerStoreProvider>,
87 protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
88 from_protocol_controllers: TracingUnboundedReceiver<protocol_controller::Message>,
89 ) -> error::Result<(Self, Vec<ProtocolHandle>)> {
90 let (behaviour, notification_protocols, handles) = {
91 let installed_protocols = iter::once(block_announces_protocol.protocol_name().clone())
92 .chain(notification_protocols.iter().map(|p| p.protocol_name().clone()))
93 .collect::<Vec<_>>();
94
95 let (protocol_configs, mut handles): (Vec<_>, Vec<_>) = iter::once({
99 let config = notifications::ProtocolConfig {
100 name: block_announces_protocol.protocol_name().clone(),
101 fallback_names: block_announces_protocol.fallback_names().cloned().collect(),
102 handshake: block_announces_protocol.handshake().as_ref().unwrap().to_vec(),
103 max_notification_size: block_announces_protocol.max_notification_size(),
104 };
105
106 let (handle, command_stream) =
107 block_announces_protocol.take_protocol_handle().split();
108
109 ((config, handle.clone(), command_stream), handle)
110 })
111 .chain(notification_protocols.into_iter().map(|s| {
112 let config = notifications::ProtocolConfig {
113 name: s.protocol_name().clone(),
114 fallback_names: s.fallback_names().cloned().collect(),
115 handshake: s.handshake().as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
116 max_notification_size: s.max_notification_size(),
117 };
118
119 let (handle, command_stream) = s.take_protocol_handle().split();
120
121 ((config, handle.clone(), command_stream), handle)
122 }))
123 .unzip();
124
125 handles.iter_mut().for_each(|handle| {
126 handle.set_metrics(notification_metrics.clone());
127 });
128
129 (
130 Notifications::new(
131 protocol_controller_handles,
132 from_protocol_controllers,
133 notification_metrics,
134 protocol_configs.into_iter(),
135 ),
136 installed_protocols,
137 handles,
138 )
139 };
140
141 let protocol = Self {
142 behaviour,
143 sync_handle: handles[0].clone(),
144 peer_store_handle,
145 notification_protocols,
146 bad_handshake_streams: HashSet::new(),
147 _marker: Default::default(),
149 };
150
151 Ok((protocol, handles))
152 }
153
154 pub fn num_sync_peers(&self) -> usize {
155 self.sync_handle.num_peers()
156 }
157
158 pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
160 self.behaviour.open_peers()
161 }
162
163 pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
165 if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name)
166 {
167 self.behaviour.disconnect_peer(peer_id, SetId::from(position));
168 } else {
169 warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name")
170 }
171 }
172
173 fn role_available(&self, peer_id: &PeerId, handshake: &Vec<u8>) -> bool {
176 match Roles::decode_all(&mut &handshake[..]) {
177 Ok(_) => true,
178 Err(_) => self.peer_store_handle.peer_role(&((*peer_id).into())).is_some(),
179 }
180 }
181}
182
183#[derive(Debug)]
185#[must_use]
186pub enum CustomMessageOutcome {
187 NotificationStreamOpened {
189 remote: PeerId,
190 set_id: SetId,
192 direction: Direction,
194 negotiated_fallback: Option<ProtocolName>,
196 received_handshake: Vec<u8>,
198 notifications_sink: NotificationsSink,
200 },
201 NotificationStreamReplaced {
203 remote: PeerId,
205 set_id: SetId,
207 notifications_sink: NotificationsSink,
209 },
210 NotificationStreamClosed {
212 remote: PeerId,
214 set_id: SetId,
216 },
217 NotificationsReceived {
219 remote: PeerId,
221 set_id: SetId,
223 notification: Vec<u8>,
225 },
226}
227
228impl<B: BlockT> NetworkBehaviour for Protocol<B> {
229 type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
230 type ToSwarm = CustomMessageOutcome;
231
232 fn handle_established_inbound_connection(
233 &mut self,
234 connection_id: ConnectionId,
235 peer: PeerId,
236 local_addr: &Multiaddr,
237 remote_addr: &Multiaddr,
238 ) -> Result<THandler<Self>, ConnectionDenied> {
239 self.behaviour.handle_established_inbound_connection(
240 connection_id,
241 peer,
242 local_addr,
243 remote_addr,
244 )
245 }
246
247 fn handle_established_outbound_connection(
248 &mut self,
249 connection_id: ConnectionId,
250 peer: PeerId,
251 addr: &Multiaddr,
252 role_override: Endpoint,
253 ) -> Result<THandler<Self>, ConnectionDenied> {
254 self.behaviour.handle_established_outbound_connection(
255 connection_id,
256 peer,
257 addr,
258 role_override,
259 )
260 }
261
262 fn handle_pending_outbound_connection(
263 &mut self,
264 _connection_id: ConnectionId,
265 _maybe_peer: Option<PeerId>,
266 _addresses: &[Multiaddr],
267 _effective_role: Endpoint,
268 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
269 Ok(Vec::new())
272 }
273
274 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
275 self.behaviour.on_swarm_event(event);
276 }
277
278 fn on_connection_handler_event(
279 &mut self,
280 peer_id: PeerId,
281 connection_id: ConnectionId,
282 event: THandlerOutEvent<Self>,
283 ) {
284 self.behaviour.on_connection_handler_event(peer_id, connection_id, event);
285 }
286
287 fn poll(
288 &mut self,
289 cx: &mut std::task::Context,
290 params: &mut impl PollParameters,
291 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
292 let event = match self.behaviour.poll(cx, params) {
293 Poll::Pending => return Poll::Pending,
294 Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev,
295 Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
296 Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) =>
297 return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
298 Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) =>
299 return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
300 Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) =>
301 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
302 Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) =>
303 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
304 Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) =>
305 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
306 Poll::Ready(ToSwarm::ListenOn { opts }) =>
307 return Poll::Ready(ToSwarm::ListenOn { opts }),
308 Poll::Ready(ToSwarm::RemoveListener { id }) =>
309 return Poll::Ready(ToSwarm::RemoveListener { id }),
310 };
311
312 let outcome = match event {
313 NotificationsOut::CustomProtocolOpen {
314 peer_id,
315 set_id,
316 direction,
317 received_handshake,
318 notifications_sink,
319 negotiated_fallback,
320 ..
321 } =>
322 if set_id == HARDCODED_PEERSETS_SYNC {
323 let _ = self.sync_handle.report_substream_opened(
324 peer_id,
325 direction,
326 received_handshake,
327 negotiated_fallback,
328 notifications_sink,
329 );
330 None
331 } else {
332 match self.role_available(&peer_id, &received_handshake) {
333 true => Some(CustomMessageOutcome::NotificationStreamOpened {
334 remote: peer_id,
335 set_id,
336 direction,
337 negotiated_fallback,
338 received_handshake,
339 notifications_sink,
340 }),
341 false => {
342 self.bad_handshake_streams.insert(peer_id);
343 None
344 },
345 }
346 },
347 NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
348 if set_id == HARDCODED_PEERSETS_SYNC {
349 let _ = self
350 .sync_handle
351 .report_notification_sink_replaced(peer_id, notifications_sink);
352 None
353 } else {
354 (!self.bad_handshake_streams.contains(&peer_id)).then_some(
355 CustomMessageOutcome::NotificationStreamReplaced {
356 remote: peer_id,
357 set_id,
358 notifications_sink,
359 },
360 )
361 },
362 NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
363 if set_id == HARDCODED_PEERSETS_SYNC {
364 let _ = self.sync_handle.report_substream_closed(peer_id);
365 None
366 } else {
367 (!self.bad_handshake_streams.remove(&peer_id)).then_some(
368 CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, set_id },
369 )
370 }
371 },
372 NotificationsOut::Notification { peer_id, set_id, message } => {
373 if set_id == HARDCODED_PEERSETS_SYNC {
374 let _ = self
375 .sync_handle
376 .report_notification_received(peer_id, message.freeze().into());
377 None
378 } else {
379 (!self.bad_handshake_streams.contains(&peer_id)).then_some(
380 CustomMessageOutcome::NotificationsReceived {
381 remote: peer_id,
382 set_id,
383 notification: message.freeze().into(),
384 },
385 )
386 }
387 },
388 };
389
390 match outcome {
391 Some(event) => Poll::Ready(ToSwarm::GenerateEvent(event)),
392 None => {
393 cx.waker().wake_by_ref();
394 Poll::Pending
395 },
396 }
397 }
398}