
polkadot_availability_bitfield_distribution/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The bitfield distribution subsystem.
//!
//! If this node is a validator, it gossips its own signed availability bitfield
//! for a particular relay parent.
//! Independently of that, it forwards bitfield messages received from peers to other
//! interested peers.

#![deny(unused_crate_dependencies)]

use futures::{channel::oneshot, FutureExt};

use net_protocol::filter_by_peer_version;
use polkadot_node_network_protocol::{
	self as net_protocol,
	grid_topology::{
		GridNeighbors, RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage,
	},
	peer_set::{ProtocolVersion, ValidationVersion},
	v3 as protocol_v3, OurView, PeerId, UnifiedReputationChange as Rep, ValidationProtocols, View,
};
use polkadot_node_subsystem::{
	messages::*, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
	SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
	self as util,
	reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
};

use futures::select;
use polkadot_primitives::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use rand::{CryptoRng, Rng, SeedableRng};
use std::{
	collections::{HashMap, HashSet},
	time::Duration,
};

use self::metrics::Metrics;

mod metrics;

#[cfg(test)]
mod tests;

const COST_SIGNATURE_INVALID: Rep = Rep::CostMajor("Bitfield signature invalid");
const COST_VALIDATOR_INDEX_INVALID: Rep = Rep::CostMajor("Bitfield validator index invalid");
const COST_MISSING_PEER_SESSION_KEY: Rep = Rep::CostMinor("Missing peer session key");
const COST_NOT_IN_VIEW: Rep = Rep::CostMinor("Not interested in that parent hash");
const COST_PEER_DUPLICATE_MESSAGE: Rep =
	Rep::CostMinorRepeated("Peer sent the same message multiple times");
const BENEFIT_VALID_MESSAGE_FIRST: Rep =
	Rep::BenefitMinorFirst("Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Valid message");

/// Checked signed availability bitfield that is distributed
/// to other peers.
#[derive(Debug, Clone, PartialEq, Eq)]
struct BitfieldGossipMessage {
	/// The relay parent this message is relative to.
	relay_parent: Hash,
	/// The actual signed availability bitfield.
	signed_availability: SignedAvailabilityBitfield,
}

impl BitfieldGossipMessage {
	fn into_validation_protocol(
		self,
		recipient_version: ProtocolVersion,
	) -> net_protocol::VersionedValidationProtocol {
		self.into_network_message(recipient_version).into()
	}

	fn into_network_message(
		self,
		recipient_version: ProtocolVersion,
	) -> net_protocol::BitfieldDistributionMessage {
		match ValidationVersion::try_from(recipient_version).ok() {
			Some(ValidationVersion::V3) =>
				ValidationProtocols::V3(protocol_v3::BitfieldDistributionMessage::Bitfield(
					self.relay_parent,
					self.signed_availability.into(),
				)),
			None => {
				gum::warn!(
					target: LOG_TARGET,
					version = ?recipient_version,
					"Unknown protocol version provided for message recipient"
				);

				// Fall back to v3 rather than dropping the message.
				ValidationProtocols::V3(protocol_v3::BitfieldDistributionMessage::Bitfield(
					self.relay_parent,
					self.signed_availability.into(),
				))
			},
		}
	}
}

/// Data stored on a per-peer basis.
#[derive(Debug)]
pub struct PeerData {
	/// The peer's view.
	view: View,
	/// The peer's protocol version.
	version: ProtocolVersion,
}

/// Data used to track information about peers and the relay parents the
/// overseer ordered us to work on.
#[derive(Default)]
struct ProtocolState {
	/// Track all active peer views and protocol versions
	/// to determine what is relevant to them.
	peer_data: HashMap<PeerId, PeerData>,

	/// The current and previous gossip topologies
	topologies: SessionBoundGridTopologyStorage,

	/// Our current view.
	view: OurView,

	/// Additional data particular to a relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParentData>,

	/// Aggregated reputation change
	reputation: ReputationAggregator,
}

/// Data for a particular relay parent.
#[derive(Debug)]
struct PerRelayParentData {
	/// Signing context for a particular relay parent.
	signing_context: SigningContext,

	/// Set of validators for a particular relay parent.
	validator_set: Vec<ValidatorId>,

	/// Set of validators for a particular relay parent for which we
	/// received a valid `BitfieldGossipMessage`.
	/// Also serves as the list of known messages for peers connecting
	/// after bitfield gossips were already received.
	one_per_validator: HashMap<ValidatorId, BitfieldGossipMessage>,

	/// Avoid duplicate message transmission to our peers.
	message_sent_to_peer: HashMap<PeerId, HashSet<ValidatorId>>,

	/// Track messages that were already received by a peer
	/// to prevent flooding.
	message_received_from_peer: HashMap<PeerId, HashSet<ValidatorId>>,
}

impl PerRelayParentData {
	/// Create a new instance.
	fn new(signing_context: SigningContext, validator_set: Vec<ValidatorId>) -> Self {
		Self {
			signing_context,
			validator_set,
			one_per_validator: Default::default(),
			message_sent_to_peer: Default::default(),
			message_received_from_peer: Default::default(),
		}
	}

	/// Determines if that particular message signed by a
	/// validator is needed by the given peer.
	fn message_from_validator_needed_by_peer(
		&self,
		peer: &PeerId,
		signed_by: &ValidatorId,
	) -> bool {
		self.message_sent_to_peer
			.get(peer)
			.map(|pubkeys| !pubkeys.contains(signed_by))
			.unwrap_or(true) &&
			self.message_received_from_peer
				.get(peer)
				.map(|pubkeys| !pubkeys.contains(signed_by))
				.unwrap_or(true)
	}
}

const LOG_TARGET: &str = "parachain::bitfield-distribution";

/// The bitfield distribution subsystem.
pub struct BitfieldDistribution {
	metrics: Metrics,
}

#[overseer::contextbounds(BitfieldDistribution, prefix = self::overseer)]
impl BitfieldDistribution {
	/// Create a new instance of the `BitfieldDistribution` subsystem.
	pub fn new(metrics: Metrics) -> Self {
		Self { metrics }
	}

	/// Start processing work as passed on from the Overseer.
	async fn run<Context>(self, ctx: Context) {
		let mut state = ProtocolState::default();
		let mut rng = rand::rngs::StdRng::from_entropy();
		self.run_inner(ctx, &mut state, REPUTATION_CHANGE_INTERVAL, &mut rng).await
	}

	async fn run_inner<Context>(
		self,
		mut ctx: Context,
		state: &mut ProtocolState,
		reputation_interval: Duration,
		rng: &mut (impl CryptoRng + Rng),
	) {
		// Work loop: receive messages from the overseer and handle them accordingly.

		let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
		let mut reputation_delay = new_reputation_delay();

		loop {
			select! {
				_ = reputation_delay => {
					state.reputation.send(ctx.sender()).await;
					reputation_delay = new_reputation_delay();
				},
				message = ctx.recv().fuse() => {
					let message = match message {
						Ok(message) => message,
						Err(err) => {
							gum::error!(
								target: LOG_TARGET,
								?err,
								"Failed to receive a message from Overseer, exiting"
							);
							return
						},
					};
					match message {
						FromOrchestra::Communication {
							msg:
								BitfieldDistributionMessage::DistributeBitfield(
									relay_parent,
									signed_availability,
								),
						} => {
							gum::trace!(target: LOG_TARGET, ?relay_parent, "Processing DistributeBitfield");
							handle_bitfield_distribution(
								&mut ctx,
								state,
								&self.metrics,
								relay_parent,
								signed_availability,
								rng,
							)
							.await;
						},
						FromOrchestra::Communication {
							msg: BitfieldDistributionMessage::NetworkBridgeUpdate(event),
						} => {
							gum::trace!(target: LOG_TARGET, "Processing NetworkMessage");
							// a network message was received
							handle_network_msg(&mut ctx, state, &self.metrics, event, rng).await;
						},
						FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
							activated,
							..
						})) => {
							let _timer = self.metrics.time_active_leaves_update();

							if let Some(activated) = activated {
								let relay_parent = activated.hash;

								gum::trace!(target: LOG_TARGET, ?relay_parent, "activated");

								// query validator set and signing context per relay_parent once only
								match query_basics(&mut ctx, relay_parent).await {
									Ok(Some((validator_set, signing_context))) => {
										// If our runtime API fails, we don't take down the node,
										// but we might alter peers' reputations erroneously as a result
										// of not having the correct bookkeeping. If we have lost a race
										// with state pruning, it is unlikely that peers will be sending
										// us anything to do with this relay-parent anyway.
										let _ = state.per_relay_parent.insert(
											relay_parent,
											PerRelayParentData::new(signing_context, validator_set),
										);
									},
									Err(err) => {
										gum::warn!(target: LOG_TARGET, ?err, "query_basics has failed");
									},
									_ => {},
								}
							}
						},
						FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
							gum::trace!(target: LOG_TARGET, ?hash, %number, "block finalized");
						},
						FromOrchestra::Signal(OverseerSignal::Conclude) => {
							gum::info!(target: LOG_TARGET, "Conclude");
							return
						},
					}
				}
			}
		}
	}
}

/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation(
	reputation: &mut ReputationAggregator,
	sender: &mut impl overseer::BitfieldDistributionSenderTrait,
	relay_parent: Hash,
	peer: PeerId,
	rep: Rep,
) {
	gum::trace!(target: LOG_TARGET, ?relay_parent, ?rep, %peer, "reputation change");

	reputation.modify(sender, peer, rep).await;
}

/// Distribute a given valid and signature checked bitfield message.
///
/// For this variant the source is this node.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn handle_bitfield_distribution<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	metrics: &Metrics,
	relay_parent: Hash,
	signed_availability: SignedAvailabilityBitfield,
	rng: &mut (impl CryptoRng + Rng),
) {
	let _timer = metrics.time_handle_bitfield_distribution();

	// Ignore anything the overseer did not tell this subsystem to work on
	let mut job_data = state.per_relay_parent.get_mut(&relay_parent);
	let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
		job_data
	} else {
		gum::debug!(
			target: LOG_TARGET,
			?relay_parent,
			"Not supposed to work on relay parent related data",
		);

		return
	};

	let session_idx = job_data.signing_context.session_index;
	let validator_set = &job_data.validator_set;
	if validator_set.is_empty() {
		gum::debug!(target: LOG_TARGET, ?relay_parent, "validator set is empty");
		return
	}

	let validator_index = signed_availability.validator_index();
	let validator = if let Some(validator) = validator_set.get(validator_index.0 as usize) {
		validator.clone()
	} else {
		gum::debug!(target: LOG_TARGET, validator_index = ?validator_index.0, "Could not find a validator for index");
		return
	};

	let msg = BitfieldGossipMessage { relay_parent, signed_availability };
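	// Compute the required grid routing for this bitfield; the `true` flag marks it as
	// locally originated (in contrast to the peer-message path below, which passes `false`).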
	let topology = state.topologies.get_topology_or_fallback(session_idx).local_grid_neighbors();
	let required_routing = topology.required_routing_by_index(validator_index, true);

	relay_message(
		ctx,
		job_data,
		topology,
		&mut state.peer_data,
		validator,
		msg,
		required_routing,
		rng,
	)
	.await;

	metrics.on_own_bitfield_sent();
}

/// Distribute a given valid and signature checked bitfield message.
///
/// Can be originated by another subsystem or received via network from another peer.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn relay_message<Context>(
	ctx: &mut Context,
	job_data: &mut PerRelayParentData,
	topology_neighbors: &GridNeighbors,
	peers: &mut HashMap<PeerId, PeerData>,
	validator: ValidatorId,
	message: BitfieldGossipMessage,
	required_routing: RequiredRouting,
	rng: &mut (impl CryptoRng + Rng),
) {
	let relay_parent = message.relay_parent;

	// notify the overseer about a new and valid signed bitfield
	ctx.send_message(ProvisionerMessage::ProvisionableData(
		relay_parent,
		ProvisionableData::Bitfield(relay_parent, message.signed_availability.clone()),
	))
	.await;

	let total_peers = peers.len();
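	// Apart from the peers required by the grid topology, gossip to a bounded number of
	// randomly selected peers as well, so the bitfield also reaches peers outside the grid.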
	let mut random_routing: RandomRouting = Default::default();

	// pass on the bitfield distribution to all interested peers
	let interested_peers = peers
		.iter()
		.filter_map(|(peer, data)| {
			// check whether the peer is interested in this message's relay parent
			if data.view.contains(&message.relay_parent) {
				let message_needed =
					job_data.message_from_validator_needed_by_peer(&peer, &validator);
				if message_needed {
					let in_topology = topology_neighbors.route_to_peer(required_routing, &peer);
					let need_routing = in_topology || {
						let route_random = random_routing.sample(total_peers, rng);
						if route_random {
							random_routing.inc_sent();
						}

						route_random
					};

					if need_routing {
						Some((*peer, data.version))
					} else {
						None
					}
				} else {
					None
				}
			} else {
				None
			}
		})
		.collect::<Vec<(PeerId, ProtocolVersion)>>();

	interested_peers.iter().for_each(|(peer, _)| {
		// track the message as sent for this peer
		job_data
			.message_sent_to_peer
			.entry(*peer)
			.or_default()
			.insert(validator.clone());
	});

	if interested_peers.is_empty() {
		gum::trace!(
			target: LOG_TARGET,
			?relay_parent,
			"no peers are interested in gossip for relay parent",
		);
	} else {
		let v3_interested_peers =
			filter_by_peer_version(&interested_peers, ValidationVersion::V3.into());

		if !v3_interested_peers.is_empty() {
			ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
				v3_interested_peers,
				message.into_validation_protocol(ValidationVersion::V3.into()),
			))
			.await
		}
	}
}

/// Handle an incoming message from a peer.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn process_incoming_peer_message<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	metrics: &Metrics,
	origin: PeerId,
	message: net_protocol::BitfieldDistributionMessage,
	rng: &mut (impl CryptoRng + Rng),
) {
	let (relay_parent, bitfield) = match message {
		ValidationProtocols::V3(protocol_v3::BitfieldDistributionMessage::Bitfield(
			relay_parent,
			bitfield,
		)) => (relay_parent, bitfield),
	};

	gum::trace!(
		target: LOG_TARGET,
		peer = %origin,
		?relay_parent,
		"received bitfield gossip from peer"
	);
	// we don't care about this, not part of our view.
	if !state.view.contains(&relay_parent) {
		modify_reputation(
			&mut state.reputation,
			ctx.sender(),
			relay_parent,
			origin,
			COST_NOT_IN_VIEW,
		)
		.await;
		return
	}

	// Ignore anything the overseer did not tell this subsystem to work on.
	let mut job_data = state.per_relay_parent.get_mut(&relay_parent);
	let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
		job_data
	} else {
		modify_reputation(
			&mut state.reputation,
			ctx.sender(),
			relay_parent,
			origin,
			COST_NOT_IN_VIEW,
		)
		.await;
		return
	};

	let validator_index = bitfield.unchecked_validator_index();

	let validator_set = &job_data.validator_set;
	if validator_set.is_empty() {
		gum::trace!(target: LOG_TARGET, ?relay_parent, ?origin, "Validator set is empty",);
		modify_reputation(
			&mut state.reputation,
			ctx.sender(),
			relay_parent,
			origin,
			COST_MISSING_PEER_SESSION_KEY,
		)
		.await;
		return
	}

	// Use the (untrusted) validator index provided by the signed payload
	// and see if that one actually signed the availability bitset.
	let signing_context = job_data.signing_context.clone();
	let validator = if let Some(validator) = validator_set.get(validator_index.0 as usize) {
		validator.clone()
	} else {
		modify_reputation(
			&mut state.reputation,
			ctx.sender(),
			relay_parent,
			origin,
			COST_VALIDATOR_INDEX_INVALID,
		)
		.await;
		return
	};

	// Check if the peer already sent us a message for the validator denoted in the message earlier.
	// Must be done after validator index verification, in order to avoid storing an unbounded
	// number of set entries.
	let received_set = job_data.message_received_from_peer.entry(origin).or_default();

	if !received_set.contains(&validator) {
		received_set.insert(validator.clone());
	} else {
		gum::trace!(target: LOG_TARGET, ?validator_index, ?origin, "Duplicate message");
		modify_reputation(
			&mut state.reputation,
			ctx.sender(),
			relay_parent,
			origin,
			COST_PEER_DUPLICATE_MESSAGE,
		)
		.await;
		return
	};

	let one_per_validator = &mut (job_data.one_per_validator);

	// relay a message received from a validator at most _once_
	if let Some(old_message) = one_per_validator.get(&validator) {
		gum::trace!(
			target: LOG_TARGET,
			?validator_index,
			"already received a message for validator",
		);
		if old_message.signed_availability.as_unchecked() == &bitfield {
			modify_reputation(
				&mut state.reputation,
				ctx.sender(),
				relay_parent,
				origin,
				BENEFIT_VALID_MESSAGE,
			)
			.await;
		}
		return
	}
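	// Verify the bitfield's signature under this relay parent's signing context;
	// an invalid signature is punished with a reputation cost.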
	let signed_availability = match bitfield.try_into_checked(&signing_context, &validator) {
		Err(_) => {
			modify_reputation(
				&mut state.reputation,
				ctx.sender(),
				relay_parent,
				origin,
				COST_SIGNATURE_INVALID,
			)
			.await;
			return
		},
		Ok(bitfield) => bitfield,
	};

	let message = BitfieldGossipMessage { relay_parent, signed_availability };

	let topology = state
		.topologies
		.get_topology_or_fallback(job_data.signing_context.session_index)
		.local_grid_neighbors();
	let required_routing = topology.required_routing_by_index(validator_index, false);

	metrics.on_bitfield_received();
	one_per_validator.insert(validator.clone(), message.clone());

	relay_message(
		ctx,
		job_data,
		topology,
		&mut state.peer_data,
		validator,
		message,
		required_routing,
		rng,
	)
	.await;

	modify_reputation(
		&mut state.reputation,
		ctx.sender(),
		relay_parent,
		origin,
		BENEFIT_VALID_MESSAGE_FIRST,
	)
	.await
}

/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn handle_network_msg<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	metrics: &Metrics,
	bridge_message: NetworkBridgeEvent<net_protocol::BitfieldDistributionMessage>,
	rng: &mut (impl CryptoRng + Rng),
) {
	let _timer = metrics.time_handle_network_msg();

	match bridge_message {
		NetworkBridgeEvent::PeerConnected(peer, role, version, _) => {
			gum::trace!(target: LOG_TARGET, ?peer, ?role, "Peer connected");
			// insert if none already present
			state
				.peer_data
				.entry(peer)
				.or_insert_with(|| PeerData { view: View::default(), version });
		},
		NetworkBridgeEvent::PeerDisconnected(peer) => {
			gum::trace!(target: LOG_TARGET, ?peer, "Peer disconnected");
			// get rid of superfluous data
			state.peer_data.remove(&peer);
		},
		NetworkBridgeEvent::NewGossipTopology(gossip_topology) => {
			let session_index = gossip_topology.session;
			let new_topology = gossip_topology.topology;
			let prev_neighbors =
				state.topologies.get_current_topology().local_grid_neighbors().clone();

			state.topologies.update_topology(
				session_index,
				new_topology,
				gossip_topology.local_index,
			);
			let current_topology = state.topologies.get_current_topology();

			let newly_added = current_topology.local_grid_neighbors().peers_diff(&prev_neighbors);

			gum::debug!(
				target: LOG_TARGET,
				?session_index,
				newly_added_peers = ?newly_added.len(),
				"New gossip topology received",
			);

			for new_peer in newly_added {
				let old_view = match state.peer_data.get_mut(&new_peer) {
					Some(d) => {
						// In case we already knew that peer in the past, it might have had
						// an existing view; we use it to initialize and minimize the delta
						// on `PeerViewChange` to be sent.
						std::mem::replace(&mut d.view, Default::default())
					},
					None => {
						// For peers which are currently unknown, we'll send topology-related
						// messages to them when they connect and send their first view update.
						continue
					},
				};

				handle_peer_view_change(ctx, state, new_peer, old_view, rng).await;
			}
		},
		NetworkBridgeEvent::PeerViewChange(peer_id, new_view) => {
			gum::trace!(target: LOG_TARGET, ?peer_id, ?new_view, "Peer view change");
			if state.peer_data.get(&peer_id).is_some() {
				handle_peer_view_change(ctx, state, peer_id, new_view, rng).await;
			}
		},
		NetworkBridgeEvent::OurViewChange(new_view) => {
			gum::trace!(target: LOG_TARGET, ?new_view, "Our view change");
			handle_our_view_change(state, new_view);
		},
		NetworkBridgeEvent::PeerMessage(remote, message) =>
			process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await,
		NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
			state
				.topologies
				.get_current_topology_mut()
				.update_authority_ids(peer_id, &authority_ids);
		},
	}
}

/// Handle the changes necessary when our view changes.
fn handle_our_view_change(state: &mut ProtocolState, view: OurView) {
	let old_view = std::mem::replace(&mut (state.view), view);

	for added in state.view.difference(&old_view) {
		if !state.per_relay_parent.contains_key(&added) {
			// This is guaranteed to be handled in the `ActiveHead` update,
			// so it should never happen.
			gum::error!(
				target: LOG_TARGET,
				%added,
				"Our view contains {}, but not in active heads",
				&added
			);
		}
	}
	for removed in old_view.difference(&state.view) {
		// cleanup relay parents we are not interested in any more
		let _ = state.per_relay_parent.remove(&removed);
	}
}

// Send the messages in the view difference that have not yet been sent
// to that particular peer.
//
// This requires that there is an entry in the `peer_data` field for the
// peer.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn handle_peer_view_change<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	origin: PeerId,
	view: View,
	rng: &mut (impl CryptoRng + Rng),
) {
	let peer_data = match state.peer_data.get_mut(&origin) {
		None => {
			gum::warn!(
				target: LOG_TARGET,
				peer = ?origin,
				"Attempted to update peer view for unknown peer."
			);

			return
		},
		Some(pd) => pd,
	};

	let added = peer_data.view.replace_difference(view).cloned().collect::<Vec<_>>();
	let current_session_index = state.topologies.get_current_session_index();

	let topology = state.topologies.get_current_topology().local_grid_neighbors();
	let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin);

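	// Grid neighbors always receive the gossip; additionally a random fraction of other
	// peers is kept, sized so that the expected fan-out stays near `util::MIN_GOSSIP_PEERS`
	// when the grid alone is smaller than that.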
	let lucky = is_gossip_peer ||
		util::gen_ratio_rng(
			util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()),
			util::MIN_GOSSIP_PEERS,
			rng,
		);

	if !lucky {
		gum::trace!(target: LOG_TARGET, ?origin, "Peer view change is ignored");
		return
	}

	// Send to that peer all messages we have already seen that the peer is now
	// interested in.
	let delta_set: Vec<(ValidatorId, BitfieldGossipMessage)> = added
		.into_iter()
		.filter_map(|new_relay_parent_interest| {
			if let Some(job_data) = state
				.per_relay_parent
				.get(&new_relay_parent_interest)
				.filter(|job_data| job_data.signing_context.session_index == current_session_index)
			{
				// Send all jointly known messages for a validator (given the current relay parent)
				// to the peer `origin`...
				let one_per_validator = job_data.one_per_validator.clone();
				Some(one_per_validator.into_iter().filter(move |(validator, _message)| {
					// ..except for the ones the peer already has.
					job_data.message_from_validator_needed_by_peer(&origin, validator)
				}))
			} else {
				// A relay parent in the peer's view that is not in ours; ignore those.
				None
			}
		})
		.flatten()
		.collect();

	for (validator, message) in delta_set.into_iter() {
		send_tracked_gossip_message(ctx, state, origin, validator, message).await;
	}
}

/// Send a gossip message and track it in the per relay parent data.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn send_tracked_gossip_message<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	dest: PeerId,
	validator: ValidatorId,
	message: BitfieldGossipMessage,
) {
	let job_data = if let Some(job_data) = state.per_relay_parent.get_mut(&message.relay_parent) {
		job_data
	} else {
		return
	};

	gum::trace!(
		target: LOG_TARGET,
		?dest,
		?validator,
		relay_parent = ?message.relay_parent,
		"Sending gossip message"
	);

	let version =
		if let Some(peer_data) = state.peer_data.get(&dest) { peer_data.version } else { return };

	job_data.message_sent_to_peer.entry(dest).or_default().insert(validator.clone());

	ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
		vec![dest],
		message.into_validation_protocol(version),
	))
	.await;
}

#[overseer::subsystem(BitfieldDistribution, error=SubsystemError, prefix=self::overseer)]
impl<Context> BitfieldDistribution {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = self.run(ctx).map(|_| Ok(())).boxed();

		SpawnedSubsystem { name: "bitfield-distribution-subsystem", future }
	}
}

/// Query our validator set and signing context for a particular relay parent.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn query_basics<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
) -> SubsystemResult<Option<(Vec<ValidatorId>, SigningContext)>> {
	let (validators_tx, validators_rx) = oneshot::channel();
	let (session_tx, session_rx) = oneshot::channel();

	// query validators
	ctx.send_message(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::Validators(validators_tx),
	))
	.await;

	// query signing context
	ctx.send_message(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::SessionIndexForChild(session_tx),
	))
	.await;

	match (validators_rx.await?, session_rx.await?) {
		(Ok(validators), Ok(session_index)) =>
			Ok(Some((validators, SigningContext { parent_hash: relay_parent, session_index }))),
		(Err(err), _) | (_, Err(err)) => {
			gum::warn!(
				target: LOG_TARGET,
				?relay_parent,
				?err,
				"Failed to fetch basics from runtime API"
			);
			Ok(None)
		},
	}
}