use crate::{
config, error,
peer_store::PeerStoreProvider,
protocol_controller::{self, SetId},
service::{metrics::NotificationMetrics, traits::Direction},
types::ProtocolName,
MAX_RESPONSE_SIZE,
};
use codec::Encode;
use libp2p::{
core::Endpoint,
swarm::{
behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use log::warn;
use codec::DecodeAll;
use sc_network_common::role::Roles;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_runtime::traits::Block as BlockT;
use std::{collections::HashSet, iter, sync::Arc, task::Poll};
use notifications::{Notifications, NotificationsOut};
pub(crate) use notifications::ProtocolHandle;
pub use notifications::{
notification_service, NotificationsSink, NotifsHandlerError, ProtocolHandlePair, Ready,
};
mod notifications;
pub mod message;
pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = MAX_RESPONSE_SIZE;
const HARDCODED_PEERSETS_SYNC: SetId = SetId::from(0);
pub struct Protocol<B: BlockT> {
behaviour: Notifications,
notification_protocols: Vec<ProtocolName>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
bad_handshake_streams: HashSet<PeerId>,
sync_handle: ProtocolHandle,
_marker: std::marker::PhantomData<B>,
}
impl<B: BlockT> Protocol<B> {
pub(crate) fn new(
roles: Roles,
notification_metrics: NotificationMetrics,
notification_protocols: Vec<config::NonDefaultSetConfig>,
block_announces_protocol: config::NonDefaultSetConfig,
peer_store_handle: Arc<dyn PeerStoreProvider>,
protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
from_protocol_controllers: TracingUnboundedReceiver<protocol_controller::Message>,
) -> error::Result<(Self, Vec<ProtocolHandle>)> {
let (behaviour, notification_protocols, handles) = {
let installed_protocols = iter::once(block_announces_protocol.protocol_name().clone())
.chain(notification_protocols.iter().map(|p| p.protocol_name().clone()))
.collect::<Vec<_>>();
let (protocol_configs, mut handles): (Vec<_>, Vec<_>) = iter::once({
let config = notifications::ProtocolConfig {
name: block_announces_protocol.protocol_name().clone(),
fallback_names: block_announces_protocol.fallback_names().cloned().collect(),
handshake: block_announces_protocol.handshake().as_ref().unwrap().to_vec(),
max_notification_size: block_announces_protocol.max_notification_size(),
};
let (handle, command_stream) =
block_announces_protocol.take_protocol_handle().split();
((config, handle.clone(), command_stream), handle)
})
.chain(notification_protocols.into_iter().map(|s| {
let config = notifications::ProtocolConfig {
name: s.protocol_name().clone(),
fallback_names: s.fallback_names().cloned().collect(),
handshake: s.handshake().as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
max_notification_size: s.max_notification_size(),
};
let (handle, command_stream) = s.take_protocol_handle().split();
((config, handle.clone(), command_stream), handle)
}))
.unzip();
handles.iter_mut().for_each(|handle| {
handle.set_metrics(notification_metrics.clone());
});
(
Notifications::new(
protocol_controller_handles,
from_protocol_controllers,
notification_metrics,
protocol_configs.into_iter(),
),
installed_protocols,
handles,
)
};
let protocol = Self {
behaviour,
sync_handle: handles[0].clone(),
peer_store_handle,
notification_protocols,
bad_handshake_streams: HashSet::new(),
_marker: Default::default(),
};
Ok((protocol, handles))
}
pub fn num_sync_peers(&self) -> usize {
self.sync_handle.num_peers()
}
pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
self.behaviour.open_peers()
}
pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name)
{
self.behaviour.disconnect_peer(peer_id, SetId::from(position));
} else {
warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name")
}
}
fn role_available(&self, peer_id: &PeerId, handshake: &Vec<u8>) -> bool {
match Roles::decode_all(&mut &handshake[..]) {
Ok(_) => true,
Err(_) => self.peer_store_handle.peer_role(&((*peer_id).into())).is_some(),
}
}
}
#[derive(Debug)]
#[must_use]
pub enum CustomMessageOutcome {
NotificationStreamOpened {
remote: PeerId,
set_id: SetId,
direction: Direction,
negotiated_fallback: Option<ProtocolName>,
received_handshake: Vec<u8>,
notifications_sink: NotificationsSink,
},
NotificationStreamReplaced {
remote: PeerId,
set_id: SetId,
notifications_sink: NotificationsSink,
},
NotificationStreamClosed {
remote: PeerId,
set_id: SetId,
},
NotificationsReceived {
remote: PeerId,
set_id: SetId,
notification: Vec<u8>,
},
}
impl<B: BlockT> NetworkBehaviour for Protocol<B> {
type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = CustomMessageOutcome;
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.behaviour.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.behaviour.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
)
}
fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
Ok(Vec::new())
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.behaviour.on_swarm_event(event);
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.behaviour.on_connection_handler_event(peer_id, connection_id, event);
}
fn poll(
&mut self,
cx: &mut std::task::Context,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
let event = match self.behaviour.poll(cx, params) {
Poll::Pending => return Poll::Pending,
Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev,
Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }),
Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) =>
return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) =>
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) =>
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) =>
return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
Poll::Ready(ToSwarm::ListenOn { opts }) =>
return Poll::Ready(ToSwarm::ListenOn { opts }),
Poll::Ready(ToSwarm::RemoveListener { id }) =>
return Poll::Ready(ToSwarm::RemoveListener { id }),
};
let outcome = match event {
NotificationsOut::CustomProtocolOpen {
peer_id,
set_id,
direction,
received_handshake,
notifications_sink,
negotiated_fallback,
..
} =>
if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.sync_handle.report_substream_opened(
peer_id,
direction,
received_handshake,
negotiated_fallback,
notifications_sink,
);
None
} else {
match self.role_available(&peer_id, &received_handshake) {
true => Some(CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
set_id,
direction,
negotiated_fallback,
received_handshake,
notifications_sink,
}),
false => {
self.bad_handshake_streams.insert(peer_id);
None
},
}
},
NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self
.sync_handle
.report_notification_sink_replaced(peer_id, notifications_sink);
None
} else {
(!self.bad_handshake_streams.contains(&peer_id)).then_some(
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
set_id,
notifications_sink,
},
)
},
NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.sync_handle.report_substream_closed(peer_id);
None
} else {
(!self.bad_handshake_streams.remove(&peer_id)).then_some(
CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, set_id },
)
}
},
NotificationsOut::Notification { peer_id, set_id, message } => {
if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self
.sync_handle
.report_notification_received(peer_id, message.freeze().into());
None
} else {
(!self.bad_handshake_streams.contains(&peer_id)).then_some(
CustomMessageOutcome::NotificationsReceived {
remote: peer_id,
set_id,
notification: message.freeze().into(),
},
)
}
},
};
match outcome {
Some(event) => Poll::Ready(ToSwarm::GenerateEvent(event)),
None => {
cx.waker().wake_by_ref();
Poll::Pending
},
}
}
}