1use crate::{
20 discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
21 event::DhtEvent,
22 peer_info,
23 peer_store::PeerStoreProvider,
24 protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
25 protocol_controller::SetId,
26 request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
27 service::traits::Direction,
28 types::ProtocolName,
29 ReputationChange,
30};
31
32use futures::channel::oneshot;
33use libp2p::{
34 connection_limits::ConnectionLimits,
35 core::Multiaddr,
36 identify::Info as IdentifyInfo,
37 identity::PublicKey,
38 kad::{Record, RecordKey},
39 swarm::NetworkBehaviour,
40 PeerId, StreamProtocol,
41};
42
43use parking_lot::Mutex;
44use sp_runtime::traits::Block as BlockT;
45use std::{
46 collections::HashSet,
47 sync::Arc,
48 time::{Duration, Instant},
49};
50
51pub use crate::request_responses::{InboundFailure, OutboundFailure, ResponseFailure};
52
/// Top-level network behaviour that bundles the individual sub-behaviours below
/// into a single libp2p `NetworkBehaviour`.
///
/// The `NetworkBehaviour` implementation is generated by the derive macro, and
/// `to_swarm = "BehaviourOut"` selects [`BehaviourOut`] as the event type
/// surfaced to the swarm.
///
/// NOTE(review): the derive macro works from the fields in declaration order —
/// confirm the implications before reordering them.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "BehaviourOut")]
pub struct Behaviour<B: BlockT> {
    /// libp2p connection-limits behaviour, built in [`Behaviour::new`] from the
    /// `ConnectionLimits` passed by the caller.
    connection_limits: libp2p::connection_limits::Behaviour,
    /// The Substrate protocol behaviour handed to [`Behaviour::new`]; reachable
    /// through `user_protocol` / `user_protocol_mut`.
    substrate: Protocol<B>,
    /// Peer-information behaviour; answers [`Behaviour::node`] lookups and is the
    /// source of `PeerInfoEvent::Identified` events.
    peer_info: peer_info::PeerInfoBehaviour,
    /// Discovery behaviour; backs the known-peer, Kademlia record and provider
    /// methods of this type.
    discovery: DiscoveryBehaviour,
    /// Request-response behaviour used by [`Behaviour::send_request`].
    request_responses: request_responses::RequestResponsesBehaviour,
}
69
/// Event emitted by [`Behaviour`] towards the swarm.
///
/// Every sub-behaviour event is converted into this type through one of the
/// `From<...> for BehaviourOut` implementations in this file.
#[derive(Debug)]
pub enum BehaviourOut {
    /// Converted from `DiscoveryOut::RandomKademliaStarted`.
    RandomKademliaStarted,

    /// Converted from `request_responses::Event::InboundRequest`; any other
    /// field of the source event is dropped during conversion.
    InboundRequest {
        /// Name of the request-response protocol the request arrived on.
        protocol: ProtocolName,
        /// Outcome of handling the request: a `Duration` on success
        /// (presumably the time taken to answer — TODO confirm against
        /// `request_responses`), or the failure reason.
        result: Result<Duration, ResponseFailure>,
    },

    /// Converted from `request_responses::Event::RequestFinished`; any other
    /// field of the source event is dropped during conversion.
    RequestFinished {
        /// Name of the request-response protocol that was used.
        protocol: ProtocolName,
        /// Duration reported by the request-response behaviour for the request.
        duration: Duration,
        /// `Ok(())` on success, otherwise the failure reason.
        result: Result<(), RequestFailure>,
    },

    /// Converted from `request_responses::Event::ReputationChanges`: a batch of
    /// reputation changes attached to `peer`.
    ReputationChanges { peer: PeerId, changes: Vec<ReputationChange> },

    /// Converted from `CustomMessageOutcome::NotificationStreamOpened`.
    NotificationStreamOpened {
        /// Peer the stream was opened with.
        remote: PeerId,
        /// Identifier of the peer set (protocol) the stream belongs to.
        set_id: SetId,
        /// Direction of the stream as reported by the protocol behaviour.
        direction: Direction,
        /// Fallback protocol name, if one was negotiated.
        negotiated_fallback: Option<ProtocolName>,
        /// Sink handed over by the protocol behaviour for this stream.
        notifications_sink: NotificationsSink,
        /// Raw handshake bytes received from the remote.
        received_handshake: Vec<u8>,
    },

    /// Converted from `CustomMessageOutcome::NotificationStreamReplaced`; carries
    /// the new sink for the `(remote, set_id)` pair.
    NotificationStreamReplaced {
        /// Peer whose stream was replaced.
        remote: PeerId,
        /// Identifier of the peer set (protocol) the stream belongs to.
        set_id: SetId,
        /// Replacement sink.
        notifications_sink: NotificationsSink,
    },

    /// Converted from `CustomMessageOutcome::NotificationStreamClosed`.
    NotificationStreamClosed {
        /// Peer whose stream was closed.
        remote: PeerId,
        /// Identifier of the peer set (protocol) the stream belonged to.
        set_id: SetId,
    },

    /// Converted from `CustomMessageOutcome::NotificationsReceived`.
    NotificationsReceived {
        /// Peer the notification came from.
        remote: PeerId,
        /// Identifier of the peer set (protocol) the notification arrived on.
        set_id: SetId,
        /// Raw notification payload.
        notification: Vec<u8>,
    },

    /// Converted from `PeerInfoEvent::Identified`.
    PeerIdentify {
        /// Peer that was identified.
        peer_id: PeerId,
        /// Identify information reported for that peer.
        info: IdentifyInfo,
    },

    /// Converted from `DiscoveryOut::Discovered`.
    Discovered(PeerId),

    /// A DHT event produced by the discovery behaviour.
    ///
    /// The second field carries the duration reported by discovery; it is
    /// `None` only for `DhtEvent::PutRecordRequest`, whose source event has no
    /// duration.
    Dht(DhtEvent, Option<Duration>),

    /// Placeholder with no payload; produced for `DiscoveryOut::UnroutablePeer`.
    None,
}
176
177impl<B: BlockT> Behaviour<B> {
178 pub fn new(
180 substrate: Protocol<B>,
181 user_agent: String,
182 local_public_key: PublicKey,
183 disco_config: DiscoveryConfig,
184 request_response_protocols: Vec<ProtocolConfig>,
185 peer_store_handle: Arc<dyn PeerStoreProvider>,
186 external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
187 public_addresses: Vec<Multiaddr>,
188 connection_limits: ConnectionLimits,
189 ) -> Result<Self, request_responses::RegisterError> {
190 Ok(Self {
191 substrate,
192 peer_info: peer_info::PeerInfoBehaviour::new(
193 user_agent,
194 local_public_key,
195 external_addresses,
196 public_addresses,
197 ),
198 discovery: disco_config.finish(),
199 request_responses: request_responses::RequestResponsesBehaviour::new(
200 request_response_protocols.into_iter(),
201 peer_store_handle,
202 )?,
203 connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits),
204 })
205 }
206
207 pub fn known_peers(&mut self) -> HashSet<PeerId> {
209 self.discovery.known_peers()
210 }
211
212 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
214 self.discovery.add_known_address(peer_id, addr)
215 }
216
217 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
221 self.discovery.num_entries_per_kbucket()
222 }
223
224 pub fn num_kademlia_records(&mut self) -> Option<usize> {
226 self.discovery.num_kademlia_records()
227 }
228
229 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
231 self.discovery.kademlia_records_total_size()
232 }
233
234 pub fn node(&self, peer_id: &PeerId) -> Option<peer_info::Node> {
240 self.peer_info.node(peer_id)
241 }
242
243 pub fn send_request(
245 &mut self,
246 target: &PeerId,
247 protocol: ProtocolName,
248 request: Vec<u8>,
249 fallback_request: Option<(Vec<u8>, ProtocolName)>,
250 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
251 connect: IfDisconnected,
252 ) {
253 self.request_responses.send_request(
254 target,
255 protocol,
256 request,
257 fallback_request,
258 pending_response,
259 connect,
260 )
261 }
262
263 pub fn user_protocol(&self) -> &Protocol<B> {
265 &self.substrate
266 }
267
268 pub fn user_protocol_mut(&mut self) -> &mut Protocol<B> {
270 &mut self.substrate
271 }
272
273 pub fn add_self_reported_address_to_dht(
276 &mut self,
277 peer_id: &PeerId,
278 supported_protocols: &[StreamProtocol],
279 addr: Multiaddr,
280 ) {
281 self.discovery.add_self_reported_address(peer_id, supported_protocols, addr);
282 }
283
284 pub fn find_closest_peers(&mut self, target: PeerId) {
287 self.discovery.find_closest_peers(target);
288 }
289
290 pub fn get_value(&mut self, key: RecordKey) {
293 self.discovery.get_value(key);
294 }
295
296 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
299 self.discovery.put_value(key, value);
300 }
301
302 pub fn put_record_to(
304 &mut self,
305 record: Record,
306 peers: HashSet<sc_network_types::PeerId>,
307 update_local_storage: bool,
308 ) {
309 self.discovery.put_record_to(record, peers, update_local_storage);
310 }
311
312 pub fn store_record(
314 &mut self,
315 record_key: RecordKey,
316 record_value: Vec<u8>,
317 publisher: Option<PeerId>,
318 expires: Option<Instant>,
319 ) {
320 self.discovery.store_record(record_key, record_value, publisher, expires);
321 }
322
323 pub fn start_providing(&mut self, key: RecordKey) {
325 self.discovery.start_providing(key)
326 }
327
328 pub fn stop_providing(&mut self, key: &RecordKey) {
330 self.discovery.stop_providing(key)
331 }
332
333 pub fn get_providers(&mut self, key: RecordKey) {
336 self.discovery.get_providers(key)
337 }
338}
339
340impl From<CustomMessageOutcome> for BehaviourOut {
341 fn from(event: CustomMessageOutcome) -> Self {
342 match event {
343 CustomMessageOutcome::NotificationStreamOpened {
344 remote,
345 set_id,
346 direction,
347 negotiated_fallback,
348 received_handshake,
349 notifications_sink,
350 } => BehaviourOut::NotificationStreamOpened {
351 remote,
352 set_id,
353 direction,
354 negotiated_fallback,
355 received_handshake,
356 notifications_sink,
357 },
358 CustomMessageOutcome::NotificationStreamReplaced {
359 remote,
360 set_id,
361 notifications_sink,
362 } => BehaviourOut::NotificationStreamReplaced { remote, set_id, notifications_sink },
363 CustomMessageOutcome::NotificationStreamClosed { remote, set_id } =>
364 BehaviourOut::NotificationStreamClosed { remote, set_id },
365 CustomMessageOutcome::NotificationsReceived { remote, set_id, notification } =>
366 BehaviourOut::NotificationsReceived { remote, set_id, notification },
367 }
368 }
369}
370
371impl From<request_responses::Event> for BehaviourOut {
372 fn from(event: request_responses::Event) -> Self {
373 match event {
374 request_responses::Event::InboundRequest { protocol, result, .. } =>
375 BehaviourOut::InboundRequest { protocol, result },
376 request_responses::Event::RequestFinished { protocol, duration, result, .. } =>
377 BehaviourOut::RequestFinished { protocol, duration, result },
378 request_responses::Event::ReputationChanges { peer, changes } =>
379 BehaviourOut::ReputationChanges { peer, changes },
380 }
381 }
382}
383
384impl From<peer_info::PeerInfoEvent> for BehaviourOut {
385 fn from(event: peer_info::PeerInfoEvent) -> Self {
386 let peer_info::PeerInfoEvent::Identified { peer_id, info } = event;
387 BehaviourOut::PeerIdentify { peer_id, info }
388 }
389}
390
391impl From<DiscoveryOut> for BehaviourOut {
392 fn from(event: DiscoveryOut) -> Self {
393 match event {
394 DiscoveryOut::UnroutablePeer(_peer_id) => {
395 BehaviourOut::None
400 },
401 DiscoveryOut::Discovered(peer_id) => BehaviourOut::Discovered(peer_id),
402 DiscoveryOut::ClosestPeersFound(target, peers, duration) => BehaviourOut::Dht(
403 DhtEvent::ClosestPeersFound(
404 target.into(),
405 peers
406 .into_iter()
407 .map(|(p, addrs)| (p.into(), addrs.into_iter().map(Into::into).collect()))
408 .collect(),
409 ),
410 Some(duration),
411 ),
412 DiscoveryOut::ClosestPeersNotFound(target, duration) =>
413 BehaviourOut::Dht(DhtEvent::ClosestPeersNotFound(target.into()), Some(duration)),
414 DiscoveryOut::ValueFound(results, duration) =>
415 BehaviourOut::Dht(DhtEvent::ValueFound(results.into()), Some(duration)),
416 DiscoveryOut::ValueNotFound(key, duration) =>
417 BehaviourOut::Dht(DhtEvent::ValueNotFound(key.into()), Some(duration)),
418 DiscoveryOut::ValuePut(key, duration) =>
419 BehaviourOut::Dht(DhtEvent::ValuePut(key.into()), Some(duration)),
420 DiscoveryOut::PutRecordRequest(record_key, record_value, publisher, expires) =>
421 BehaviourOut::Dht(
422 DhtEvent::PutRecordRequest(record_key.into(), record_value, publisher, expires),
423 None,
424 ),
425 DiscoveryOut::ValuePutFailed(key, duration) =>
426 BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)),
427 DiscoveryOut::StartedProviding(key, duration) =>
428 BehaviourOut::Dht(DhtEvent::StartedProviding(key.into()), Some(duration)),
429 DiscoveryOut::StartProvidingFailed(key, duration) =>
430 BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), Some(duration)),
431 DiscoveryOut::ProvidersFound(key, providers, duration) => BehaviourOut::Dht(
432 DhtEvent::ProvidersFound(
433 key.into(),
434 providers.into_iter().map(Into::into).collect(),
435 ),
436 Some(duration),
437 ),
438 DiscoveryOut::NoMoreProviders(key, duration) =>
439 BehaviourOut::Dht(DhtEvent::NoMoreProviders(key.into()), Some(duration)),
440 DiscoveryOut::ProvidersNotFound(key, duration) =>
441 BehaviourOut::Dht(DhtEvent::ProvidersNotFound(key.into()), Some(duration)),
442 DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
443 }
444 }
445}
446
447impl From<void::Void> for BehaviourOut {
448 fn from(e: void::Void) -> Self {
449 void::unreachable(e)
450 }
451}