referrerpolicy=no-referrer-when-downgrade

polkadot_network_bridge/
validator_discovery.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! A validator discovery service for the Network Bridge.
18
19use crate::Network;
20
21use core::marker::PhantomData;
22use std::collections::HashSet;
23
24use futures::channel::oneshot;
25
26use sc_network::multiaddr::{self, Multiaddr};
27
28pub use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
29use polkadot_node_network_protocol::{
30	peer_set::{PeerSet, PeerSetProtocolNames, PerPeerSet},
31	PeerId,
32};
33use polkadot_primitives::AuthorityDiscoveryId;
34
/// Log target attached to every message emitted by this module.
const LOG_TARGET: &str = "parachain::validator-discovery";
36
37pub(super) struct Service<N, AD> {
38	state: PerPeerSet<StatePerPeerSet>,
39	peerset_protocol_names: PeerSetProtocolNames,
40	// PhantomData used to make the struct generic instead of having generic methods
41	_phantom: PhantomData<(N, AD)>,
42}
43
44#[derive(Default)]
45struct StatePerPeerSet {
46	previously_requested: HashSet<PeerId>,
47}
48
49impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
50	pub fn new(peerset_protocol_names: PeerSetProtocolNames) -> Self {
51		Self { state: Default::default(), peerset_protocol_names, _phantom: PhantomData }
52	}
53
54	/// Connect to already resolved addresses.
55	pub async fn on_resolved_request(
56		&mut self,
57		newly_requested: HashSet<Multiaddr>,
58		peer_set: PeerSet,
59		mut network_service: N,
60	) -> N {
61		let state = &mut self.state[peer_set];
62		let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
63		let num_peers = new_peer_ids.len();
64
65		let peers_to_remove: Vec<PeerId> =
66			state.previously_requested.difference(&new_peer_ids).cloned().collect();
67		let removed = peers_to_remove.len();
68		state.previously_requested = new_peer_ids;
69
70		gum::debug!(
71			target: LOG_TARGET,
72			?peer_set,
73			?num_peers,
74			?removed,
75			"New ConnectToValidators resolved request",
76		);
77		// ask the network to connect to these nodes and not disconnect
78		// from them until removed from the set
79		//
80		// for peer-set management, the main protocol name should be used regardless of
81		// the negotiated version.
82		if let Err(e) = network_service
83			.set_reserved_peers(
84				self.peerset_protocol_names.get_main_name(peer_set),
85				newly_requested,
86			)
87			.await
88		{
89			gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
90		}
91
92		network_service
93	}
94
95	/// Connect to already resolved addresses.
96	pub async fn on_add_to_resolved_request(
97		&mut self,
98		newly_requested: HashSet<Multiaddr>,
99		peer_set: PeerSet,
100		mut network_service: N,
101	) -> N {
102		let state = &mut self.state[peer_set];
103		let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
104		let num_peers = new_peer_ids.len();
105
106		state.previously_requested.extend(new_peer_ids);
107
108		gum::debug!(
109			target: LOG_TARGET,
110			?peer_set,
111			?num_peers,
112			"New add to resolved validators request",
113		);
114
115		// ask the network to connect to these nodes and not disconnect
116		// from them until they are removed from the set.
117		//
118		// for peer-set management, the main protocol name should be used regardless of
119		// the negotiated version.
120		if let Err(e) = network_service
121			.add_peers_to_reserved_set(
122				self.peerset_protocol_names.get_main_name(peer_set),
123				newly_requested,
124			)
125			.await
126		{
127			gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
128		}
129
130		network_service
131	}
132
133	/// On a new connection request, a peer set update will be issued.
134	/// It will ask the network to connect to the validators and not disconnect
135	/// from them at least until the next request is issued for the same peer set.
136	///
137	/// This method will also disconnect from previously connected validators not in the
138	/// `validator_ids` set. it takes `network_service` and `authority_discovery_service` by value
139	/// and returns them as a workaround for the Future: Send requirement imposed by async function
140	/// implementation.
141	pub async fn on_request(
142		&mut self,
143		validator_ids: Vec<AuthorityDiscoveryId>,
144		peer_set: PeerSet,
145		failed: oneshot::Sender<usize>,
146		network_service: N,
147		mut authority_discovery_service: AD,
148	) -> (N, AD) {
149		// collect multiaddress of validators
150		let mut failed_to_resolve: usize = 0;
151		let mut newly_requested = HashSet::new();
152		let requested = validator_ids.len();
153		for authority in validator_ids.into_iter() {
154			let result = authority_discovery_service
155				.get_addresses_by_authority_id(authority.clone())
156				.await;
157			if let Some(addresses) = result {
158				newly_requested.extend(addresses);
159			} else {
160				failed_to_resolve += 1;
161				gum::debug!(
162					target: LOG_TARGET,
163					"Authority Discovery couldn't resolve {:?}",
164					authority
165				);
166			}
167		}
168
169		gum::debug!(
170			target: LOG_TARGET,
171			?peer_set,
172			?requested,
173			?failed_to_resolve,
174			"New ConnectToValidators request",
175		);
176
177		let r = self.on_resolved_request(newly_requested, peer_set, network_service).await;
178
179		let _ = failed.send(failed_to_resolve);
180
181		(r, authority_discovery_service)
182	}
183}
184
185fn extract_peer_ids(multiaddr: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
186	multiaddr
187		.filter_map(|mut addr| match addr.pop() {
188			Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
189			_ => None,
190		})
191		.collect()
192}
193
#[cfg(test)]
mod tests {
	use super::*;
	use crate::network::Network;

	use async_trait::async_trait;
	use polkadot_node_network_protocol::{
		request_response::{outgoing::Requests, ReqProtocolNames},
		PeerId,
	};
	use polkadot_primitives::Hash;
	use sc_network::{IfDisconnected, ProtocolName, ReputationChange};
	use sp_keyring::Sr25519Keyring;
	use std::collections::{HashMap, HashSet};

	/// Builds a `Service` over the test doubles, using protocol names derived from a
	/// fixed genesis hash and no fork id.
	fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
		let genesis_hash = Hash::repeat_byte(0xff);
		let fork_id = None;
		let protocol_names = PeerSetProtocolNames::new(genesis_hash, fork_id);

		Service::new(protocol_names)
	}

	/// Returns an empty test network together with a pre-populated authority discovery.
	fn new_network() -> (TestNetwork, TestAuthorityDiscovery) {
		(TestNetwork::default(), TestAuthorityDiscovery::new())
	}

	/// Network double that only records the peer ids of its reserved set.
	#[derive(Default, Clone)]
	struct TestNetwork {
		peers_set: HashSet<PeerId>,
	}

	/// Authority discovery double backed by two in-memory lookup tables.
	#[derive(Default, Clone, Debug)]
	struct TestAuthorityDiscovery {
		by_authority_id: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,
		by_peer_id: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
	}

	impl TestAuthorityDiscovery {
		/// Pairs every known authority with one known address, each address ending in
		/// its own freshly generated peer id.
		fn new() -> Self {
			let peer_ids = known_peer_ids();
			let authorities = known_authorities();
			let multiaddr = known_multiaddr().into_iter().zip(peer_ids.iter().cloned()).map(
				|(mut addr, peer_id)| {
					addr.push(multiaddr::Protocol::P2p(peer_id.into()));
					HashSet::from([addr])
				},
			);
			Self {
				by_authority_id: authorities.iter().cloned().zip(multiaddr).collect(),
				by_peer_id: peer_ids
					.into_iter()
					.zip(authorities.into_iter().map(|a| HashSet::from([a])))
					.collect(),
			}
		}
	}

	#[async_trait]
	impl Network for TestNetwork {
		async fn set_reserved_peers(
			&mut self,
			_protocol: ProtocolName,
			multiaddresses: HashSet<Multiaddr>,
		) -> Result<(), String> {
			self.peers_set = extract_peer_ids(multiaddresses.into_iter());
			Ok(())
		}

		async fn add_peers_to_reserved_set(
			&mut self,
			_protocol: ProtocolName,
			multiaddresses: HashSet<Multiaddr>,
		) -> Result<(), String> {
			self.peers_set.extend(extract_peer_ids(multiaddresses.into_iter()));
			Ok(())
		}

		async fn remove_from_peers_set(
			&mut self,
			_protocol: ProtocolName,
			peers: Vec<PeerId>,
		) -> Result<(), String> {
			self.peers_set.retain(|elem| !peers.contains(elem));
			Ok(())
		}

		async fn start_request<AD: AuthorityDiscovery>(
			&self,
			_: &mut AD,
			_: Requests,
			_: &ReqProtocolNames,
			_: IfDisconnected,
		) {
		}

		// The remaining methods are not expected to be reached by the code under test.
		fn report_peer(&self, _: PeerId, _: ReputationChange) {
			panic!()
		}

		fn disconnect_peer(&self, _: PeerId, _: ProtocolName) {
			panic!()
		}

		fn peer_role(
			&self,
			_peer_id: PeerId,
			_handshake: Vec<u8>,
		) -> Option<sc_network::ObservedRole> {
			panic!()
		}
	}

	#[async_trait]
	impl AuthorityDiscovery for TestAuthorityDiscovery {
		async fn get_addresses_by_authority_id(
			&mut self,
			authority: AuthorityDiscoveryId,
		) -> Option<HashSet<Multiaddr>> {
			self.by_authority_id.get(&authority).cloned()
		}

		async fn get_authority_ids_by_peer_id(
			&mut self,
			peer_id: PeerId,
		) -> Option<HashSet<AuthorityDiscoveryId>> {
			self.by_peer_id.get(&peer_id).cloned()
		}
	}

	fn known_authorities() -> Vec<AuthorityDiscoveryId> {
		[Sr25519Keyring::Alice, Sr25519Keyring::Bob, Sr25519Keyring::Charlie]
			.iter()
			.map(|k| k.public().into())
			.collect()
	}

	fn known_peer_ids() -> Vec<PeerId> {
		(0..3).map(|_| PeerId::random()).collect()
	}

	fn known_multiaddr() -> Vec<Multiaddr> {
		vec![
			"/ip4/127.0.0.1/tcp/1234".parse().unwrap(),
			"/ip4/127.0.0.1/tcp/1235".parse().unwrap(),
			"/ip4/127.0.0.1/tcp/1236".parse().unwrap(),
		]
	}

	/// Returns a peer id extracted from the addresses registered for `authority`.
	///
	/// Panics if the authority is unknown to `ads` or none of its addresses carries a
	/// peer id.
	fn peer_id_of(ads: &TestAuthorityDiscovery, authority: &AuthorityDiscoveryId) -> PeerId {
		extract_peer_ids(ads.by_authority_id.get(authority).unwrap().iter().cloned())
			.into_iter()
			.next()
			.unwrap()
	}

	// Test cleanup works.
	#[test]
	fn old_multiaddrs_are_removed_on_new_request() {
		let mut service = new_service();

		let (ns, ads) = new_network();

		let authority_ids: Vec<_> =
			ads.by_peer_id.values().flat_map(|v| v.iter()).cloned().collect();

		futures::executor::block_on(async move {
			let (failed, _) = oneshot::channel();
			let (ns, ads) = service
				.on_request(vec![authority_ids[0].clone()], PeerSet::Validation, failed, ns, ads)
				.await;

			let (failed, _) = oneshot::channel();
			let (ns, ads) = service
				.on_request(vec![authority_ids[1].clone()], PeerSet::Validation, failed, ns, ads)
				.await;

			let state = &service.state[PeerSet::Validation];
			assert_eq!(state.previously_requested.len(), 1);
			let peer_1 = peer_id_of(&ads, &authority_ids[1]);
			assert!(state.previously_requested.contains(&peer_1));
			// The reserved set pushed to the network must match the tracked state.
			assert_eq!(ns.peers_set, state.previously_requested);
		});
	}

	// Test that adding to the reserved set keeps what was requested before.
	#[test]
	fn add_to_resolved_request_keeps_previous_peers() {
		let mut service = new_service();

		let (ns, ads) = new_network();

		let authority_ids = known_authorities();

		futures::executor::block_on(async move {
			let first = ads.by_authority_id.get(&authority_ids[0]).unwrap().clone();
			let second = ads.by_authority_id.get(&authority_ids[1]).unwrap().clone();

			let ns = service.on_resolved_request(first, PeerSet::Validation, ns).await;
			let ns = service.on_add_to_resolved_request(second, PeerSet::Validation, ns).await;

			let state = &service.state[PeerSet::Validation];
			assert_eq!(state.previously_requested.len(), 2);
			assert!(state.previously_requested.contains(&peer_id_of(&ads, &authority_ids[0])));
			assert!(state.previously_requested.contains(&peer_id_of(&ads, &authority_ids[1])));
			assert_eq!(ns.peers_set, state.previously_requested);
		});
	}

	#[test]
	fn failed_resolution_is_reported_properly() {
		let mut service = new_service();

		let (ns, ads) = new_network();

		let authority_ids: Vec<_> =
			ads.by_peer_id.values().flat_map(|v| v.iter()).cloned().collect();

		futures::executor::block_on(async move {
			let (failed, failed_rx) = oneshot::channel();
			// Ferdie is not among the known authorities, so resolving it must fail.
			let unknown = Sr25519Keyring::Ferdie.public().into();
			let (ns, ads) = service
				.on_request(
					vec![authority_ids[0].clone(), unknown],
					PeerSet::Validation,
					failed,
					ns,
					ads,
				)
				.await;

			let state = &service.state[PeerSet::Validation];
			assert_eq!(state.previously_requested.len(), 1);
			let peer_0 = peer_id_of(&ads, &authority_ids[0]);
			assert!(state.previously_requested.contains(&peer_0));
			// The unresolved authority must not have reached the network either.
			assert_eq!(ns.peers_set, state.previously_requested);

			let failed = failed_rx.await.unwrap();
			assert_eq!(failed, 1);
		});
	}
}