use bytes::Bytes;
use crate::{
protocol::libp2p::kademlia::{
message::KademliaMessage,
query::{QueryAction, QueryId},
record::{Key as RecordKey, PeerRecord, Record},
types::{Distance, KademliaPeer, Key},
Quorum,
},
PeerId,
};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
/// Logging target used by every `tracing` statement in this file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::get_record";
/// Configuration of a single get-record query.
#[derive(Debug)]
pub struct GetRecordConfig {
/// ID of the local peer; peers returned in responses that equal it are never added as
/// candidates.
pub local_peer_id: PeerId,
/// Number of records already counted towards the quorum before any response is registered.
/// NOTE(review): presumably records found in the local store — confirm against the caller.
pub known_records: usize,
/// Quorum deciding how many records are sufficient to finish the query early.
pub quorum: Quorum,
/// Number of records required when the quorum is `Quorum::All`.
pub replication_factor: usize,
/// Number of in-flight (pending) requests at which no further peer is scheduled.
pub parallelism_factor: usize,
/// ID of the query, attached to every emitted `QueryAction`.
pub query: QueryId,
/// Key of the record being looked up; candidates are ordered by their distance to it.
pub target: Key<RecordKey>,
}
impl GetRecordConfig {
    /// Check whether `records` newly found records, together with the already known ones,
    /// satisfy the configured quorum.
    ///
    /// The required amount is `replication_factor` for `Quorum::All`, one for `Quorum::One`
    /// and the wrapped value for `Quorum::N`.
    fn sufficient_records(&self, records: usize) -> bool {
        let required = match self.quorum {
            Quorum::All => self.replication_factor,
            Quorum::One => 1,
            Quorum::N(needed_responses) => needed_responses.get(),
        };

        self.known_records + records >= required
    }
}
/// State of an in-progress get-record query.
#[derive(Debug)]
pub struct GetRecordContext {
/// Query configuration.
pub config: GetRecordConfig,
/// Request message built once in `new` and cloned for every `QueryAction::SendMessage`.
kad_message: Bytes,
/// Peers a request was scheduled for and that have neither responded nor failed yet.
pub pending: HashMap<PeerId, KademliaPeer>,
/// Peers whose response or failure has been registered.
pub queried: HashSet<PeerId>,
/// Peers still to be queried, keyed by their distance to the target; the entry with the
/// smallest key is scheduled first.
pub candidates: BTreeMap<Distance, KademliaPeer>,
/// Non-expired records returned by peers, in the order the responses were registered.
pub found_records: Vec<PeerRecord>,
}
impl GetRecordContext {
/// Create a new [`GetRecordContext`].
///
/// Every peer of `in_peers` becomes a candidate keyed by its distance to `config.target`,
/// and the request message for the target key is built once so it can be cloned per peer.
pub fn new(config: GetRecordConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
    let mut candidates = BTreeMap::new();

    // `in_peers` is owned, so the peers are moved into the map instead of being cloned.
    for candidate in in_peers {
        let distance = config.target.distance(&candidate.key);
        candidates.insert(distance, candidate);
    }

    let kad_message = KademliaMessage::get_record(config.target.clone().into_preimage());

    Self {
        config,
        kad_message,
        candidates,
        pending: HashMap::new(),
        queried: HashSet::new(),
        found_records: Vec::new(),
    }
}
pub fn found_records(self) -> Vec<PeerRecord> {
self.found_records
}
/// Register a failed request to `peer`.
///
/// A pending peer is moved to the set of queried peers; a peer that is not pending is only
/// logged and otherwise ignored.
pub fn register_response_failure(&mut self, peer: PeerId) {
    match self.pending.remove(&peer) {
        Some(failed) => {
            self.queried.insert(failed.peer);
        }
        None => {
            tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
        }
    }
}
/// Register the response received from `peer`.
///
/// Responses from peers that are not pending are logged and ignored. Otherwise a returned,
/// non-expired `record` is stored, the peer is marked as queried and every peer of `peers`
/// that is neither queried, pending nor the local peer is added to the candidates.
pub fn register_response(
    &mut self,
    peer: PeerId,
    record: Option<Record>,
    peers: Vec<KademliaPeer>,
) {
    tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");

    let responder = match self.pending.remove(&peer) {
        Some(entry) => entry.peer,
        None => {
            tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
            return;
        }
    };

    // Expired records are dropped and never count towards the quorum.
    match record {
        Some(record) if !record.is_expired(std::time::Instant::now()) => {
            self.found_records.push(PeerRecord {
                peer: responder,
                record,
            });
        }
        _ => {}
    }

    self.queried.insert(responder);

    for candidate in peers {
        let already_known = self.queried.contains(&candidate.peer)
            || self.pending.contains_key(&candidate.peer);

        if already_known || self.config.local_peer_id == candidate.peer {
            continue;
        }

        let distance = self.config.target.distance(&candidate.key);
        self.candidates.insert(distance, candidate);
    }
}
/// Get the action to perform for `peer`.
///
/// Returns a `QueryAction::SendMessage` carrying the query's request message when `peer` is
/// pending, and `None` otherwise.
pub fn next_peer_action(&mut self, peer: &PeerId) -> Option<QueryAction> {
    if !self.pending.contains_key(peer) {
        return None;
    }

    Some(QueryAction::SendMessage {
        query: self.config.query,
        peer: *peer,
        message: self.kad_message.clone(),
    })
}
/// Move the first candidate (smallest distance key) to the pending set and return the
/// `QueryAction::SendMessage` for it, or `None` when there are no candidates left.
fn schedule_next_peer(&mut self) -> Option<QueryAction> {
    tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer");

    self.candidates.pop_first().map(|(_, candidate)| {
        let peer = candidate.peer;
        tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
        self.pending.insert(peer, candidate);

        QueryAction::SendMessage {
            query: self.config.query,
            peer,
            message: self.kad_message.clone(),
        }
    })
}
/// Check whether the query can make no further progress: no request is in flight and no
/// candidate is left to query.
fn is_done(&self) -> bool {
    let awaiting_responses = !self.pending.is_empty();
    let has_candidates = !self.candidates.is_empty();

    !(awaiting_responses || has_candidates)
}
/// Get the next action of the query.
///
/// Decision order:
/// 1. No pending peers and no candidates: the query finishes; it failed if not a single
///    record (known or found) exists and succeeded otherwise.
/// 2. The quorum is satisfied: the query succeeded.
/// 3. The number of pending requests reached the parallelism factor: `None`, wait for
///    responses.
/// 4. Otherwise the closest candidate is scheduled.
pub fn next_action(&mut self) -> Option<QueryAction> {
    let query = self.config.query;

    if self.is_done() {
        // Records known before the query plus the records found along the way.
        let known_records = self.config.known_records + self.found_records.len();

        return Some(if known_records == 0 {
            QueryAction::QueryFailed { query }
        } else {
            QueryAction::QuerySucceeded { query }
        });
    }

    if self.config.sufficient_records(self.found_records.len()) {
        return Some(QueryAction::QuerySucceeded { query });
    }

    // `>=` rather than `==`: `pending` and `config` are public, so the number of pending
    // requests may already exceed the limit; an equality check would then never trigger
    // again and scheduling would be unbounded.
    if self.pending.len() >= self.config.parallelism_factor {
        return None;
    }

    self.schedule_next_peer()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::libp2p::kademlia::types::ConnectionType;
/// Baseline configuration for the tests: `Quorum::All`, no known records, replication
/// factor 20, parallelism factor 10 and a random local peer.
fn default_config() -> GetRecordConfig {
    let local_peer_id = PeerId::random();

    GetRecordConfig {
        local_peer_id,
        query: QueryId(0),
        target: Key::new(vec![1, 2, 3].into()),
        quorum: Quorum::All,
        known_records: 0,
        replication_factor: 20,
        parallelism_factor: 10,
    }
}
/// Wrap `peer` into a connected `KademliaPeer` without addresses.
fn peer_to_kad(peer: PeerId) -> KademliaPeer {
    let key = Key::from(peer);

    KademliaPeer {
        peer,
        key,
        addresses: Vec::new(),
        connection: ConnectionType::Connected,
    }
}
/// Check the quorum thresholds of `GetRecordConfig::sufficient_records`.
#[test]
fn config_check() {
    let quorum_ten = || Quorum::N(std::num::NonZeroUsize::new(10).expect("valid; qed"));

    // (quorum, known records, found records, expected result)
    let cases = vec![
        (Quorum::All, 0, 20, true),
        (Quorum::All, 0, 19, false),
        (Quorum::All, 1, 19, true),
        (Quorum::All, 1, 18, false),
        (Quorum::One, 0, 1, true),
        (Quorum::One, 0, 0, false),
        (Quorum::One, 1, 1, true),
        (Quorum::One, 1, 0, true),
        (quorum_ten(), 0, 10, true),
        (quorum_ten(), 0, 9, false),
        (quorum_ten(), 1, 9, true),
        (quorum_ten(), 1, 8, false),
    ];

    for (quorum, known_records, records, expected) in cases {
        let config = GetRecordConfig {
            quorum,
            known_records,
            replication_factor: 20,
            ..default_config()
        };

        assert_eq!(config.sufficient_records(records), expected);
    }
}
/// Without any candidate the query finishes immediately: it fails when no record is known
/// and succeeds when at least one is.
#[test]
fn completes_when_no_candidates() {
    let expectations = vec![
        (0, QueryAction::QueryFailed { query: QueryId(0) }),
        (1, QueryAction::QuerySucceeded { query: QueryId(0) }),
    ];

    for (known_records, expected) in expectations {
        let config = GetRecordConfig {
            known_records,
            ..default_config()
        };
        let mut context = GetRecordContext::new(config, VecDeque::new());
        assert!(context.is_done());

        let event = context.next_action().unwrap();
        assert_eq!(event, expected);
    }
}
/// The query schedules peers until the parallelism factor is reached and then waits.
#[test]
fn fulfill_parallelism() {
    let config = GetRecordConfig {
        parallelism_factor: 3,
        ..default_config()
    };

    let peers = [PeerId::random(), PeerId::random(), PeerId::random()];
    let in_peers_set: HashSet<_> = peers.iter().copied().collect();
    assert_eq!(in_peers_set.len(), 3);

    let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
    let mut context = GetRecordContext::new(config, in_peers);

    for expected_pending in 1..=3 {
        let QueryAction::SendMessage { query, peer, .. } = context.next_action().unwrap() else {
            panic!("Unexpected event");
        };

        assert_eq!(query, QueryId(0));
        assert_eq!(context.pending.len(), expected_pending);
        assert!(context.pending.contains_key(&peer));
        assert!(in_peers_set.contains(&peer));
    }

    // All three slots are taken; nothing more is scheduled.
    assert!(context.next_action().is_none());
}
/// Drive a query through successful responses, failures and a newly discovered peer until
/// the quorum of three records is met.
#[test]
fn completes_when_responses() {
    let key = vec![1, 2, 3];
    let config = GetRecordConfig {
        parallelism_factor: 3,
        replication_factor: 3,
        ..default_config()
    };

    let peer_a = PeerId::random();
    let peer_b = PeerId::random();
    let peer_c = PeerId::random();
    let initial_peers = [peer_a, peer_b, peer_c];

    let in_peers_set: HashSet<_> = initial_peers.iter().copied().collect();
    assert_eq!(in_peers_set.len(), 3);

    let in_peers = initial_peers.iter().map(|peer| peer_to_kad(*peer)).collect();
    let mut context = GetRecordContext::new(config, in_peers);

    // All three initial peers get scheduled.
    for num in 0..3 {
        let event = context.next_action().unwrap();
        match event {
            QueryAction::SendMessage { query, peer, .. } => {
                assert_eq!(query, QueryId(0));
                assert_eq!(context.pending.len(), num + 1);
                assert!(context.pending.contains_key(&peer));
                assert!(in_peers_set.contains(&peer));
            }
            _ => panic!("Unexpected event"),
        }
    }

    // A failure of a peer that was never scheduled is ignored.
    let peer_d = PeerId::random();
    context.register_response_failure(peer_d);
    assert_eq!(context.pending.len(), 3);
    assert!(context.queried.is_empty());

    // First record.
    let record = Record::new(key.clone(), vec![1, 2, 3]);
    context.register_response(peer_a, Some(record), vec![]);
    assert_eq!(context.pending.len(), 2);
    assert_eq!(context.queried.len(), 1);
    assert_eq!(context.found_records.len(), 1);

    // Second record; the response also introduces `peer_d` as a new candidate.
    // `PeerId` is `Copy`, so it is passed by value without `.clone()`.
    let record = Record::new(key.clone(), vec![4, 5, 6]);
    context.register_response(peer_b, Some(record), vec![peer_to_kad(peer_d)]);
    assert_eq!(context.pending.len(), 1);
    assert_eq!(context.queried.len(), 2);
    assert_eq!(context.found_records.len(), 2);
    assert_eq!(context.candidates.len(), 1);

    // The third peer fails and contributes no record.
    context.register_response_failure(peer_c);
    assert!(context.pending.is_empty());
    assert_eq!(context.queried.len(), 3);
    assert_eq!(context.found_records.len(), 2);

    // The quorum is not met yet, so the discovered peer is queried next.
    let event = context.next_action().unwrap();
    match event {
        QueryAction::SendMessage { query, peer, .. } => {
            assert_eq!(query, QueryId(0));
            assert_eq!(context.pending.len(), 1);
            assert_eq!(peer, peer_d);
        }
        _ => panic!("Unexpected event"),
    }

    // Third record completes the query.
    let record = Record::new(key.clone(), vec![4, 5, 6]);
    context.register_response(peer_d, Some(record), vec![]);

    let event = context.next_action().unwrap();
    assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });

    let found_records = context.found_records();
    assert_eq!(
        found_records,
        vec![
            PeerRecord {
                peer: peer_a,
                record: Record::new(key.clone(), vec![1, 2, 3]),
            },
            PeerRecord {
                peer: peer_b,
                record: Record::new(key.clone(), vec![4, 5, 6]),
            },
            PeerRecord {
                peer: peer_d,
                record: Record::new(key.clone(), vec![4, 5, 6]),
            },
        ]
    );
}
}