use crate::{
protocol::libp2p::kademlia::{
message::KademliaMessage,
query::{
find_node::{FindNodeConfig, FindNodeContext},
get_record::{GetRecordConfig, GetRecordContext},
},
record::{Key as RecordKey, Record},
types::{KademliaPeer, Key},
PeerRecord, Quorum,
},
PeerId,
};
use bytes::Bytes;
use std::collections::{HashMap, VecDeque};
use self::find_many_nodes::FindManyNodesContext;
mod find_many_nodes;
mod find_node;
mod get_record;
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(pub usize);
#[derive(Debug)]
enum QueryType {
FindNode {
context: FindNodeContext<PeerId>,
},
PutRecord {
record: Record,
context: FindNodeContext<RecordKey>,
},
PutRecordToPeers {
record: Record,
context: FindManyNodesContext,
},
GetRecord {
context: GetRecordContext,
},
}
#[derive(Debug, PartialEq, Eq)]
pub enum QueryAction {
SendMessage {
query: QueryId,
peer: PeerId,
message: Bytes,
},
FindNodeQuerySucceeded {
query: QueryId,
target: PeerId,
peers: Vec<KademliaPeer>,
},
PutRecordToFoundNodes {
record: Record,
peers: Vec<KademliaPeer>,
},
GetRecordQueryDone {
query_id: QueryId,
records: Vec<PeerRecord>,
},
QuerySucceeded {
query: QueryId,
},
QueryFailed {
query: QueryId,
},
}
pub struct QueryEngine {
local_peer_id: PeerId,
replication_factor: usize,
parallelism_factor: usize,
queries: HashMap<QueryId, QueryType>,
}
impl QueryEngine {
pub fn new(
local_peer_id: PeerId,
replication_factor: usize,
parallelism_factor: usize,
) -> Self {
Self {
local_peer_id,
replication_factor,
parallelism_factor,
queries: HashMap::new(),
}
}
pub fn start_find_node(
&mut self,
query_id: QueryId,
target: PeerId,
candidates: VecDeque<KademliaPeer>,
) -> QueryId {
tracing::debug!(
target: LOG_TARGET,
?query_id,
?target,
num_peers = ?candidates.len(),
"start `FIND_NODE` query"
);
let target = Key::from(target);
let config = FindNodeConfig {
local_peer_id: self.local_peer_id,
replication_factor: self.replication_factor,
parallelism_factor: self.parallelism_factor,
query: query_id,
target,
};
self.queries.insert(
query_id,
QueryType::FindNode {
context: FindNodeContext::new(config, candidates),
},
);
query_id
}
pub fn start_put_record(
&mut self,
query_id: QueryId,
record: Record,
candidates: VecDeque<KademliaPeer>,
) -> QueryId {
tracing::debug!(
target: LOG_TARGET,
?query_id,
target = ?record.key,
num_peers = ?candidates.len(),
"start `PUT_VALUE` query"
);
let target = Key::new(record.key.clone());
let config = FindNodeConfig {
local_peer_id: self.local_peer_id,
replication_factor: self.replication_factor,
parallelism_factor: self.parallelism_factor,
query: query_id,
target,
};
self.queries.insert(
query_id,
QueryType::PutRecord {
record,
context: FindNodeContext::new(config, candidates),
},
);
query_id
}
pub fn start_put_record_to_peers(
&mut self,
query_id: QueryId,
record: Record,
peers_to_report: Vec<KademliaPeer>,
) -> QueryId {
tracing::debug!(
target: LOG_TARGET,
?query_id,
target = ?record.key,
num_peers = ?peers_to_report.len(),
"start `PUT_VALUE` query to peers"
);
self.queries.insert(
query_id,
QueryType::PutRecordToPeers {
record,
context: FindManyNodesContext::new(query_id, peers_to_report),
},
);
query_id
}
pub fn start_get_record(
&mut self,
query_id: QueryId,
target: RecordKey,
candidates: VecDeque<KademliaPeer>,
quorum: Quorum,
count: usize,
) -> QueryId {
tracing::debug!(
target: LOG_TARGET,
?query_id,
?target,
num_peers = ?candidates.len(),
"start `GET_VALUE` query"
);
let target = Key::new(target);
let config = GetRecordConfig {
local_peer_id: self.local_peer_id,
known_records: count,
quorum,
replication_factor: self.replication_factor,
parallelism_factor: self.parallelism_factor,
query: query_id,
target,
};
self.queries.insert(
query_id,
QueryType::GetRecord {
context: GetRecordContext::new(config, candidates),
},
);
query_id
}
pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
match self.queries.get_mut(&query) {
None => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
}
Some(QueryType::FindNode { context }) => {
context.register_response_failure(peer);
}
Some(QueryType::PutRecord { context, .. }) => {
context.register_response_failure(peer);
}
Some(QueryType::PutRecordToPeers { context, .. }) => {
context.register_response_failure(peer);
}
Some(QueryType::GetRecord { context }) => {
context.register_response_failure(peer);
}
}
}
pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
match self.queries.get_mut(&query) {
None => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
}
Some(QueryType::FindNode { context }) => match message {
KademliaMessage::FindNode { peers, .. } => {
context.register_response(peer, peers);
}
_ => unreachable!(),
},
Some(QueryType::PutRecord { context, .. }) => match message {
KademliaMessage::FindNode { peers, .. } => {
context.register_response(peer, peers);
}
_ => unreachable!(),
},
Some(QueryType::PutRecordToPeers { context, .. }) => match message {
KademliaMessage::FindNode { peers, .. } => {
context.register_response(peer, peers);
}
_ => unreachable!(),
},
Some(QueryType::GetRecord { context }) => match message {
KademliaMessage::GetRecord { record, peers, .. } => {
context.register_response(peer, record, peers);
}
_ => unreachable!(),
},
}
}
pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option<QueryAction> {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action");
match self.queries.get_mut(query) {
None => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
None
}
Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
}
}
fn on_query_succeeded(&mut self, query: QueryId) -> QueryAction {
match self.queries.remove(&query).expect("query to exist") {
QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded {
query,
target: context.config.target.into_preimage(),
peers: context.responses.into_values().collect::<Vec<_>>(),
},
QueryType::PutRecord { record, context } => QueryAction::PutRecordToFoundNodes {
record,
peers: context.responses.into_values().collect::<Vec<_>>(),
},
QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes {
record,
peers: context.peers_to_report,
},
QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
query_id: context.config.query,
records: context.found_records(),
},
}
}
fn on_query_failed(&mut self, query: QueryId) -> QueryAction {
let _ = self.queries.remove(&query).expect("query to exist");
QueryAction::QueryFailed { query }
}
pub fn next_action(&mut self) -> Option<QueryAction> {
for (_, state) in self.queries.iter_mut() {
let action = match state {
QueryType::FindNode { context } => context.next_action(),
QueryType::PutRecord { context, .. } => context.next_action(),
QueryType::PutRecordToPeers { context, .. } => context.next_action(),
QueryType::GetRecord { context } => context.next_action(),
};
match action {
Some(QueryAction::QuerySucceeded { query }) => {
return Some(self.on_query_succeeded(query));
}
Some(QueryAction::QueryFailed { query }) =>
return Some(self.on_query_failed(query)),
Some(_) => return action,
_ => continue,
}
}
None
}
}
#[cfg(test)]
mod tests {
use multihash::{Code, Multihash};
use super::*;
use crate::protocol::libp2p::kademlia::types::ConnectionType;
fn make_peer_id(first: u8, second: u8) -> PeerId {
let mut peer_id = vec![0u8; 32];
peer_id[0] = first;
peer_id[1] = second;
PeerId::from_bytes(
&Multihash::wrap(Code::Identity.into(), &peer_id)
.expect("The digest size is never too large")
.to_bytes(),
)
.unwrap()
}
#[test]
fn query_fails() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
let target_peer = PeerId::random();
let _target_key = Key::from(target_peer);
let query = engine.start_find_node(
QueryId(1337),
target_peer,
vec![
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
]
.into(),
);
for _ in 0..4 {
if let Some(QueryAction::SendMessage { query, peer, .. }) = engine.next_action() {
engine.register_response_failure(query, peer);
}
}
if let Some(QueryAction::QueryFailed { query: failed }) = engine.next_action() {
assert_eq!(failed, query);
}
assert!(engine.next_action().is_none());
}
#[test]
fn lookup_paused() {
let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
let target_peer = PeerId::random();
let _target_key = Key::from(target_peer);
let _ = engine.start_find_node(
QueryId(1338),
target_peer,
vec![
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
KademliaPeer::new(PeerId::random(), vec![], ConnectionType::NotConnected),
]
.into(),
);
for _ in 0..3 {
let _ = engine.next_action();
}
assert!(engine.next_action().is_none());
}
#[test]
fn find_node_query_succeeds() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
let target_peer = make_peer_id(0, 0);
let target_key = Key::from(target_peer);
let distances = {
let mut distances = std::collections::BTreeMap::new();
for i in 1..64 {
let peer = make_peer_id(i, 0);
let key = Key::from(peer);
distances.insert(target_key.distance(&key), peer);
}
distances
};
let mut iter = distances.iter();
let _query = engine.start_find_node(
QueryId(1339),
target_peer,
vec![KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
)]
.into(),
);
let action = engine.next_action();
assert!(engine.next_action().is_none());
match action {
Some(QueryAction::SendMessage { query, peer, .. }) => {
engine.register_response(
query,
peer,
KademliaMessage::FindNode {
target: Vec::new(),
peers: vec![
KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
),
KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
),
KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
),
],
},
);
}
_ => panic!("invalid event received"),
}
for _ in 0..3 {
match engine.next_action() {
Some(QueryAction::SendMessage { query, peer, .. }) => {
println!("next send message to {peer:?}");
engine.register_response(
query,
peer,
KademliaMessage::FindNode {
target: Vec::new(),
peers: vec![],
},
);
}
_ => panic!("invalid event received"),
}
}
match engine.next_action() {
Some(QueryAction::FindNodeQuerySucceeded { peers, .. }) => {
assert_eq!(peers.len(), 4);
}
_ => panic!("invalid event received"),
}
assert!(engine.next_action().is_none());
}
#[test]
fn put_record_succeeds() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
let target_key = Key::new(record_key.clone());
let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);
let distances = {
let mut distances = std::collections::BTreeMap::new();
for i in 1..64 {
let peer = make_peer_id(i, 0);
let key = Key::from(peer);
distances.insert(target_key.distance(&key), peer);
}
distances
};
let mut iter = distances.iter();
let _query = engine.start_put_record(
QueryId(1340),
original_record.clone(),
vec![KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
)]
.into(),
);
let action = engine.next_action();
assert!(engine.next_action().is_none());
match action {
Some(QueryAction::SendMessage { query, peer, .. }) => {
engine.register_response(
query,
peer,
KademliaMessage::FindNode {
target: Vec::new(),
peers: vec![
KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
),
KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
),
KademliaPeer::new(
*iter.next().unwrap().1,
vec![],
ConnectionType::NotConnected,
),
],
},
);
}
_ => panic!("invalid event received"),
}
for _ in 0..3 {
match engine.next_action() {
Some(QueryAction::SendMessage { query, peer, .. }) => {
println!("next send message to {peer:?}");
engine.register_response(
query,
peer,
KademliaMessage::FindNode {
target: Vec::new(),
peers: vec![],
},
);
}
_ => panic!("invalid event received"),
}
}
let peers = match engine.next_action() {
Some(QueryAction::PutRecordToFoundNodes { peers, record }) => {
assert_eq!(peers.len(), 4);
assert_eq!(record.key, original_record.key);
assert_eq!(record.value, original_record.value);
peers
}
_ => panic!("invalid event received"),
};
assert!(engine.next_action().is_none());
let _query = engine.start_get_record(
QueryId(1341),
record_key.clone(),
vec![
KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
]
.into(),
Quorum::All,
3,
);
for _ in 0..4 {
match engine.next_action() {
Some(QueryAction::SendMessage { query, peer, .. }) => {
engine.register_response(
query,
peer,
KademliaMessage::GetRecord {
record: Some(original_record.clone()),
peers: vec![],
key: Some(record_key.clone()),
},
);
}
_ => panic!("invalid event received"),
}
}
let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
match engine.next_action() {
Some(QueryAction::GetRecordQueryDone { records, .. }) => {
let query_peers = records
.iter()
.map(|peer_record| peer_record.peer)
.collect::<std::collections::HashSet<_>>();
assert_eq!(peers, query_peers);
let records: std::collections::HashSet<_> =
records.into_iter().map(|peer_record| peer_record.record).collect();
assert_eq!(records.len(), 1);
let record = records.into_iter().next().unwrap();
assert_eq!(record.key, original_record.key);
assert_eq!(record.value, original_record.value);
}
_ => panic!("invalid event received"),
}
}
}