// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! This subsystem is responsible for keeping track of session changes
//! and issuing a connection request to the relevant validators
//! on every new session.
//!
//! In addition to that, it creates a gossip overlay topology
//! which limits the number of messages sent and received
//! to the order of the square root of the validator set size. Our neighbors
//! in this graph are forwarded to the network bridge with
//! the `NetworkBridgeRxMessage::NewGossipTopology` message.

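//! A minimal sketch of the resulting peer-count bound (illustrative numbers
//! only, not taken from any runtime configuration):
//!
//! ```ignore
//! let n = 1_000usize;                           // validators in the session
//! let side = (n as f64).sqrt().ceil() as usize; // ~32 validators per row/column
//! let gossip_peers = 2 * side;                  // row + column neighbors: ~64
//! ```
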
use std::{
	collections::{HashMap, HashSet},
	fmt,
	time::{Duration, Instant},
	u32,
};

use futures::{channel::oneshot, select, FutureExt as _};
use futures_timer::Delay;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;

use sc_network::{config::parse_addr, Multiaddr};
use sp_application_crypto::{AppCrypto, ByteArray};
use sp_keystore::{Keystore, KeystorePtr};

use polkadot_node_network_protocol::{
	authority_discovery::AuthorityDiscovery, peer_set::PeerSet, GossipSupportNetworkMessage,
	PeerId, ValidationProtocols,
};
use polkadot_node_subsystem::{
	messages::{
		ChainApiMessage, GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage,
		NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest,
	},
	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::{AuthorityDiscoveryId, Hash, SessionIndex, SessionInfo, ValidatorIndex};

#[cfg(test)]
mod tests;

mod metrics;

use metrics::Metrics;

const LOG_TARGET: &str = "parachain::gossip-support";
// How long we should wait before reissuing a connection request
// after the last authority discovery resolution failure.
#[cfg(not(test))]
const BACKOFF_DURATION: Duration = Duration::from_secs(5);

#[cfg(test)]
const BACKOFF_DURATION: Duration = Duration::from_millis(500);

// The authority_discovery queries run every ten minutes, so it
// makes sense to re-resolve a bit more often than that to detect
// changes as early as we can, but not much more often, since that
// wouldn't help.
#[cfg(not(test))]
const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(5 * 60);

#[cfg(test)]
const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(2);

/// Duration after which we consider low connectivity a problem.
///
/// Especially at startup low connectivity is expected (the authority discovery cache needs to be
/// populated). Authority discovery on Kusama takes around 8 minutes, so warning after 10 minutes
/// should be fine:
///
/// <https://github.com/paritytech/substrate/blob/fc49802f263529160635471c8a17888846035f5d/client/authority-discovery/src/lib.rs#L88>
const LOW_CONNECTIVITY_WARN_DELAY: Duration = Duration::from_secs(600);

/// If connectivity (in percent) is lower than this, issue a warning in the logs.
const LOW_CONNECTIVITY_WARN_THRESHOLD: usize = 85;

/// The Gossip Support subsystem.
pub struct GossipSupport<AD> {
	keystore: KeystorePtr,

	last_session_index: Option<SessionIndex>,
	/// Whether we are currently an authority or not.
	is_authority_now: bool,
	/// The minimum known session index we have built the topology for.
	min_known_session: SessionIndex,
	// `Some(timestamp)` if we failed to resolve
	// at least a third of the authorities the last time.
	// `None` otherwise.
	last_failure: Option<Instant>,

	// Validators can restart during a session and change their `PeerId`;
	// in that case we would reconnect to them, at best, only after a
	// session change, so we need to re-resolve peers and reconnect to
	// them more often than that. The authority_discovery queries run
	// every ten minutes, so we can't detect address changes more often
	// than that.
	last_connection_request: Option<Instant>,

	/// First time we did not reach our connectivity threshold.
	///
	/// This is the time of the first failed attempt to connect to >2/3 of all validators in a
	/// potential sequence of failed attempts. It will be cleared once we have reached >2/3
	/// connectivity.
	failure_start: Option<Instant>,

	/// Successfully resolved authorities, waiting for an actual connection.
	resolved_authorities: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,

	/// Actually connected authorities.
	connected_authorities: HashMap<AuthorityDiscoveryId, PeerId>,
	/// The connected peers, indexed by `PeerId`.
	///
	/// Needed for efficient handling of disconnect events.
	connected_peers: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
	/// Authority discovery service.
	authority_discovery: AD,

	/// The oldest session we still need to build a topology for, because
	/// the finalized blocks are from a session we haven't built a topology for yet.
	finalized_needed_session: Option<u32>,
	/// Subsystem metrics.
	metrics: Metrics,
}

#[overseer::contextbounds(GossipSupport, prefix = self::overseer)]
impl<AD> GossipSupport<AD>
where
	AD: AuthorityDiscovery,
{
	/// Create a new instance of the [`GossipSupport`] subsystem.
	pub fn new(keystore: KeystorePtr, authority_discovery: AD, metrics: Metrics) -> Self {
		// Initialize metrics to `0`.
		metrics.on_is_not_authority();
		metrics.on_is_not_parachain_validator();

		Self {
			keystore,
			last_session_index: None,
			last_failure: None,
			last_connection_request: None,
			failure_start: None,
			resolved_authorities: HashMap::new(),
			connected_authorities: HashMap::new(),
			connected_peers: HashMap::new(),
			min_known_session: u32::MAX,
			authority_discovery,
			finalized_needed_session: None,
			is_authority_now: false,
			metrics,
		}
	}

	async fn run<Context>(mut self, mut ctx: Context) -> Self {
		fn get_connectivity_check_delay() -> Delay {
			Delay::new(LOW_CONNECTIVITY_WARN_DELAY)
		}
		let mut next_connectivity_check = get_connectivity_check_delay().fuse();
		loop {
			let message = select!(
				_ = next_connectivity_check => {
					self.check_connectivity();
					next_connectivity_check = get_connectivity_check_delay().fuse();
					continue
				}
				result = ctx.recv().fuse() =>
					match result {
						Ok(message) => message,
						Err(e) => {
							gum::debug!(
								target: LOG_TARGET,
								err = ?e,
								"Failed to receive a message from Overseer, exiting",
							);
							return self
						},
					}
			);
			match message {
				FromOrchestra::Communication {
					msg: GossipSupportMessage::NetworkBridgeUpdate(ev),
				} => self.handle_connect_disconnect(ev),
				FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated,
					..
				})) => {
					gum::trace!(target: LOG_TARGET, "active leaves signal");

					let leaves = activated.into_iter().map(|a| a.hash);
					if let Err(e) = self.handle_active_leaves(ctx.sender(), leaves).await {
						gum::debug!(target: LOG_TARGET, error = ?e);
					}
				},
				FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _number)) =>
					if let Some(session_index) = self.last_session_index {
						if let Err(e) = self
							.build_topology_for_last_finalized_if_needed(
								ctx.sender(),
								session_index,
							)
							.await
						{
							gum::warn!(
								target: LOG_TARGET,
								"Failed to build topology for last finalized session: {:?}",
								e
							);
						}
					},
				FromOrchestra::Signal(OverseerSignal::Conclude) => return self,
			}
		}
	}

	/// 1. Determine if the current session index has changed.
	/// 2. If it has, determine relevant validators and issue a connection request.
	async fn handle_active_leaves(
		&mut self,
		sender: &mut impl overseer::GossipSupportSenderTrait,
		leaves: impl Iterator<Item = Hash>,
	) -> Result<(), util::Error> {
		for leaf in leaves {
			let current_index = util::request_session_index_for_child(leaf, sender).await.await??;
			let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
			let since_last_reconnect =
				self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();

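			// Decide whether to (re-)issue a connection request: always on a new
			// session, again after `BACKOFF_DURATION` if the previous resolution had
			// too many failures, and periodically every `TRY_RERESOLVE_AUTHORITIES`
			// to pick up validators whose addresses changed mid-session.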
			let force_request = since_failure >= BACKOFF_DURATION;
			let re_resolve_authorities = since_last_reconnect >= TRY_RERESOLVE_AUTHORITIES;
			let leaf_session = Some((current_index, leaf));
			let maybe_new_session = match self.last_session_index {
				Some(i) if current_index <= i => None,
				_ => leaf_session,
			};

			let maybe_issue_connection = if force_request || re_resolve_authorities {
				leaf_session
			} else {
				maybe_new_session
			};

			if let Some((session_index, relay_parent)) = maybe_issue_connection {
				let session_info =
					util::request_session_info(leaf, session_index, sender).await.await??;

				let session_info = match session_info {
					Some(s) => s,
					None => {
						gum::warn!(
							relay_parent = ?leaf,
							session_index = self.last_session_index,
							"Failed to get session info.",
						);

						continue
					},
				};

				// Note: we only update `last_session_index` once we've
				// successfully gotten the `SessionInfo`.
				let is_new_session = maybe_new_session.is_some();
				if is_new_session {
					gum::debug!(
						target: LOG_TARGET,
						%session_index,
						"New session detected",
					);
					self.last_session_index = Some(session_index);
					self.is_authority_now =
						ensure_i_am_an_authority(&self.keystore, &session_info.discovery_keys)
							.is_ok();
				}

				// Connect to authorities from the past/present/future.
				//
				// This is maybe not the right place for this logic to live,
				// but at the moment we're limited by the network bridge's ability
				// to handle connection requests (it only allows one, globally).
				//
				// Certain network protocols - mostly req/res, but also some gossip
				// protocols - require being connected to past/future validators as
				// well as to current ones. That is, the old authority sets are not
				// made obsolete by virtue of a new session being entered. Therefore
				// we maintain connections to a much broader set of validators.
				{
					let mut connections = authorities_past_present_future(sender, leaf).await?;
					self.last_connection_request = Some(Instant::now());
					// Remove all of our locally controlled validator indices so we don't connect
					// to ourselves.
					let connections =
						if remove_all_controlled(&self.keystore, &mut connections) != 0 {
							connections
						} else {
							// If we control none of them, issue an empty connection request
							// to clean up all connections.
							Vec::new()
						};

					if force_request || is_new_session {
						self.issue_connection_request(sender, connections).await;
					} else if re_resolve_authorities {
						self.issue_connection_request_to_changed(sender, connections).await;
					}
				}

				if is_new_session {
					if let Err(err) = self
						.build_topology_for_last_finalized_if_needed(sender, session_index)
						.await
					{
						gum::warn!(
							target: LOG_TARGET,
							"Failed to build topology for last finalized session: {:?}",
							err
						);
					}

					// Gossip topology is only relevant for authorities in the current session.
					let our_index = self.get_key_index_and_update_metrics(&session_info)?;
					update_gossip_topology(
						sender,
						our_index,
						session_info.discovery_keys.clone(),
						relay_parent,
						session_index,
					)
					.await?;
				}
				// `authority_discovery` is just a cache, so query it on every reconnection
				// attempt, in case new authorities are present.
				self.update_authority_ids(sender, session_info.discovery_keys).await;
			}
		}
		Ok(())
	}

	/// Build the gossip topology for the session of the last finalized block, if we haven't built
	/// one yet.
	///
	/// This is needed to ensure that if finality lags across a session boundary and a restart
	/// happens after the new session has started, we still build a topology for the session whose
	/// blocks haven't been finalized yet.
	/// Once finalized blocks start to be from a session we've built a topology for, we can stop.
	async fn build_topology_for_last_finalized_if_needed(
		&mut self,
		sender: &mut impl overseer::GossipSupportSenderTrait,
		current_session_index: u32,
	) -> Result<(), util::Error> {
		self.min_known_session = self.min_known_session.min(current_session_index);

		if self
			.finalized_needed_session
			.map(|oldest_needed_session| oldest_needed_session < self.min_known_session)
			.unwrap_or(true)
		{
			let (tx, rx) = oneshot::channel();
			sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await;
			let finalized_block_number = match rx.await? {
				Ok(block_number) => block_number,
				_ => return Ok(()),
			};

			let (tx, rx) = oneshot::channel();
			sender
				.send_message(ChainApiMessage::FinalizedBlockHash(finalized_block_number, tx))
				.await;

			let finalized_block_hash = match rx.await? {
				Ok(Some(block_hash)) => block_hash,
				_ => return Ok(()),
			};

			let finalized_session_index =
				util::request_session_index_for_child(finalized_block_hash, sender)
					.await
					.await??;

			if finalized_session_index < self.min_known_session &&
				Some(finalized_session_index) != self.finalized_needed_session
			{
				gum::debug!(
					target: LOG_TARGET,
					?finalized_block_hash,
					?finalized_block_number,
					?finalized_session_index,
					"Building topology for finalized block session",
				);

				let finalized_session_info = match util::request_session_info(
					finalized_block_hash,
					finalized_session_index,
					sender,
				)
				.await
				.await??
				{
					Some(session_info) => session_info,
					_ => return Ok(()),
				};

				let our_index = self.get_key_index_and_update_metrics(&finalized_session_info)?;
				update_gossip_topology(
					sender,
					our_index,
					finalized_session_info.discovery_keys.clone(),
					finalized_block_hash,
					finalized_session_index,
				)
				.await?;
			}
			self.finalized_needed_session = Some(finalized_session_index);
		}
		Ok(())
	}

	// Checks if the node is an authority and also updates the `polkadot_node_is_authority` and
	// `polkadot_node_is_parachain_validator` metrics accordingly.
	// On success, returns the index of our keys in `session_info.discovery_keys`.
	fn get_key_index_and_update_metrics(
		&mut self,
		session_info: &SessionInfo,
	) -> Result<usize, util::Error> {
		let authority_check_result =
			ensure_i_am_an_authority(&self.keystore, &session_info.discovery_keys);

		match authority_check_result.as_ref() {
			Ok(index) => {
				gum::trace!(target: LOG_TARGET, "We are now an authority");
				self.metrics.on_is_authority();

				// The subset of authorities participating in parachain consensus.
				let parachain_validators_this_session = session_info.validators.len();

				// The first `maxValidators` entries are the parachain validators. We check
				// if our index is in this set to avoid searching for the keys.
				// https://github.com/paritytech/polkadot/blob/a52dca2be7840b23c19c153cf7e110b1e3e475f8/runtime/parachains/src/configuration.rs#L148
				if *index < parachain_validators_this_session {
					gum::trace!(target: LOG_TARGET, "We are now a parachain validator");
					self.metrics.on_is_parachain_validator();
				} else {
					gum::trace!(target: LOG_TARGET, "We are no longer a parachain validator");
					self.metrics.on_is_not_parachain_validator();
				}
			},
			Err(util::Error::NotAValidator) => {
				gum::trace!(target: LOG_TARGET, "We are no longer an authority");
				self.metrics.on_is_not_authority();
				self.metrics.on_is_not_parachain_validator();
			},
			// Don't update on runtime errors.
			Err(_) => {},
		};

		authority_check_result
	}

	async fn resolve_authorities(
		&mut self,
		authorities: Vec<AuthorityDiscoveryId>,
	) -> (Vec<HashSet<Multiaddr>>, HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>, usize) {
		let mut validator_addrs = Vec::with_capacity(authorities.len());
		let mut resolved = HashMap::with_capacity(authorities.len());
		let mut failures = 0;

		for authority in authorities {
			if let Some(addrs) =
				self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await
			{
				validator_addrs.push(addrs.clone());
				resolved.insert(authority, addrs);
			} else {
				failures += 1;
				gum::debug!(
					target: LOG_TARGET,
					"Couldn't resolve addresses of authority: {:?}",
					authority
				);
			}
		}
		(validator_addrs, resolved, failures)
	}

	async fn issue_connection_request_to_changed<Sender>(
		&mut self,
		sender: &mut Sender,
		authorities: Vec<AuthorityDiscoveryId>,
	) where
		Sender: overseer::GossipSupportSenderTrait,
	{
		let (_, resolved, _) = self.resolve_authorities(authorities).await;

		let mut changed = Vec::new();

		for (authority, new_addresses) in &resolved {
			let new_peer_ids = new_addresses
				.iter()
				.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
				.collect::<HashSet<_>>();
			match self.resolved_authorities.get(authority) {
				Some(old_addresses) => {
					let old_peer_ids = old_addresses
						.iter()
						.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
						.collect::<HashSet<_>>();
					if !old_peer_ids.is_superset(&new_peer_ids) {
						changed.push(new_addresses.clone());
					}
				},
				None => changed.push(new_addresses.clone()),
			}
		}
		gum::debug!(
			target: LOG_TARGET,
			num_changed = ?changed.len(),
			?changed,
			"Issuing a connection request to changed validators"
		);
		if !changed.is_empty() {
			self.resolved_authorities = resolved;

			sender
				.send_message(NetworkBridgeTxMessage::AddToResolvedValidators {
					validator_addrs: changed,
					peer_set: PeerSet::Validation,
				})
				.await;
		}
	}

	async fn issue_connection_request<Sender>(
		&mut self,
		sender: &mut Sender,
		authorities: Vec<AuthorityDiscoveryId>,
	) where
		Sender: overseer::GossipSupportSenderTrait,
	{
		let num = authorities.len();

		let (validator_addrs, resolved, failures) = self.resolve_authorities(authorities).await;

		self.resolved_authorities = resolved;
		gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request");

		sender
			.send_message(NetworkBridgeTxMessage::ConnectToResolvedValidators {
				validator_addrs,
				peer_set: PeerSet::Validation,
			})
			.await;

		// Issue another request for the same session
		// if at least a third of the authorities were not resolved.
		if num != 0 && 3 * failures >= num {
			let timestamp = Instant::now();
			match self.failure_start {
				None => self.failure_start = Some(timestamp),
				Some(first) if first.elapsed() >= LOW_CONNECTIVITY_WARN_DELAY => {
					gum::warn!(
						target: LOG_TARGET,
						connected = ?(num - failures),
						target = ?num,
						"Low connectivity - authority lookup failed for too many validators."
					);
				},
				Some(_) => {
					gum::debug!(
						target: LOG_TARGET,
						connected = ?(num - failures),
						target = ?num,
						"Low connectivity (due to authority lookup failures) - expected on startup."
					);
				},
			}
			self.last_failure = Some(timestamp);
		} else {
			self.last_failure = None;
			self.failure_start = None;
		};
	}

	async fn update_authority_ids<Sender>(
		&mut self,
		sender: &mut Sender,
		authorities: Vec<AuthorityDiscoveryId>,
	) where
		Sender: overseer::GossipSupportSenderTrait,
	{
		let mut authority_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>> = HashMap::new();
		for authority in authorities {
			let peer_ids = self
				.authority_discovery
				.get_addresses_by_authority_id(authority.clone())
				.await
				.into_iter()
				.flat_map(|list| list.into_iter())
				.flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
				.collect::<HashSet<_>>();

			gum::trace!(
				target: LOG_TARGET,
				?peer_ids,
				?authority,
				"Resolved to peer ids"
			);

			for p in peer_ids {
				authority_ids.entry(p).or_default().insert(authority.clone());
			}
		}

		// The peer was an authority and now isn't.
		for (peer_id, current) in self.connected_peers.iter_mut() {
			// The empty -> non-empty transition is handled in the next loop.
			if !current.is_empty() && !authority_ids.contains_key(peer_id) {
				sender
					.send_message(NetworkBridgeRxMessage::UpdatedAuthorityIds {
						peer_id: *peer_id,
						authority_ids: HashSet::new(),
					})
					.await;

				for a in current.drain() {
					self.connected_authorities.remove(&a);
				}
			}
		}

		// The peer has a new authority set.
		for (peer_id, new) in authority_ids {
			// If the peer is connected _and_ the authority IDs have changed.
			if let Some(prev) = self.connected_peers.get(&peer_id).filter(|x| x != &&new) {
				sender
					.send_message(NetworkBridgeRxMessage::UpdatedAuthorityIds {
						peer_id,
						authority_ids: new.clone(),
					})
					.await;

				prev.iter().for_each(|a| {
					self.connected_authorities.remove(a);
				});
				new.iter().for_each(|a| {
					self.connected_authorities.insert(a.clone(), peer_id);
				});

				self.connected_peers.insert(peer_id, new);
			}
		}
	}

	fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent<GossipSupportNetworkMessage>) {
		match ev {
			NetworkBridgeEvent::PeerConnected(peer_id, _, _, o_authority) => {
				if let Some(authority_ids) = o_authority {
					authority_ids.iter().for_each(|a| {
						self.connected_authorities.insert(a.clone(), peer_id);
					});
					self.connected_peers.insert(peer_id, authority_ids);
				} else {
					self.connected_peers.insert(peer_id, HashSet::new());
				}
			},
			NetworkBridgeEvent::PeerDisconnected(peer_id) => {
				if let Some(authority_ids) = self.connected_peers.remove(&peer_id) {
					authority_ids.into_iter().for_each(|a| {
						self.connected_authorities.remove(&a);
					});
				}
			},
			NetworkBridgeEvent::UpdatedAuthorityIds(_, _) => {
				// The `gossip-support` subsystem itself issues these messages.
			},
			NetworkBridgeEvent::OurViewChange(_) => {},
			NetworkBridgeEvent::PeerViewChange(_, _) => {},
			NetworkBridgeEvent::NewGossipTopology { .. } => {},
			NetworkBridgeEvent::PeerMessage(_, message) => {
				// Matching on the void type compiles down to LLVM `unreachable`.
				match message {
					ValidationProtocols::V3(m) => match m {},
				}
			},
		}
	}

	/// Check connectivity and report on it in the logs.
	fn check_connectivity(&mut self) {
		let absolute_connected = self.connected_authorities.len();
		let absolute_resolved = self.resolved_authorities.len();
		let connected_ratio =
			(100 * absolute_connected).checked_div(absolute_resolved).unwrap_or(100);
		let unconnected_authorities = self
			.resolved_authorities
			.iter()
			.filter(|(a, _)| !self.connected_authorities.contains_key(a));
		if connected_ratio <= LOW_CONNECTIVITY_WARN_THRESHOLD && self.is_authority_now {
			gum::error!(
				target: LOG_TARGET,
				session_index = self.last_session_index.as_ref().map(|s| *s).unwrap_or_default(),
				"Connectivity seems low, we are only connected to {connected_ratio}% of available validators (see debug logs for details); if this persists for more than a session, action needs to be taken"
			);
		}
		let pretty = PrettyAuthorities(unconnected_authorities);
		gum::debug!(
			target: LOG_TARGET,
			?connected_ratio,
			?absolute_connected,
			?absolute_resolved,
			unconnected_authorities = %pretty,
			"Connectivity Report"
		);
	}
}

// Get the authorities of the past, present, and future.
async fn authorities_past_present_future(
	sender: &mut impl overseer::GossipSupportSenderTrait,
	relay_parent: Hash,
) -> Result<Vec<AuthorityDiscoveryId>, util::Error> {
	let authorities = util::request_authorities(relay_parent, sender).await.await??;
	gum::debug!(
		target: LOG_TARGET,
		authority_count = ?authorities.len(),
		"Determined past/present/future authorities",
	);
	Ok(authorities)
}

/// Return an error if we're not a validator in the given set (i.e. we do not have the keys).
/// Otherwise, return the index of our keys in `authorities`.
fn ensure_i_am_an_authority(
	keystore: &KeystorePtr,
	authorities: &[AuthorityDiscoveryId],
) -> Result<usize, util::Error> {
	for (i, v) in authorities.iter().enumerate() {
		if Keystore::has_keys(&**keystore, &[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]) {
			return Ok(i)
		}
	}
	Err(util::Error::NotAValidator)
}

/// Filter out all controlled keys in the given set. Returns the number of keys removed.
fn remove_all_controlled(
	keystore: &KeystorePtr,
	authorities: &mut Vec<AuthorityDiscoveryId>,
) -> usize {
	let mut to_remove = Vec::new();
	for (i, v) in authorities.iter().enumerate() {
		if Keystore::has_keys(&**keystore, &[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]) {
			to_remove.push(i);
		}
	}

	for i in to_remove.iter().rev().copied() {
		authorities.remove(i);
	}

	to_remove.len()
}

/// We partition the list of all sorted `authorities` into `sqrt(len)` groups of `sqrt(len)` size
/// and form a matrix where each validator is connected to all validators in its row and column.
/// This is similar to the topology proposed by the [web3] research, except that the groups are
/// not parachain groups (because not all validators are parachain validators and the group size
/// is small), but are instead formed randomly via BABE randomness from two epochs ago.
/// This limits the number of gossip peers to 2 * `sqrt(len)` and ensures a diameter of 2.
///
/// [web3]: https://research.web3.foundation/en/latest/polkadot/networking/3-avail-valid.html#topology
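///
/// A minimal sketch of the row/column neighborhood (hypothetical indices; the
/// actual neighbor computation happens downstream, in the consumers of the
/// `NetworkBridgeRxMessage::NewGossipTopology` message):
///
/// ```ignore
/// // With 9 validators in a 3x3 grid, validator 4 (the centre) is connected
/// // to its row {3, 5} and its column {1, 7}: 2 * sqrt(9) - 2 = 4 peers.
/// let (len, side, i) = (9usize, 3usize, 4usize);
/// let row: Vec<usize> = (0..len).filter(|j| j / side == i / side && *j != i).collect();
/// let col: Vec<usize> = (0..len).filter(|j| j % side == i % side && *j != i).collect();
/// assert_eq!((row, col), (vec![3, 5], vec![1, 7]));
/// ```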
async fn update_gossip_topology(
	sender: &mut impl overseer::GossipSupportSenderTrait,
	our_index: usize,
	authorities: Vec<AuthorityDiscoveryId>,
	relay_parent: Hash,
	session_index: SessionIndex,
) -> Result<(), util::Error> {
	// Retrieve the BABE randomness.
	let random_seed = {
		let (tx, rx) = oneshot::channel();

		// TODO https://github.com/paritytech/polkadot/issues/5316:
		// get the random seed from the `SessionInfo` instead.
		sender
			.send_message(RuntimeApiMessage::Request(
				relay_parent,
				RuntimeApiRequest::CurrentBabeEpoch(tx),
			))
			.await;

		let randomness = rx.await??.randomness;
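		// Derive the seed by hashing an 8-byte domain tag ("gossipsu") followed
		// by the 32-byte epoch randomness (40 bytes in total).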
		let mut subject = [0u8; 40];
		subject[..8].copy_from_slice(b"gossipsu");
		subject[8..].copy_from_slice(&randomness);
		sp_crypto_hashing::blake2_256(&subject)
	};

	// Shuffle the validators and create the index mapping.
	let (shuffled_indices, canonical_shuffling) = {
		let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed);
		let len = authorities.len();
		let mut shuffled_indices = vec![0; len];
		let mut canonical_shuffling: Vec<_> = authorities
			.iter()
			.enumerate()
			.map(|(i, a)| (a.clone(), ValidatorIndex(i as _)))
			.collect();

		fisher_yates_shuffle(&mut rng, &mut canonical_shuffling[..]);
		for (i, (_, validator_index)) in canonical_shuffling.iter().enumerate() {
			shuffled_indices[validator_index.0 as usize] = i;
		}

		(shuffled_indices, canonical_shuffling)
	};

	sender
		.send_message(NetworkBridgeRxMessage::NewGossipTopology {
			session: session_index,
			local_index: Some(ValidatorIndex(our_index as _)),
			canonical_shuffling,
			shuffled_indices,
		})
		.await;

	Ok(())
}

// Durstenfeld's variant of the Fisher-Yates shuffle:
// https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_modern_algorithm
fn fisher_yates_shuffle<T, R: Rng + ?Sized>(rng: &mut R, items: &mut [T]) {
	for i in (1..items.len()).rev() {
		// Invariant: elements with index > i have been locked in place.
		let index = rng.gen_range(0u32..(i as u32 + 1));
		items.swap(i, index as usize);
	}
}
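
// A minimal usage sketch (hypothetical all-zero seed; the real caller derives
// the seed from BABE randomness in `update_gossip_topology`):
//
//     let mut rng: ChaCha20Rng = SeedableRng::from_seed([0u8; 32]);
//     let mut items: Vec<u8> = (0..10).collect();
//     fisher_yates_shuffle(&mut rng, &mut items[..]);
//     // `items` is now a seed-deterministic permutation of 0..10.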

#[overseer::subsystem(GossipSupport, error = SubsystemError, prefix = self::overseer)]
impl<Context, AD> GossipSupport<AD>
where
	AD: AuthorityDiscovery + Clone,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = self.run(ctx).map(|_| Ok(())).boxed();

		SpawnedSubsystem { name: "gossip-support-subsystem", future }
	}
}

/// Helper struct to get a nice rendering of unreachable authorities.
struct PrettyAuthorities<I>(I);

impl<'a, I> fmt::Display for PrettyAuthorities<I>
where
	I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a HashSet<Multiaddr>)> + Clone,
{
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let mut authorities = self.0.clone().peekable();
		if authorities.peek().is_none() {
			write!(f, "None")?;
		} else {
			write!(f, "\n")?;
		}
		for (authority, addrs) in authorities {
			write!(f, "{}:\n", authority)?;
			for addr in addrs {
				write!(f, "  {}\n", addr)?;
			}
			write!(f, "\n")?;
		}
		Ok(())
	}
}