1use crate::Network;
20
21use core::marker::PhantomData;
22use std::collections::HashSet;
23
24use futures::channel::oneshot;
25
26use sc_network::multiaddr::{self, Multiaddr};
27
28pub use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
29use polkadot_node_network_protocol::{
30 peer_set::{PeerSet, PeerSetProtocolNames, PerPeerSet},
31 PeerId,
32};
33use polkadot_primitives::AuthorityDiscoveryId;
34
/// Log target attached to every `gum` log line emitted by this module.
const LOG_TARGET: &str = "parachain::validator-discovery";
36
37pub(super) struct Service<N, AD> {
38 state: PerPeerSet<StatePerPeerSet>,
39 peerset_protocol_names: PeerSetProtocolNames,
40 _phantom: PhantomData<(N, AD)>,
42}
43
44#[derive(Default)]
45struct StatePerPeerSet {
46 previously_requested: HashSet<PeerId>,
47}
48
49impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
50 pub fn new(peerset_protocol_names: PeerSetProtocolNames) -> Self {
51 Self { state: Default::default(), peerset_protocol_names, _phantom: PhantomData }
52 }
53
54 pub async fn on_resolved_request(
56 &mut self,
57 newly_requested: HashSet<Multiaddr>,
58 peer_set: PeerSet,
59 mut network_service: N,
60 ) -> N {
61 let state = &mut self.state[peer_set];
62 let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
63 let num_peers = new_peer_ids.len();
64
65 let peers_to_remove: Vec<PeerId> =
66 state.previously_requested.difference(&new_peer_ids).cloned().collect();
67 let removed = peers_to_remove.len();
68 state.previously_requested = new_peer_ids;
69
70 gum::debug!(
71 target: LOG_TARGET,
72 ?peer_set,
73 ?num_peers,
74 ?removed,
75 "New ConnectToValidators resolved request",
76 );
77 if let Err(e) = network_service
83 .set_reserved_peers(
84 self.peerset_protocol_names.get_main_name(peer_set),
85 newly_requested,
86 )
87 .await
88 {
89 gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
90 }
91
92 network_service
93 }
94
95 pub async fn on_add_to_resolved_request(
97 &mut self,
98 newly_requested: HashSet<Multiaddr>,
99 peer_set: PeerSet,
100 mut network_service: N,
101 ) -> N {
102 let state = &mut self.state[peer_set];
103 let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
104 let num_peers = new_peer_ids.len();
105
106 state.previously_requested.extend(new_peer_ids);
107
108 gum::debug!(
109 target: LOG_TARGET,
110 ?peer_set,
111 ?num_peers,
112 "New add to resolved validators request",
113 );
114
115 if let Err(e) = network_service
121 .add_peers_to_reserved_set(
122 self.peerset_protocol_names.get_main_name(peer_set),
123 newly_requested,
124 )
125 .await
126 {
127 gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
128 }
129
130 network_service
131 }
132
133 pub async fn on_request(
142 &mut self,
143 validator_ids: Vec<AuthorityDiscoveryId>,
144 peer_set: PeerSet,
145 failed: oneshot::Sender<usize>,
146 network_service: N,
147 mut authority_discovery_service: AD,
148 ) -> (N, AD) {
149 let mut failed_to_resolve: usize = 0;
151 let mut newly_requested = HashSet::new();
152 let requested = validator_ids.len();
153 for authority in validator_ids.into_iter() {
154 let result = authority_discovery_service
155 .get_addresses_by_authority_id(authority.clone())
156 .await;
157 if let Some(addresses) = result {
158 newly_requested.extend(addresses);
159 } else {
160 failed_to_resolve += 1;
161 gum::debug!(
162 target: LOG_TARGET,
163 "Authority Discovery couldn't resolve {:?}",
164 authority
165 );
166 }
167 }
168
169 gum::debug!(
170 target: LOG_TARGET,
171 ?peer_set,
172 ?requested,
173 ?failed_to_resolve,
174 "New ConnectToValidators request",
175 );
176
177 let r = self.on_resolved_request(newly_requested, peer_set, network_service).await;
178
179 let _ = failed.send(failed_to_resolve);
180
181 (r, authority_discovery_service)
182 }
183}
184
185fn extract_peer_ids(multiaddr: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
186 multiaddr
187 .filter_map(|mut addr| match addr.pop() {
188 Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
189 _ => None,
190 })
191 .collect()
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::Network;

    use async_trait::async_trait;
    use polkadot_node_network_protocol::{
        request_response::{outgoing::Requests, ReqProtocolNames},
        PeerId,
    };
    use polkadot_primitives::Hash;
    use sc_network::{IfDisconnected, ProtocolName, ReputationChange};
    use sp_keyring::Sr25519Keyring;
    use std::collections::{HashMap, HashSet};

    /// Builds a service whose protocol names derive from a fixed genesis hash and no fork id.
    fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
        let names = PeerSetProtocolNames::new(Hash::repeat_byte(0xff), None);
        Service::new(names)
    }

    /// Builds the pair of test doubles handed to the service in every test.
    fn new_network() -> (TestNetwork, TestAuthorityDiscovery) {
        let network = TestNetwork::default();
        let discovery = TestAuthorityDiscovery::new();
        (network, discovery)
    }

    /// Network double that only records which peer ids are currently in its peer set.
    #[derive(Default, Clone)]
    struct TestNetwork {
        peers_set: HashSet<PeerId>,
    }

    /// Authority discovery double backed by two fixed lookup tables.
    #[derive(Default, Clone, Debug)]
    struct TestAuthorityDiscovery {
        by_authority_id: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,
        by_peer_id: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
    }

    impl TestAuthorityDiscovery {
        /// Pairs the i-th known authority with the i-th known address and peer id.
        fn new() -> Self {
            let peer_ids = known_peer_ids();
            let authorities = known_authorities();

            // One single-address set per peer, each address suffixed with the peer's id.
            let address_sets = known_multiaddr().into_iter().zip(peer_ids.iter().cloned()).map(
                |(mut addr, peer_id)| {
                    addr.push(multiaddr::Protocol::P2p(peer_id.into()));
                    HashSet::from([addr])
                },
            );
            let by_authority_id: HashMap<_, _> =
                authorities.iter().cloned().zip(address_sets).collect();

            let authority_sets = authorities.into_iter().map(|authority| HashSet::from([authority]));
            let by_peer_id: HashMap<_, _> = peer_ids.into_iter().zip(authority_sets).collect();

            Self { by_authority_id, by_peer_id }
        }
    }

    #[async_trait]
    impl Network for TestNetwork {
        /// Replaces the recorded peer set with the peer ids found in `multiaddresses`.
        async fn set_reserved_peers(
            &mut self,
            _protocol: ProtocolName,
            multiaddresses: HashSet<Multiaddr>,
        ) -> Result<(), String> {
            let replacement = extract_peer_ids(multiaddresses.into_iter());
            self.peers_set = replacement;
            Ok(())
        }

        /// Adds the peer ids found in `multiaddresses` to the recorded peer set.
        async fn add_peers_to_reserved_set(
            &mut self,
            _protocol: ProtocolName,
            multiaddresses: HashSet<Multiaddr>,
        ) -> Result<(), String> {
            let added = extract_peer_ids(multiaddresses.into_iter());
            self.peers_set.extend(added);
            Ok(())
        }

        /// Drops every listed peer from the recorded peer set.
        async fn remove_from_peers_set(
            &mut self,
            _protocol: ProtocolName,
            peers: Vec<PeerId>,
        ) -> Result<(), String> {
            for peer in &peers {
                self.peers_set.remove(peer);
            }
            Ok(())
        }

        /// Requests are ignored by this double.
        async fn start_request<AD: AuthorityDiscovery>(
            &self,
            _: &mut AD,
            _: Requests,
            _: &ReqProtocolNames,
            _: IfDisconnected,
        ) {
        }

        // The remaining methods are not expected to be reached by these tests and panic
        // if they are.

        fn report_peer(&self, _: PeerId, _: ReputationChange) {
            panic!()
        }

        fn disconnect_peer(&self, _: PeerId, _: ProtocolName) {
            panic!()
        }

        fn peer_role(
            &self,
            _peer_id: PeerId,
            _handshake: Vec<u8>,
        ) -> Option<sc_network::ObservedRole> {
            panic!()
        }
    }

    #[async_trait]
    impl AuthorityDiscovery for TestAuthorityDiscovery {
        /// Looks the authority up in the fixed table; unknown authorities yield `None`.
        async fn get_addresses_by_authority_id(
            &mut self,
            authority: AuthorityDiscoveryId,
        ) -> Option<HashSet<Multiaddr>> {
            self.by_authority_id.get(&authority).cloned()
        }

        /// Looks the peer up in the fixed table; unknown peers yield `None`.
        async fn get_authority_ids_by_peer_id(
            &mut self,
            peer_id: PeerId,
        ) -> Option<HashSet<AuthorityDiscoveryId>> {
            self.by_peer_id.get(&peer_id).cloned()
        }
    }

    /// Authority ids of Alice, Bob and Charlie, in that order.
    fn known_authorities() -> Vec<AuthorityDiscoveryId> {
        vec![
            Sr25519Keyring::Alice.public().into(),
            Sr25519Keyring::Bob.public().into(),
            Sr25519Keyring::Charlie.public().into(),
        ]
    }

    /// Three freshly generated random peer ids.
    fn known_peer_ids() -> Vec<PeerId> {
        std::iter::repeat_with(PeerId::random).take(3).collect()
    }

    /// Three loopback TCP addresses on consecutive ports.
    fn known_multiaddr() -> Vec<Multiaddr> {
        ["/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/1235", "/ip4/127.0.0.1/tcp/1236"]
            .iter()
            .map(|raw| raw.parse().unwrap())
            .collect()
    }

    /// Returns a peer id extracted from the addresses registered for `authority`.
    ///
    /// Panics if the authority is unknown or none of its addresses carries a peer id.
    fn resolved_peer_id(
        ads: &TestAuthorityDiscovery,
        authority: &AuthorityDiscoveryId,
    ) -> PeerId {
        let addresses = ads.by_authority_id.get(authority).unwrap().clone();
        extract_peer_ids(addresses.into_iter()).into_iter().next().unwrap()
    }

    // A second request for the same peer set must replace the previously requested peers.
    #[test]
    fn old_multiaddrs_are_removed_on_new_request() {
        let mut service = new_service();

        let (ns, ads) = new_network();

        let authority_ids: Vec<_> = ads.by_peer_id.values().flatten().cloned().collect();

        futures::executor::block_on(async move {
            let first = vec![authority_ids[0].clone()];
            let (failed, _) = oneshot::channel();
            let (ns, ads) = service.on_request(first, PeerSet::Validation, failed, ns, ads).await;

            let second = vec![authority_ids[1].clone()];
            let (failed, _) = oneshot::channel();
            let (_, ads) = service.on_request(second, PeerSet::Validation, failed, ns, ads).await;

            let state = &service.state[PeerSet::Validation];
            assert_eq!(state.previously_requested.len(), 1);
            let peer_1 = resolved_peer_id(&ads, &authority_ids[1]);
            assert!(state.previously_requested.contains(&peer_1));
        });
    }

    // An authority that cannot be resolved must be counted and reported through `failed`.
    #[test]
    fn failed_resolution_is_reported_properly() {
        let mut service = new_service();

        let (ns, ads) = new_network();

        let authority_ids: Vec<_> = ads.by_peer_id.values().flatten().cloned().collect();

        futures::executor::block_on(async move {
            let (failed, failed_rx) = oneshot::channel();
            // Ferdie is not among the known authorities, so the lookup fails.
            let unknown = Sr25519Keyring::Ferdie.public().into();
            let requested = vec![authority_ids[0].clone(), unknown];
            let (_, ads) =
                service.on_request(requested, PeerSet::Validation, failed, ns, ads).await;

            let state = &service.state[PeerSet::Validation];
            assert_eq!(state.previously_requested.len(), 1);
            let peer_0 = resolved_peer_id(&ads, &authority_ids[0]);
            assert!(state.previously_requested.contains(&peer_0));

            let failed = failed_rx.await.unwrap();
            assert_eq!(failed, 1);
        });
    }
}