1use crate::{
20 discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
21 event::DhtEvent,
22 peer_info,
23 peer_store::PeerStoreProvider,
24 protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
25 protocol_controller::SetId,
26 request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
27 service::traits::Direction,
28 types::ProtocolName,
29 ReputationChange,
30};
31
32use futures::channel::oneshot;
33use libp2p::{
34 connection_limits::ConnectionLimits,
35 core::Multiaddr,
36 identify::Info as IdentifyInfo,
37 identity::PublicKey,
38 kad::{Record, RecordKey},
39 swarm::NetworkBehaviour,
40 PeerId, StreamProtocol,
41};
42
43use parking_lot::Mutex;
44use sp_runtime::traits::Block as BlockT;
45use std::{
46 collections::HashSet,
47 sync::Arc,
48 time::{Duration, Instant},
49};
50
51pub use crate::request_responses::{InboundFailure, OutboundFailure, ResponseFailure};
52
/// Top-level network behaviour, composed of the sub-behaviours listed as fields.
///
/// The `NetworkBehaviour` derive combines the fields into one behaviour; per the `to_swarm`
/// attribute, events produced by the fields are surfaced as [`BehaviourOut`] (each field's
/// event type has a `From` conversion into it in this file).
///
/// NOTE(review): the derive works from the field list, so field order may be significant —
/// confirm against the libp2p derive documentation before reordering.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "BehaviourOut")]
pub struct Behaviour<B: BlockT> {
    /// Connection-limits behaviour, created in `new` from the supplied `ConnectionLimits`.
    connection_limits: libp2p::connection_limits::Behaviour,
    /// The block-generic `Protocol` behaviour; exposed through `user_protocol` and
    /// `user_protocol_mut`.
    substrate: Protocol<B>,
    /// Peer-information behaviour; its `Identified` event becomes `BehaviourOut::PeerIdentify`
    /// and it backs the `node` lookup.
    peer_info: peer_info::PeerInfoBehaviour,
    /// Discovery behaviour; backs the known-peers, Kademlia-statistics and DHT record methods.
    discovery: DiscoveryBehaviour,
    /// Request-response behaviour; backs `send_request`.
    request_responses: request_responses::RequestResponsesBehaviour,
}
69
/// Event type emitted by [`Behaviour`].
///
/// Every sub-behaviour event is converted into one of these variants by the `From`
/// implementations in this file.
pub enum BehaviourOut {
    /// Produced from `DiscoveryOut::RandomKademliaStarted`.
    RandomKademliaStarted,

    /// Produced from `request_responses::Event::InboundRequest`.
    InboundRequest {
        /// Name of the request-response protocol concerned.
        protocol: ProtocolName,
        /// Outcome of handling the request: a `Duration` on success, a `ResponseFailure`
        /// otherwise.
        /// NOTE(review): the duration is presumably the time taken to answer — confirm in
        /// `request_responses`.
        result: Result<Duration, ResponseFailure>,
    },

    /// Produced from `request_responses::Event::RequestFinished`.
    RequestFinished {
        /// Name of the request-response protocol concerned.
        protocol: ProtocolName,
        /// Duration reported together with the finished request.
        /// NOTE(review): presumably the time the request took — confirm in `request_responses`.
        duration: Duration,
        /// `Ok(())` if the request succeeded, otherwise the `RequestFailure`.
        result: Result<(), RequestFailure>,
    },

    /// Produced from `request_responses::Event::ReputationChanges`: a list of reputation
    /// changes associated with `peer`.
    ReputationChanges { peer: PeerId, changes: Vec<ReputationChange> },

    /// Produced from `CustomMessageOutcome::NotificationStreamOpened`.
    NotificationStreamOpened {
        /// Remote peer of the stream.
        remote: PeerId,
        /// Identifier of the peer set the stream belongs to.
        set_id: SetId,
        /// Direction of the stream.
        direction: Direction,
        /// Fallback protocol name, if one was negotiated.
        negotiated_fallback: Option<ProtocolName>,
        /// Sink handle associated with the stream.
        /// NOTE(review): presumably used to send notifications to `remote` — confirm.
        notifications_sink: NotificationsSink,
        /// Raw handshake bytes received from the remote.
        received_handshake: Vec<u8>,
    },

    /// Produced from `CustomMessageOutcome::NotificationStreamReplaced`.
    NotificationStreamReplaced {
        /// Remote peer of the stream.
        remote: PeerId,
        /// Identifier of the peer set the stream belongs to.
        set_id: SetId,
        /// Sink handle carried by the replacement event.
        /// NOTE(review): presumably supersedes the previously reported sink — confirm.
        notifications_sink: NotificationsSink,
    },

    /// Produced from `CustomMessageOutcome::NotificationStreamClosed`.
    NotificationStreamClosed {
        /// Remote peer of the stream.
        remote: PeerId,
        /// Identifier of the peer set the stream belonged to.
        set_id: SetId,
    },

    /// Produced from `CustomMessageOutcome::NotificationsReceived`.
    NotificationsReceived {
        /// Remote peer the notification is associated with.
        remote: PeerId,
        /// Identifier of the peer set concerned.
        set_id: SetId,
        /// Raw notification payload.
        notification: Vec<u8>,
    },

    /// Produced from `peer_info::PeerInfoEvent::Identified`.
    PeerIdentify {
        /// Peer the identify information refers to.
        peer_id: PeerId,
        /// Identify information reported for the peer.
        info: IdentifyInfo,
    },

    /// Produced from `DiscoveryOut::Discovered`; carries the peer in question.
    Discovered(PeerId),

    /// A DHT event from the discovery behaviour, with the duration reported alongside it.
    /// The duration is `None` for `DhtEvent::PutRecordRequest`, which carries none.
    Dht(DhtEvent, Option<Duration>),

    /// Placeholder for sub-behaviour events that are not forwarded
    /// (currently `DiscoveryOut::UnroutablePeer`).
    None,
}
175
176impl<B: BlockT> Behaviour<B> {
177 pub fn new(
179 substrate: Protocol<B>,
180 user_agent: String,
181 local_public_key: PublicKey,
182 disco_config: DiscoveryConfig,
183 request_response_protocols: Vec<ProtocolConfig>,
184 peer_store_handle: Arc<dyn PeerStoreProvider>,
185 external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
186 connection_limits: ConnectionLimits,
187 ) -> Result<Self, request_responses::RegisterError> {
188 Ok(Self {
189 substrate,
190 peer_info: peer_info::PeerInfoBehaviour::new(
191 user_agent,
192 local_public_key,
193 external_addresses,
194 ),
195 discovery: disco_config.finish(),
196 request_responses: request_responses::RequestResponsesBehaviour::new(
197 request_response_protocols.into_iter(),
198 peer_store_handle,
199 )?,
200 connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits),
201 })
202 }
203
204 pub fn known_peers(&mut self) -> HashSet<PeerId> {
206 self.discovery.known_peers()
207 }
208
209 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
211 self.discovery.add_known_address(peer_id, addr)
212 }
213
214 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
218 self.discovery.num_entries_per_kbucket()
219 }
220
221 pub fn num_kademlia_records(&mut self) -> Option<usize> {
223 self.discovery.num_kademlia_records()
224 }
225
226 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
228 self.discovery.kademlia_records_total_size()
229 }
230
231 pub fn node(&self, peer_id: &PeerId) -> Option<peer_info::Node> {
237 self.peer_info.node(peer_id)
238 }
239
240 pub fn send_request(
242 &mut self,
243 target: &PeerId,
244 protocol: ProtocolName,
245 request: Vec<u8>,
246 fallback_request: Option<(Vec<u8>, ProtocolName)>,
247 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
248 connect: IfDisconnected,
249 ) {
250 self.request_responses.send_request(
251 target,
252 protocol,
253 request,
254 fallback_request,
255 pending_response,
256 connect,
257 )
258 }
259
260 pub fn user_protocol(&self) -> &Protocol<B> {
262 &self.substrate
263 }
264
265 pub fn user_protocol_mut(&mut self) -> &mut Protocol<B> {
267 &mut self.substrate
268 }
269
270 pub fn add_self_reported_address_to_dht(
273 &mut self,
274 peer_id: &PeerId,
275 supported_protocols: &[StreamProtocol],
276 addr: Multiaddr,
277 ) {
278 self.discovery.add_self_reported_address(peer_id, supported_protocols, addr);
279 }
280
281 pub fn get_value(&mut self, key: RecordKey) {
284 self.discovery.get_value(key);
285 }
286
287 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
290 self.discovery.put_value(key, value);
291 }
292
293 pub fn put_record_to(
295 &mut self,
296 record: Record,
297 peers: HashSet<sc_network_types::PeerId>,
298 update_local_storage: bool,
299 ) {
300 self.discovery.put_record_to(record, peers, update_local_storage);
301 }
302
303 pub fn store_record(
305 &mut self,
306 record_key: RecordKey,
307 record_value: Vec<u8>,
308 publisher: Option<PeerId>,
309 expires: Option<Instant>,
310 ) {
311 self.discovery.store_record(record_key, record_value, publisher, expires);
312 }
313}
314
315impl From<CustomMessageOutcome> for BehaviourOut {
316 fn from(event: CustomMessageOutcome) -> Self {
317 match event {
318 CustomMessageOutcome::NotificationStreamOpened {
319 remote,
320 set_id,
321 direction,
322 negotiated_fallback,
323 received_handshake,
324 notifications_sink,
325 } => BehaviourOut::NotificationStreamOpened {
326 remote,
327 set_id,
328 direction,
329 negotiated_fallback,
330 received_handshake,
331 notifications_sink,
332 },
333 CustomMessageOutcome::NotificationStreamReplaced {
334 remote,
335 set_id,
336 notifications_sink,
337 } => BehaviourOut::NotificationStreamReplaced { remote, set_id, notifications_sink },
338 CustomMessageOutcome::NotificationStreamClosed { remote, set_id } =>
339 BehaviourOut::NotificationStreamClosed { remote, set_id },
340 CustomMessageOutcome::NotificationsReceived { remote, set_id, notification } =>
341 BehaviourOut::NotificationsReceived { remote, set_id, notification },
342 }
343 }
344}
345
346impl From<request_responses::Event> for BehaviourOut {
347 fn from(event: request_responses::Event) -> Self {
348 match event {
349 request_responses::Event::InboundRequest { protocol, result, .. } =>
350 BehaviourOut::InboundRequest { protocol, result },
351 request_responses::Event::RequestFinished { protocol, duration, result, .. } =>
352 BehaviourOut::RequestFinished { protocol, duration, result },
353 request_responses::Event::ReputationChanges { peer, changes } =>
354 BehaviourOut::ReputationChanges { peer, changes },
355 }
356 }
357}
358
359impl From<peer_info::PeerInfoEvent> for BehaviourOut {
360 fn from(event: peer_info::PeerInfoEvent) -> Self {
361 let peer_info::PeerInfoEvent::Identified { peer_id, info } = event;
362 BehaviourOut::PeerIdentify { peer_id, info }
363 }
364}
365
366impl From<DiscoveryOut> for BehaviourOut {
367 fn from(event: DiscoveryOut) -> Self {
368 match event {
369 DiscoveryOut::UnroutablePeer(_peer_id) => {
370 BehaviourOut::None
375 },
376 DiscoveryOut::Discovered(peer_id) => BehaviourOut::Discovered(peer_id),
377 DiscoveryOut::ValueFound(results, duration) =>
378 BehaviourOut::Dht(DhtEvent::ValueFound(results), Some(duration)),
379 DiscoveryOut::ValueNotFound(key, duration) =>
380 BehaviourOut::Dht(DhtEvent::ValueNotFound(key), Some(duration)),
381 DiscoveryOut::ValuePut(key, duration) =>
382 BehaviourOut::Dht(DhtEvent::ValuePut(key), Some(duration)),
383 DiscoveryOut::PutRecordRequest(record_key, record_value, publisher, expires) =>
384 BehaviourOut::Dht(
385 DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires),
386 None,
387 ),
388 DiscoveryOut::ValuePutFailed(key, duration) =>
389 BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), Some(duration)),
390 DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
391 }
392 }
393}
394
395impl From<void::Void> for BehaviourOut {
396 fn from(e: void::Void) -> Self {
397 void::unreachable(e)
398 }
399}