//! The collator side of the collator protocol: distributes collations produced by this
//! collator to validators of the assigned backing group and serves their fetch requests.
use std::{
	collections::{HashMap, HashSet},
	time::Duration,
};
use bitvec::{bitvec, vec::BitVec};
use futures::{
channel::oneshot, future::Fuse, pin_mut, select, stream::FuturesUnordered, FutureExt, StreamExt,
};
use schnellru::{ByLength, LruMap};
use sp_core::Pair;
use polkadot_node_network_protocol::{
self as net_protocol,
peer_set::{CollationVersion, PeerSet},
request_response::{
incoming::{self, OutgoingResponse},
v1 as request_v1, v2 as request_v2, IncomingRequestReceiver,
},
v1 as protocol_v1, v2 as protocol_v2, OurView, PeerId, UnifiedReputationChange as Rep,
Versioned, View,
};
use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement};
use polkadot_node_subsystem::{
messages::{
CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData,
RuntimeApiMessage,
},
overseer, FromOrchestra, OverseerSignal,
};
use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
runtime::{
fetch_claim_queue, get_availability_cores, get_group_rotation_info,
prospective_parachains_mode, ProspectiveParachainsMode, RuntimeInfo,
},
TimeoutExt,
};
use polkadot_primitives::{
vstaging::{CandidateReceiptV2 as CandidateReceipt, CoreState},
AuthorityDiscoveryId, CandidateHash, CollatorPair, CoreIndex, GroupIndex, Hash, HeadData,
Id as ParaId, SessionIndex,
};
use super::LOG_TARGET;
use crate::{
error::{log_error, Error, FatalError, Result},
modify_reputation,
};
mod collation;
mod metrics;
#[cfg(test)]
mod tests;
mod validators_buffer;
use collation::{
ActiveCollationFetches, Collation, CollationSendResult, CollationStatus,
VersionedCollationRequest, WaitingCollationFetches,
};
use validators_buffer::{
ResetInterestTimeout, ValidatorGroupsBuffer, RESET_INTEREST_TIMEOUT, VALIDATORS_BUFFER_CAPACITY,
};
pub use metrics::Metrics;
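/// Reputation cost applied when a peer's collation fetch request cannot be decoded.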
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
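/// Reputation cost for messages that validators are not expected to send to collators.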
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
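/// Reputation cost for a peer requesting a collation while its previous request is still
/// being served.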
const COST_APPARENT_FLOOD: Rep =
Rep::CostMinor("Message received when previous one was still being processed");
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);
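/// Delay after a new active leaf before the validator connection requests are refreshed.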
const RECONNECT_AFTER_LEAF_TIMEOUT: Duration = Duration::from_secs(4);
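/// Timer driving the periodic validator reconnects; fused so it can sit idle in the main
/// `select!` loop until it is armed by a new active leaf.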
type ReconnectTimeout = Fuse<futures_timer::Delay>;
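/// Outcome of checking whether a collation should be advertised to a given peer.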
#[derive(Debug)]
enum ShouldAdvertiseTo {
Yes,
NotAuthority,
AlreadyAdvertised,
}
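/// Info about a validator group assigned to a core.
///
/// Tracks, per candidate, which validators of the group have already received an
/// advertisement from us.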
#[derive(Debug, Default)]
struct ValidatorGroup {
validators: Vec<AuthorityDiscoveryId>,
advertised_to: HashMap<CandidateHash, BitVec>,
}
impl ValidatorGroup {
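	/// Returns `Yes` if the peer maps to a validator in this group that has not yet been
	/// advertised the candidate.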
fn should_advertise_to(
&self,
candidate_hash: &CandidateHash,
peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
peer: &PeerId,
) -> ShouldAdvertiseTo {
let authority_ids = match peer_ids.get(peer) {
Some(authority_ids) => authority_ids,
None => return ShouldAdvertiseTo::NotAuthority,
};
for id in authority_ids {
let validator_index = match self.validators.iter().position(|v| v == id) {
Some(idx) => idx,
None => continue,
};
if self
.advertised_to
.get(candidate_hash)
.map_or(true, |advertised| !advertised[validator_index])
{
return ShouldAdvertiseTo::Yes
} else {
return ShouldAdvertiseTo::AlreadyAdvertised
}
}
ShouldAdvertiseTo::NotAuthority
}
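	/// Marks the validator(s) behind the given peer as having been advertised the candidate.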
fn advertised_to_peer(
&mut self,
candidate_hash: &CandidateHash,
peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
peer: &PeerId,
) {
if let Some(authority_ids) = peer_ids.get(peer) {
for id in authority_ids {
let validator_index = match self.validators.iter().position(|v| v == id) {
Some(idx) => idx,
None => continue,
};
self.advertised_to
.entry(*candidate_hash)
.or_insert_with(|| bitvec![0; self.validators.len()])
.set(validator_index, true);
}
}
}
}
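/// State tracked per connected peer.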
#[derive(Debug)]
struct PeerData {
view: View,
version: CollationVersion,
unknown_heads: LruMap<Hash, (), ByLength>,
}
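/// A collation paired with the core index it was produced for.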
struct CollationWithCoreIndex(Collation, CoreIndex);
impl CollationWithCoreIndex {
pub fn collation(&self) -> &Collation {
&self.0
}
pub fn collation_mut(&mut self) -> &mut Collation {
&mut self.0
}
pub fn core_index(&self) -> &CoreIndex {
&self.1
}
}
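/// State tracked for every relay parent in our view.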
struct PerRelayParent {
prospective_parachains_mode: ProspectiveParachainsMode,
validator_group: HashMap<CoreIndex, ValidatorGroup>,
collations: HashMap<CandidateHash, CollationWithCoreIndex>,
}
impl PerRelayParent {
fn new(mode: ProspectiveParachainsMode) -> Self {
Self {
prospective_parachains_mode: mode,
validator_group: HashMap::default(),
collations: HashMap::new(),
}
}
}
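/// Overall state of the collator side of the protocol.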
struct State {
local_peer_id: PeerId,
collator_pair: CollatorPair,
collating_on: Option<ParaId>,
peer_data: HashMap<PeerId, PeerData>,
implicit_view: Option<ImplicitView>,
active_leaves: HashMap<Hash, ProspectiveParachainsMode>,
per_relay_parent: HashMap<Hash, PerRelayParent>,
collation_result_senders: HashMap<CandidateHash, oneshot::Sender<CollationSecondedSignal>>,
peer_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
validator_groups_buf: ValidatorGroupsBuffer,
reconnect_timeout: ReconnectTimeout,
metrics: Metrics,
waiting_collation_fetches: HashMap<Hash, WaitingCollationFetches>,
active_collation_fetches: ActiveCollationFetches,
advertisement_timeouts: FuturesUnordered<ResetInterestTimeout>,
reputation: ReputationAggregator,
}
impl State {
fn new(
local_peer_id: PeerId,
collator_pair: CollatorPair,
metrics: Metrics,
reputation: ReputationAggregator,
) -> State {
State {
local_peer_id,
collator_pair,
metrics,
collating_on: Default::default(),
peer_data: Default::default(),
implicit_view: None,
active_leaves: Default::default(),
per_relay_parent: Default::default(),
collation_result_senders: Default::default(),
peer_ids: Default::default(),
validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
reconnect_timeout: Fuse::terminated(),
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
advertisement_timeouts: Default::default(),
reputation,
}
}
}
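/// Distribute a collation produced for para `id` on top of its candidate relay parent.
///
/// Determines the cores assigned to the para and the validators backing them, stores the
/// collation, connects to the relevant validators, and advertises the collation to every
/// connected peer whose view already covers the candidate's relay parent.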
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn distribute_collation<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
id: ParaId,
receipt: CandidateReceipt,
parent_head_data_hash: Hash,
pov: PoV,
parent_head_data: HeadData,
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
core_index: CoreIndex,
) -> Result<()> {
let candidate_relay_parent = receipt.descriptor.relay_parent();
let candidate_hash = receipt.hash();
let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) {
Some(per_relay_parent) => per_relay_parent,
None => {
gum::debug!(
target: LOG_TARGET,
para_id = %id,
candidate_relay_parent = %candidate_relay_parent,
candidate_hash = ?candidate_hash,
"Candidate relay parent is out of our view",
);
return Ok(())
},
};
let relay_parent_mode = per_relay_parent.prospective_parachains_mode;
let collations_limit = match relay_parent_mode {
ProspectiveParachainsMode::Disabled => 1,
ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1,
};
if per_relay_parent.collations.len() >= collations_limit {
gum::debug!(
target: LOG_TARGET,
?candidate_relay_parent,
?relay_parent_mode,
"The limit of {} collations per relay parent is already reached",
collations_limit,
);
return Ok(())
}
if per_relay_parent.collations.contains_key(&candidate_hash) {
gum::debug!(
target: LOG_TARGET,
?candidate_relay_parent,
?candidate_hash,
"Already seen this candidate",
);
return Ok(())
}
let (our_cores, num_cores) =
match determine_cores(ctx.sender(), id, candidate_relay_parent, relay_parent_mode).await? {
(cores, _num_cores) if cores.is_empty() => {
gum::warn!(
target: LOG_TARGET,
para_id = %id,
"looks like no core is assigned to {} at {}", id, candidate_relay_parent,
);
return Ok(())
},
(cores, num_cores) => (cores, num_cores),
};
let elastic_scaling = our_cores.len() > 1;
if elastic_scaling {
gum::debug!(
target: LOG_TARGET,
para_id = %id,
cores = ?our_cores,
"{} is assigned to {} cores at {}", id, our_cores.len(), candidate_relay_parent,
);
}
if !our_cores.iter().any(|assigned_core| assigned_core == &core_index) {
gum::warn!(
target: LOG_TARGET,
para_id = %id,
relay_parent = ?candidate_relay_parent,
cores = ?our_cores,
?core_index,
"Attempting to distribute collation for a core we are not assigned to ",
);
return Ok(())
}
let our_core = core_index;
let GroupValidators { validators, session_index, group_index } =
determine_our_validators(ctx, runtime, our_core, num_cores, candidate_relay_parent).await?;
if validators.is_empty() {
gum::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);
return Ok(())
}
state.validator_groups_buf.note_collation_advertised(
candidate_hash,
session_index,
group_index,
&validators,
);
gum::debug!(
target: LOG_TARGET,
para_id = %id,
candidate_relay_parent = %candidate_relay_parent,
relay_parent_mode = ?relay_parent_mode,
?candidate_hash,
pov_hash = ?pov.hash(),
core = ?our_core,
current_validators = ?validators,
"Accepted collation, connecting to validators."
);
per_relay_parent.validator_group.entry(core_index).or_insert_with(|| {
let mut group = ValidatorGroup::default();
group.validators = validators;
group
});
connect_to_validators(ctx, &state.validator_groups_buf).await;
if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(candidate_hash, result_sender);
}
let parent_head_data = if elastic_scaling {
ParentHeadData::WithData { hash: parent_head_data_hash, head_data: parent_head_data }
} else {
ParentHeadData::OnlyHash(parent_head_data_hash)
};
per_relay_parent.collations.insert(
candidate_hash,
CollationWithCoreIndex(
Collation { receipt, pov, parent_head_data, status: CollationStatus::Created },
core_index,
),
);
let interested =
state
.peer_data
.iter()
.filter(|(_, PeerData { view: v, .. })| match relay_parent_mode {
ProspectiveParachainsMode::Disabled => v.contains(&candidate_relay_parent),
ProspectiveParachainsMode::Enabled { .. } => v.iter().any(|block_hash| {
state.implicit_view.as_ref().map(|implicit_view| {
implicit_view
.known_allowed_relay_parents_under(block_hash, Some(id))
.unwrap_or_default()
.contains(&candidate_relay_parent)
}) == Some(true)
}),
});
for (peer_id, peer_data) in interested {
advertise_collation(
ctx,
candidate_relay_parent,
per_relay_parent,
peer_id,
peer_data.version,
&state.peer_ids,
&mut state.advertisement_timeouts,
&state.metrics,
)
.await;
}
Ok(())
}
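/// Returns the cores assigned to the para at the given relay parent, together with the
/// total number of availability cores.
///
/// Uses the claim queue when available, falling back to the scheduled/occupied core states
/// otherwise.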
async fn determine_cores(
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
para_id: ParaId,
relay_parent: Hash,
relay_parent_mode: ProspectiveParachainsMode,
) -> Result<(Vec<CoreIndex>, usize)> {
let cores = get_availability_cores(sender, relay_parent).await?;
let n_cores = cores.len();
let mut assigned_cores = Vec::new();
let maybe_claim_queue = fetch_claim_queue(sender, relay_parent).await?;
for (idx, core) in cores.iter().enumerate() {
let core_is_scheduled = match maybe_claim_queue {
Some(ref claim_queue) => {
claim_queue
.iter_claims_for_core(&CoreIndex(idx as u32))
.any(|para| para == ¶_id)
},
None => match core {
CoreState::Scheduled(scheduled) if scheduled.para_id == para_id => true,
CoreState::Occupied(occupied) if relay_parent_mode.is_enabled() =>
occupied.next_up_on_available.as_ref().map(|c| c.para_id) == Some(para_id),
_ => false,
},
};
if core_is_scheduled {
assigned_cores.push(CoreIndex::from(idx as u32));
}
}
Ok((assigned_cores, n_cores))
}
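/// The validators assigned to a core, alongside the session and group indices they were
/// resolved for.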
#[derive(Debug)]
struct GroupValidators {
validators: Vec<AuthorityDiscoveryId>,
session_index: SessionIndex,
group_index: GroupIndex,
}
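/// Figures out which validator group is assigned to `core_index` at `relay_parent` and
/// resolves its members to authority discovery keys.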
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn determine_our_validators<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
core_index: CoreIndex,
cores: usize,
relay_parent: Hash,
) -> Result<GroupValidators> {
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = &runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?
.session_info;
gum::debug!(target: LOG_TARGET, ?session_index, "Received session info");
let groups = &info.validator_groups;
let rotation_info = get_group_rotation_info(ctx.sender(), relay_parent).await?;
let current_group_index = rotation_info.group_for_core(core_index, cores);
let current_validators =
groups.get(current_group_index).map(|v| v.as_slice()).unwrap_or_default();
let validators = &info.discovery_keys;
let current_validators =
current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let current_validators = GroupValidators {
validators: current_validators,
session_index,
group_index: current_group_index,
};
Ok(current_validators)
}
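/// Constructs the signed `Declare` wire message for the para we are collating on, in the
/// requested protocol version. Returns `None` if we are not collating on any para yet.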
fn declare_message(
state: &mut State,
version: CollationVersion,
) -> Option<Versioned<protocol_v1::CollationProtocol, protocol_v2::CollationProtocol>> {
let para_id = state.collating_on?;
Some(match version {
CollationVersion::V1 => {
let declare_signature_payload =
protocol_v1::declare_signature_payload(&state.local_peer_id);
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
para_id,
state.collator_pair.sign(&declare_signature_payload),
);
Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message))
},
CollationVersion::V2 => {
let declare_signature_payload =
protocol_v2::declare_signature_payload(&state.local_peer_id);
let wire_message = protocol_v2::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
para_id,
state.collator_pair.sign(&declare_signature_payload),
);
Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol(wire_message))
},
})
}
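/// Sends a `Declare` message for the para we are collating on to the given peer (no-op if
/// we are not collating yet).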
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn declare<Context>(
ctx: &mut Context,
state: &mut State,
peer: &PeerId,
version: CollationVersion,
) {
if let Some(wire_message) = declare_message(state, version) {
ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(vec![*peer], wire_message))
.await;
}
}
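/// Asks the network bridge to (re)connect to the validators we currently care about, as
/// tracked by the validator groups buffer.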
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn connect_to_validators<Context>(
ctx: &mut Context,
validator_groups_buf: &ValidatorGroupsBuffer,
) {
let validator_ids = validator_groups_buf.validators_to_connect();
let (failed, _) = oneshot::channel();
ctx.send_message(NetworkBridgeTxMessage::ConnectToValidators {
validator_ids,
peer_set: PeerSet::Collation,
failed,
})
.await;
}
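/// Advertises the collations stored for the given relay parent to the peer, provided the
/// peer speaks a suitable protocol version and has not been advertised the candidate before.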
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn advertise_collation<Context>(
ctx: &mut Context,
relay_parent: Hash,
per_relay_parent: &mut PerRelayParent,
peer: &PeerId,
protocol_version: CollationVersion,
peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
advertisement_timeouts: &mut FuturesUnordered<ResetInterestTimeout>,
metrics: &Metrics,
) {
for (candidate_hash, collation_and_core) in per_relay_parent.collations.iter_mut() {
let core_index = *collation_and_core.core_index();
let collation = collation_and_core.collation_mut();
if let CollationVersion::V1 = protocol_version {
if per_relay_parent.prospective_parachains_mode.is_enabled() {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
peer_id = %peer,
"Skipping advertising to validator, incorrect network protocol version",
);
return
}
}
let Some(validator_group) = per_relay_parent.validator_group.get_mut(&core_index) else {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?core_index,
"Skipping advertising to validator, validator group for core not found",
);
return
};
let should_advertise = validator_group.should_advertise_to(candidate_hash, peer_ids, &peer);
match should_advertise {
ShouldAdvertiseTo::Yes => {},
ShouldAdvertiseTo::NotAuthority | ShouldAdvertiseTo::AlreadyAdvertised => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
?candidate_hash,
peer_id = %peer,
reason = ?should_advertise,
"Not advertising collation"
);
continue
},
}
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?candidate_hash,
peer_id = %peer,
"Advertising collation.",
);
collation.status.advance_to_advertised();
let collation_message = match protocol_version {
CollationVersion::V2 => {
let wire_message = protocol_v2::CollatorProtocolMessage::AdvertiseCollation {
relay_parent,
candidate_hash: *candidate_hash,
parent_head_data_hash: collation.parent_head_data.hash(),
};
Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol(wire_message))
},
CollationVersion::V1 => {
let wire_message =
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent);
Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message))
},
};
ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(
vec![*peer],
collation_message,
))
.await;
validator_group.advertised_to_peer(candidate_hash, &peer_ids, peer);
advertisement_timeouts.push(ResetInterestTimeout::new(
*candidate_hash,
*peer,
RESET_INTEREST_TIMEOUT,
));
metrics.on_advertisement_made();
}
}
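/// The main handler for `CollatorProtocolMessage`s coming from the overseer.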
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn process_msg<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
msg: CollatorProtocolMessage,
) -> Result<()> {
use CollatorProtocolMessage::*;
match msg {
CollateOn(id) => {
state.collating_on = Some(id);
state.implicit_view = Some(ImplicitView::new(Some(id)));
},
DistributeCollation {
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
core_index,
} => {
match state.collating_on {
Some(id) if candidate_receipt.descriptor.para_id() != id => {
gum::warn!(
target: LOG_TARGET,
para_id = %candidate_receipt.descriptor.para_id(),
collating_on = %id,
"DistributeCollation for unexpected para_id",
);
},
Some(id) => {
let _ = state.metrics.time_collation_distribution("distribute");
distribute_collation(
ctx,
runtime,
state,
id,
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
core_index,
)
.await?;
},
None => {
gum::warn!(
target: LOG_TARGET,
para_id = %candidate_receipt.descriptor.para_id(),
"DistributeCollation message while not collating on any",
);
},
}
},
NetworkBridgeUpdate(event) => {
			// Keep the timer guard alive while the network message is being processed.
			let _timer = state.metrics.time_process_msg();
if let Err(e) = handle_network_msg(ctx, runtime, state, event).await {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to handle incoming network message",
);
}
},
msg @ (Invalid(..) | Seconded(..)) => {
gum::warn!(
target: LOG_TARGET,
"{:?} message is not expected on the collator side of the protocol",
msg,
);
},
}
Ok(())
}
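/// Answers an incoming collation fetch request with the given collation and registers a
/// timed future that reports whether the upload finished within `MAX_UNSHARED_UPLOAD_TIME`.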
async fn send_collation(
state: &mut State,
request: VersionedCollationRequest,
receipt: CandidateReceipt,
pov: PoV,
parent_head_data: ParentHeadData,
) {
let (tx, rx) = oneshot::channel();
let relay_parent = request.relay_parent();
let peer_id = request.peer_id();
let candidate_hash = receipt.hash();
let result = match parent_head_data {
ParentHeadData::WithData { head_data, .. } =>
Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData {
receipt,
pov,
parent_head_data: head_data,
}),
ParentHeadData::OnlyHash(_) =>
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov)),
};
let response =
OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };
if let Err(_) = request.send_outgoing_response(response) {
gum::warn!(target: LOG_TARGET, "Sending collation response failed");
}
state.active_collation_fetches.push(
async move {
let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
let timed_out = r.is_none();
CollationSendResult { relay_parent, candidate_hash, peer_id, timed_out }
}
.boxed(),
);
state.metrics.on_collation_sent();
}
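/// Handles a gossip message received from a peer on the collation peer-set.
///
/// Only `CollationSeconded` is meaningful on the collator side: its signature is checked and
/// the result is forwarded to the registered result sender. `Declare` and
/// `AdvertiseCollation` lead to a disconnect (and a reputation change for the latter).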
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_incoming_peer_message<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
origin: PeerId,
msg: Versioned<protocol_v1::CollatorProtocolMessage, protocol_v2::CollatorProtocolMessage>,
) -> Result<()> {
use protocol_v1::CollatorProtocolMessage as V1;
use protocol_v2::CollatorProtocolMessage as V2;
match msg {
Versioned::V1(V1::Declare(..)) |
Versioned::V2(V2::Declare(..)) |
Versioned::V3(V2::Declare(..)) => {
gum::trace!(
target: LOG_TARGET,
?origin,
"Declare message is not expected on the collator side of the protocol",
);
ctx.send_message(NetworkBridgeTxMessage::DisconnectPeer(origin, PeerSet::Collation))
.await;
},
Versioned::V1(V1::AdvertiseCollation(_)) |
Versioned::V2(V2::AdvertiseCollation { .. }) |
Versioned::V3(V2::AdvertiseCollation { .. }) => {
gum::trace!(
target: LOG_TARGET,
?origin,
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
modify_reputation(&mut state.reputation, ctx.sender(), origin, COST_UNEXPECTED_MESSAGE)
.await;
ctx.send_message(NetworkBridgeTxMessage::DisconnectPeer(origin, PeerSet::Collation))
.await;
},
Versioned::V1(V1::CollationSeconded(relay_parent, statement)) |
Versioned::V2(V2::CollationSeconded(relay_parent, statement)) |
Versioned::V3(V2::CollationSeconded(relay_parent, statement)) => {
if !matches!(statement.unchecked_payload(), Statement::Seconded(_)) {
gum::warn!(
target: LOG_TARGET,
?statement,
?origin,
"Collation seconded message received with none-seconded statement.",
);
} else {
let statement = runtime
.check_signature(ctx.sender(), relay_parent, statement)
.await?
.map_err(Error::InvalidStatementSignature)?;
let removed =
state.collation_result_senders.remove(&statement.payload().candidate_hash());
if let Some(sender) = removed {
gum::trace!(
target: LOG_TARGET,
?statement,
?origin,
"received a valid `CollationSeconded`, forwarding result to collator",
);
let _ = sender.send(CollationSecondedSignal { statement, relay_parent });
} else {
let relay_parent = match state.per_relay_parent.get(&relay_parent) {
Some(per_relay_parent) => per_relay_parent,
None => {
gum::debug!(
target: LOG_TARGET,
candidate_relay_parent = %relay_parent,
candidate_hash = ?&statement.payload().candidate_hash(),
"Seconded statement relay parent is out of our view",
);
return Ok(())
},
};
match relay_parent.collations.get(&statement.payload().candidate_hash()) {
Some(_) => {
gum::trace!(
target: LOG_TARGET,
?statement,
?origin,
"received a valid `CollationSeconded`",
);
},
None => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?&statement.payload().candidate_hash(),
?origin,
"received an unexpected `CollationSeconded`: unknown statement",
);
},
}
}
}
},
}
Ok(())
}
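/// Handles an incoming collation fetch request by serving the matching stored collation, or
/// queueing the request if another upload for the same relay parent is already in flight.
/// Requests for the wrong para or from flooding peers are rejected.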
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_incoming_request<Context>(
ctx: &mut Context,
state: &mut State,
req: std::result::Result<VersionedCollationRequest, incoming::Error>,
) -> Result<()> {
let req = req?;
let relay_parent = req.relay_parent();
let peer_id = req.peer_id();
let para_id = req.para_id();
match state.collating_on {
Some(our_para_id) if our_para_id == para_id => {
let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) {
Some(per_relay_parent) => per_relay_parent,
None => {
gum::debug!(
target: LOG_TARGET,
relay_parent = %relay_parent,
"received a `RequestCollation` for a relay parent out of our view",
);
return Ok(())
},
};
let mode = per_relay_parent.prospective_parachains_mode;
let collation_with_core = match &req {
VersionedCollationRequest::V1(_) if !mode.is_enabled() =>
per_relay_parent.collations.values_mut().next(),
VersionedCollationRequest::V2(req) =>
per_relay_parent.collations.get_mut(&req.payload.candidate_hash),
_ => {
gum::warn!(
target: LOG_TARGET,
relay_parent = %relay_parent,
prospective_parachains_mode = ?mode,
?peer_id,
"Collation request version is invalid",
);
return Ok(())
},
};
let (receipt, pov, parent_head_data) =
if let Some(collation_with_core) = collation_with_core {
let collation = collation_with_core.collation_mut();
collation.status.advance_to_requested();
(
collation.receipt.clone(),
collation.pov.clone(),
collation.parent_head_data.clone(),
)
} else {
gum::warn!(
target: LOG_TARGET,
relay_parent = %relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);
return Ok(())
};
state.metrics.on_collation_sent_requested();
let waiting = state.waiting_collation_fetches.entry(relay_parent).or_default();
let candidate_hash = receipt.hash();
if !waiting.waiting_peers.insert((peer_id, candidate_hash)) {
gum::debug!(
target: LOG_TARGET,
"Dropping incoming request as peer has a request in flight already."
);
modify_reputation(
&mut state.reputation,
ctx.sender(),
peer_id,
COST_APPARENT_FLOOD.into(),
)
.await;
return Ok(())
}
if waiting.collation_fetch_active {
waiting.req_queue.push_back(req);
} else {
waiting.collation_fetch_active = true;
let _ = state.metrics.time_collation_distribution("send");
send_collation(state, req, receipt, pov, parent_head_data).await;
}
},
Some(our_para_id) => {
gum::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
our_para_id = %our_para_id,
"received a `CollationFetchingRequest` for unexpected para_id",
);
},
None => {
gum::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
"received a `RequestCollation` while not collating on any para",
);
},
}
Ok(())
}
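/// Reacts to a peer's view change by advertising our collations for any newly visible relay
/// parents; unknown heads are remembered so we can advertise once they become known.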
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
state: &mut State,
peer_id: PeerId,
view: View,
) {
let Some(PeerData { view: current, version, unknown_heads }) =
state.peer_data.get_mut(&peer_id)
else {
return
};
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
*current = view;
for added in added.into_iter() {
let block_hashes = match state
.per_relay_parent
.get(&added)
.map(|per_relay_parent| per_relay_parent.prospective_parachains_mode)
{
Some(ProspectiveParachainsMode::Disabled) => std::slice::from_ref(&added),
Some(ProspectiveParachainsMode::Enabled { .. }) => state
.implicit_view
.as_ref()
.and_then(|implicit_view| {
implicit_view.known_allowed_relay_parents_under(&added, state.collating_on)
})
.unwrap_or_default(),
None => {
gum::trace!(
target: LOG_TARGET,
?peer_id,
new_leaf = ?added,
"New leaf in peer's view is unknown",
);
unknown_heads.insert(added, ());
continue
},
};
for block_hash in block_hashes {
let Some(per_relay_parent) = state.per_relay_parent.get_mut(block_hash) else {
continue
};
advertise_collation(
ctx,
*block_hash,
per_relay_parent,
&peer_id,
*version,
&state.peer_ids,
&mut state.advertisement_timeouts,
&state.metrics,
)
.await;
}
}
}
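/// Dispatches events coming in from the network bridge.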
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_network_msg<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
) -> Result<()> {
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(peer_id, observed_role, protocol_version, maybe_authority) => {
gum::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, ?maybe_authority, "Peer connected");
let version = match protocol_version.try_into() {
Ok(version) => version,
Err(err) => {
gum::error!(
target: LOG_TARGET,
?peer_id,
?observed_role,
?err,
"Unsupported protocol version"
);
return Ok(())
},
};
state.peer_data.entry(peer_id).or_insert_with(|| PeerData {
view: View::default(),
version,
unknown_heads: LruMap::new(ByLength::new(10)),
});
if let Some(authority_ids) = maybe_authority {
gum::trace!(
target: LOG_TARGET,
?authority_ids,
?peer_id,
"Connected to requested validator"
);
state.peer_ids.insert(peer_id, authority_ids);
declare(ctx, state, &peer_id, version).await;
}
},
PeerViewChange(peer_id, view) => {
gum::trace!(target: LOG_TARGET, ?peer_id, ?view, "Peer view change");
handle_peer_view_change(ctx, state, peer_id, view).await;
},
PeerDisconnected(peer_id) => {
gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
state.peer_data.remove(&peer_id);
state.peer_ids.remove(&peer_id);
},
OurViewChange(view) => {
gum::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(ctx, state, view).await?;
},
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
},
UpdatedAuthorityIds(peer_id, authority_ids) => {
gum::trace!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Updated authority ids");
if let Some(version) = state.peer_data.get(&peer_id).map(|d| d.version) {
if state.peer_ids.insert(peer_id, authority_ids).is_none() {
declare(ctx, state, &peer_id, version).await;
}
}
},
		NewGossipTopology { .. } => {
			// Gossip topology is not used by the collator protocol; nothing to do.
		},
}
Ok(())
}
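/// Handles our own view change: sets up state for newly activated leaves (and their allowed
/// ancestry when prospective parachains are enabled) and prunes state for removed ones.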
#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
async fn handle_our_view_change<Context>(
ctx: &mut Context,
state: &mut State,
view: OurView,
) -> Result<()> {
let current_leaves = state.active_leaves.clone();
let removed = current_leaves.iter().filter(|(h, _)| !view.contains(h));
let added = view.iter().filter(|h| !current_leaves.contains_key(h));
for leaf in added {
let mode = prospective_parachains_mode(ctx.sender(), *leaf).await?;
state.active_leaves.insert(*leaf, mode);
state.per_relay_parent.insert(*leaf, PerRelayParent::new(mode));
if mode.is_enabled() {
if let Some(ref mut implicit_view) = state.implicit_view {
implicit_view
.activate_leaf(ctx.sender(), *leaf)
.await
.map_err(Error::ImplicitViewFetchError)?;
let allowed_ancestry = implicit_view
.known_allowed_relay_parents_under(leaf, state.collating_on)
.unwrap_or_default();
let peers = state
.peer_data
.iter_mut()
.filter_map(|(id, data)| {
data.unknown_heads.remove(leaf).map(|_| (id, data.version))
})
.collect::<Vec<_>>();
for block_hash in allowed_ancestry {
let per_relay_parent = state
.per_relay_parent
.entry(*block_hash)
.or_insert_with(|| PerRelayParent::new(mode));
for (peer_id, peer_version) in &peers {
advertise_collation(
ctx,
*block_hash,
per_relay_parent,
&peer_id,
*peer_version,
&state.peer_ids,
&mut state.advertisement_timeouts,
&state.metrics,
)
.await;
}
}
}
}
}
for (leaf, mode) in removed {
state.active_leaves.remove(leaf);
let pruned = if mode.is_enabled() {
state
.implicit_view
.as_mut()
.map(|view| view.deactivate_leaf(*leaf))
.unwrap_or_default()
} else {
vec![*leaf]
};
for removed in &pruned {
gum::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");
let collations = state
.per_relay_parent
.remove(removed)
.map(|per_relay_parent| per_relay_parent.collations)
.unwrap_or_default();
for collation_with_core in collations.into_values() {
let collation = collation_with_core.collation();
let candidate_hash = collation.receipt.hash();
state.collation_result_senders.remove(&candidate_hash);
state.validator_groups_buf.remove_candidate(&candidate_hash);
match collation.status {
CollationStatus::Created => gum::warn!(
target: LOG_TARGET,
candidate_hash = ?collation.receipt.hash(),
pov_hash = ?collation.pov.hash(),
"Collation wasn't advertised to any validator.",
),
CollationStatus::Advertised => gum::debug!(
target: LOG_TARGET,
candidate_hash = ?collation.receipt.hash(),
pov_hash = ?collation.pov.hash(),
"Collation was advertised but not requested by any validator.",
),
CollationStatus::Requested => gum::debug!(
target: LOG_TARGET,
candidate_hash = ?collation.receipt.hash(),
pov_hash = ?collation.pov.hash(),
"Collation was requested.",
),
}
}
state.waiting_collation_fetches.remove(removed);
}
}
Ok(())
}
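/// The collator protocol collator side main entry point, run as an overseer subsystem.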
#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
pub(crate) async fn run<Context>(
ctx: Context,
local_peer_id: PeerId,
collator_pair: CollatorPair,
req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
metrics: Metrics,
) -> std::result::Result<(), FatalError> {
run_inner(
ctx,
local_peer_id,
collator_pair,
req_v1_receiver,
req_v2_receiver,
metrics,
ReputationAggregator::default(),
REPUTATION_CHANGE_INTERVAL,
)
.await
}
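/// The main loop: drives incoming overseer messages, collation fetch requests, upload
/// results and the various timers until `Conclude` is received.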
#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
async fn run_inner<Context>(
mut ctx: Context,
local_peer_id: PeerId,
collator_pair: CollatorPair,
mut req_v1_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
mut req_v2_receiver: IncomingRequestReceiver<request_v2::CollationFetchingRequest>,
metrics: Metrics,
reputation: ReputationAggregator,
reputation_interval: Duration,
) -> std::result::Result<(), FatalError> {
use OverseerSignal::*;
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();
let mut state = State::new(local_peer_id, collator_pair, metrics, reputation);
let mut runtime = RuntimeInfo::new(None);
loop {
let reputation_changes = || vec![COST_INVALID_REQUEST];
let recv_req_v1 = req_v1_receiver.recv(reputation_changes).fuse();
let recv_req_v2 = req_v2_receiver.recv(reputation_changes).fuse();
pin_mut!(recv_req_v1);
pin_mut!(recv_req_v2);
let mut reconnect_timeout = &mut state.reconnect_timeout;
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
msg = ctx.recv().fuse() => match msg.map_err(FatalError::SubsystemReceive)? {
FromOrchestra::Communication { msg } => {
log_error(
process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
"Failed to process message"
)?;
},
FromOrchestra::Signal(ActiveLeaves(update)) => {
if update.activated.is_some() {
*reconnect_timeout = futures_timer::Delay::new(RECONNECT_AFTER_LEAF_TIMEOUT).fuse();
}
}
FromOrchestra::Signal(BlockFinalized(..)) => {}
FromOrchestra::Signal(Conclude) => return Ok(()),
},
CollationSendResult { relay_parent, candidate_hash, peer_id, timed_out } =
state.active_collation_fetches.select_next_some() => {
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
if timed_out {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?peer_id,
?candidate_hash,
"Sending collation to validator timed out, carrying on with next validator."
);
} else {
for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
state.validator_groups_buf.reset_validator_interest(candidate_hash, authority_id);
}
waiting.waiting_peers.remove(&(peer_id, candidate_hash));
}
if let Some(next) = waiting.req_queue.pop_front() {
next
} else {
waiting.collation_fetch_active = false;
continue
}
} else {
continue
};
let next_collation_with_core = {
let per_relay_parent = match state.per_relay_parent.get(&relay_parent) {
Some(per_relay_parent) => per_relay_parent,
None => continue,
};
match (per_relay_parent.prospective_parachains_mode, &next) {
(ProspectiveParachainsMode::Disabled, VersionedCollationRequest::V1(_)) => {
per_relay_parent.collations.values().next()
},
(ProspectiveParachainsMode::Enabled { .. }, VersionedCollationRequest::V2(req)) => {
per_relay_parent.collations.get(&req.payload.candidate_hash)
},
_ => {
continue
},
}
};
if let Some(collation_with_core) = next_collation_with_core {
let collation = collation_with_core.collation();
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
let parent_head_data = collation.parent_head_data.clone();
send_collation(&mut state, next, receipt, pov, parent_head_data).await;
}
},
(candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => {
for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
state
.validator_groups_buf
.reset_validator_interest(candidate_hash, &authority_id);
}
}
_ = reconnect_timeout => {
connect_to_validators(&mut ctx, &state.validator_groups_buf).await;
gum::trace!(
target: LOG_TARGET,
timeout = ?RECONNECT_AFTER_LEAF_TIMEOUT,
"Peer-set updated due to a timeout"
);
},
in_req = recv_req_v1 => {
let request = in_req.map(VersionedCollationRequest::from);
log_error(
handle_incoming_request(&mut ctx, &mut state, request).await,
"Handling incoming collation fetch request V1"
)?;
}
in_req = recv_req_v2 => {
let request = in_req.map(VersionedCollationRequest::from);
log_error(
handle_incoming_request(&mut ctx, &mut state, request).await,
"Handling incoming collation fetch request V2"
)?;
}
}
}
}