use crate::{
protocol::libp2p::kademlia::{
record::{Key as RecordKey, ProviderRecord, Record},
schema,
types::{ConnectionType, KademliaPeer},
},
PeerId,
};
use bytes::{Bytes, BytesMut};
use prost::Message;
use std::time::{Duration, Instant};
/// Logging target passed to the `tracing` macros in this file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::message";
/// Decoded Kademlia message, as produced by [`KademliaMessage::from_bytes`].
#[derive(Debug, Clone)]
pub enum KademliaMessage {
/// `FindNode` message.
FindNode {
/// Contents of the message's `key` field.
target: Vec<u8>,
/// Entries of the message's `closer_peers` field that could be converted to [`KademliaPeer`].
peers: Vec<KademliaPeer>,
},
/// `PutValue` message.
PutValue {
/// Record carried by the message.
record: Record,
},
/// `GetValue` message.
GetRecord {
/// Message key, or the key of the attached record when the message key is empty.
/// `None` when neither is available.
key: Option<RecordKey>,
/// Record attached to the message, if any.
record: Option<Record>,
/// Entries of the message's `closer_peers` field that could be converted to [`KademliaPeer`].
peers: Vec<KademliaPeer>,
},
/// `AddProvider` message.
AddProvider {
/// Message key; `from_bytes` rejects `AddProvider` messages whose key is empty.
key: RecordKey,
/// Entries of the message's `provider_peers` field that could be converted to [`KademliaPeer`].
providers: Vec<KademliaPeer>,
},
/// `GetProviders` message.
GetProviders {
/// Message key, `None` when the key field is empty.
key: Option<RecordKey>,
/// Entries of the message's `closer_peers` field that could be converted to [`KademliaPeer`].
peers: Vec<KademliaPeer>,
/// Entries of the message's `provider_peers` field that could be converted to [`KademliaPeer`].
providers: Vec<KademliaPeer>,
},
}
impl KademliaMessage {
/// Create an encoded `FindNode` message for `key`.
///
/// Only `key`, the message type and `cluster_level_raw` are set; every other field keeps its
/// default value.
///
/// # Panics
///
/// Panics if encoding into the pre-sized buffer fails.
pub fn find_node<T: Into<Vec<u8>>>(key: T) -> Bytes {
    let message = schema::kademlia::Message {
        key: key.into(),
        r#type: schema::kademlia::MessageType::FindNode.into(),
        cluster_level_raw: 10,
        ..Default::default()
    };

    // The buffer is sized from `encoded_len()`, which is why encoding is not expected to fail.
    let mut buf = BytesMut::with_capacity(message.encoded_len());
    message.encode(&mut buf).expect("BytesMut to provide needed capacity");

    buf.freeze()
}
/// Create an encoded `PutValue` message carrying `record`.
///
/// The record's key is also copied into the top-level `key` field of the message.
///
/// # Panics
///
/// Panics if encoding into the pre-sized buffer fails.
pub fn put_value(record: Record) -> Bytes {
    // Copy the key out first: `record` is consumed by the schema conversion below.
    let key = record.key.clone().into();
    let payload = schema::kademlia::Message {
        key,
        r#type: schema::kademlia::MessageType::PutValue.into(),
        record: Some(record_to_schema(record)),
        cluster_level_raw: 10,
        ..Default::default()
    };

    let mut encoded = BytesMut::with_capacity(payload.encoded_len());
    payload.encode(&mut encoded).expect("BytesMut to provide needed capacity");

    encoded.freeze()
}
/// Create an encoded `GetValue` request for `key`.
///
/// # Panics
///
/// Panics if encoding into the pre-sized buffer fails.
pub fn get_record(key: RecordKey) -> Bytes {
    let message = schema::kademlia::Message {
        // `key` is owned and not used again, so it is converted directly without cloning.
        key: key.into(),
        r#type: schema::kademlia::MessageType::GetValue.into(),
        cluster_level_raw: 10,
        ..Default::default()
    };

    let mut buf = BytesMut::with_capacity(message.encoded_len());
    message.encode(&mut buf).expect("BytesMut to provide needed capacity");

    buf.freeze()
}
/// Create an encoded `FindNode` response for `key`.
///
/// `peers` are converted to their schema form and written to the `closer_peers` field.
///
/// # Panics
///
/// Panics if encoding into the pre-sized buffer fails.
pub fn find_node_response<K: AsRef<[u8]>>(key: K, peers: Vec<KademliaPeer>) -> Vec<u8> {
    let closer_peers = peers.iter().map(Into::into).collect();
    let response = schema::kademlia::Message {
        key: key.as_ref().to_vec(),
        cluster_level_raw: 10,
        r#type: schema::kademlia::MessageType::FindNode.into(),
        closer_peers,
        ..Default::default()
    };

    let mut encoded = Vec::with_capacity(response.encoded_len());
    response.encode(&mut encoded).expect("Vec<u8> to provide needed capacity");

    encoded
}
/// Create an encoded `GetValue` response for `key`.
///
/// `peers` are written to the `closer_peers` field; `record`, when present, is converted to its
/// schema form and attached to the message.
///
/// # Panics
///
/// Panics if encoding into the pre-sized buffer fails.
pub fn get_value_response(
    key: RecordKey,
    peers: Vec<KademliaPeer>,
    record: Option<Record>,
) -> Vec<u8> {
    let closer_peers = peers.iter().map(Into::into).collect();
    let record = record.map(record_to_schema);
    let response = schema::kademlia::Message {
        key: key.to_vec(),
        cluster_level_raw: 10,
        r#type: schema::kademlia::MessageType::GetValue.into(),
        closer_peers,
        record,
        ..Default::default()
    };

    let mut encoded = Vec::with_capacity(response.encoded_len());
    response.encode(&mut encoded).expect("Vec<u8> to provide needed capacity");

    encoded
}
/// Create an encoded `AddProvider` message advertising `provider`.
///
/// The provider's peer ID and addresses are wrapped in a single [`KademliaPeer`] marked
/// [`ConnectionType::CanConnect`] and written to the `provider_peers` field.
///
/// # Panics
///
/// Panics if encoding into the pre-sized buffer fails.
#[allow(unused)]
pub fn add_provider(provider: ProviderRecord) -> Vec<u8> {
    let peer = KademliaPeer::new(
        provider.provider,
        provider.addresses,
        ConnectionType::CanConnect,
    );
    let message = schema::kademlia::Message {
        // `to_vec()` already yields an owned copy, so the key is not cloned beforehand.
        key: provider.key.to_vec(),
        cluster_level_raw: 10,
        r#type: schema::kademlia::MessageType::AddProvider.into(),
        provider_peers: vec![(&peer).into()],
        ..Default::default()
    };

    let mut buf = Vec::with_capacity(message.encoded_len());
    message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

    buf
}
/// Create an encoded `GetProviders` request for `key`.
///
/// # Panics
///
/// Panics if encoding into the pre-sized buffer fails.
#[allow(unused)]
pub fn get_providers_request(key: RecordKey) -> Vec<u8> {
    let request = schema::kademlia::Message {
        key: key.to_vec(),
        cluster_level_raw: 10,
        r#type: schema::kademlia::MessageType::GetProviders.into(),
        ..Default::default()
    };

    let mut encoded = Vec::with_capacity(request.encoded_len());
    request.encode(&mut encoded).expect("Vec<u8> to provide needed capacity");

    encoded
}
pub fn get_providers_response(
key: RecordKey,
providers: Vec<ProviderRecord>,
closer_peers: &[KademliaPeer],
) -> Vec<u8> {
debug_assert!(providers.iter().all(|p| p.key == key));
let provider_peers = providers
.into_iter()
.map(|p| {
KademliaPeer::new(
p.provider,
p.addresses,
ConnectionType::CanConnect, )
})
.map(|p| (&p).into())
.collect();
let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
closer_peers: closer_peers.iter().map(Into::into).collect(),
provider_peers,
..Default::default()
};
let mut buf = Vec::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
buf
}
/// Decode a [`KademliaMessage`] from protobuf-encoded `bytes`.
///
/// Returns `None` when
/// - the bytes cannot be decoded (logged at `debug` level),
/// - the message type is not one of the handled values (logged at `warn` level),
/// - a `PutValue` message carries no record,
/// - an `AddProvider` message has an empty key, or
/// - an attached record cannot be converted by `record_from_schema`.
///
/// Peers that fail to convert into [`KademliaPeer`] are skipped instead of failing the whole
/// message.
pub fn from_bytes(bytes: BytesMut) -> Option<Self> {
    let message = match schema::kademlia::Message::decode(bytes) {
        Ok(message) => message,
        Err(error) => {
            tracing::debug!(target: LOG_TARGET, ?error, "failed to decode message");
            return None;
        }
    };

    // NOTE(review): the numeric patterns below are raw wire values of the `type` field;
    // presumably they mirror `schema::kademlia::MessageType` — confirm against the schema.
    match message.r#type {
        // Decoded as `FindNode`.
        4 => {
            let peers = message
                .closer_peers
                .iter()
                .filter_map(|peer| KademliaPeer::try_from(peer).ok())
                .collect();

            Some(Self::FindNode {
                target: message.key,
                peers,
            })
        }
        // Decoded as `PutValue`; a record is mandatory.
        0 => {
            let record = message.record?;

            Some(Self::PutValue {
                record: record_from_schema(record)?,
            })
        }
        // Decoded as `GetRecord`.
        1 => {
            // Prefer the top-level key and fall back to the key of the attached record.
            let key = if message.key.is_empty() {
                message
                    .record
                    .as_ref()
                    .filter(|record| !record.key.is_empty())
                    .map(|record| RecordKey::from(record.key.clone()))
            } else {
                // `message.key` is not read again, so it is moved instead of cloned.
                Some(RecordKey::from(message.key))
            };
            // A record that is present but invalid rejects the whole message.
            let record = match message.record {
                Some(record) => Some(record_from_schema(record)?),
                None => None,
            };
            let peers = message
                .closer_peers
                .iter()
                .filter_map(|peer| KademliaPeer::try_from(peer).ok())
                .collect();

            Some(Self::GetRecord { key, record, peers })
        }
        // Decoded as `AddProvider`; the key is mandatory.
        2 => {
            if message.key.is_empty() {
                return None;
            }
            let key = message.key.into();
            let providers = message
                .provider_peers
                .iter()
                .filter_map(|peer| KademliaPeer::try_from(peer).ok())
                .collect();

            Some(Self::AddProvider { key, providers })
        }
        // Decoded as `GetProviders`; the key is optional.
        3 => {
            let key = if message.key.is_empty() {
                None
            } else {
                Some(message.key.into())
            };
            let peers = message
                .closer_peers
                .iter()
                .filter_map(|peer| KademliaPeer::try_from(peer).ok())
                .collect();
            let providers = message
                .provider_peers
                .iter()
                .filter_map(|peer| KademliaPeer::try_from(peer).ok())
                .collect();

            Some(Self::GetProviders {
                key,
                peers,
                providers,
            })
        }
        message_type => {
            tracing::warn!(target: LOG_TARGET, ?message_type, "unhandled message");
            None
        }
    }
}
}
/// Convert a [`Record`] into its schema (wire) representation.
///
/// `expires` is translated into a relative TTL in whole seconds, saturating at `u32::MAX`.
/// A TTL of `0` is reserved for records without an expiry, so a record that does have an expiry
/// is always sent with a TTL of at least `1` — including records that have already expired or
/// have less than one second left.
fn record_to_schema(record: Record) -> schema::kademlia::Record {
    let ttl = record.expires.map_or(0, |expires| {
        // Zero when `expires` is not in the future.
        let remaining = expires.saturating_duration_since(Instant::now());

        // `as_secs()` truncates, so a sub-second remainder would become 0 and be read back
        // as "does not expire"; clamp to 1 to keep the expiry.
        u32::try_from(remaining.as_secs()).unwrap_or(u32::MAX).max(1)
    });

    schema::kademlia::Record {
        key: record.key.into(),
        value: record.value,
        time_received: String::new(),
        publisher: record.publisher.map(|peer_id| peer_id.to_bytes()).unwrap_or_default(),
        ttl,
    }
}
/// Convert a schema (wire) record into a [`Record`].
///
/// An empty `publisher` maps to `None`, and a `ttl` of `0` maps to no expiry; any other TTL is
/// interpreted as seconds from now. Returns `None` if a non-empty `publisher` cannot be parsed
/// as a [`PeerId`].
fn record_from_schema(record: schema::kademlia::Record) -> Option<Record> {
    let publisher = if record.publisher.is_empty() {
        None
    } else {
        Some(PeerId::from_bytes(&record.publisher).ok()?)
    };
    let expires =
        (record.ttl > 0).then(|| Instant::now() + Duration::from_secs(u64::from(record.ttl)));

    Some(Record {
        key: record.key.into(),
        value: record.value,
        publisher,
        expires,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A record with a publisher and an expiry survives a schema round trip.
    #[test]
    fn non_empty_publisher_and_ttl_are_preserved() {
        let expires = Instant::now() + Duration::from_secs(3600);

        let record = Record {
            key: vec![1, 2, 3].into(),
            value: vec![17],
            publisher: Some(PeerId::random()),
            expires: Some(expires),
        };

        let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();

        assert_eq!(got_record.key, record.key);
        assert_eq!(got_record.value, record.value);
        assert_eq!(got_record.publisher, record.publisher);

        // The TTL travels as whole seconds, so the restored deadline can land on either side of
        // the original one. Subtracting a later `Instant` from an earlier one never yields a
        // negative `Duration`, so measure the distance explicitly in both directions.
        let got_expires = got_record.expires.unwrap();
        let drift = if got_expires >= expires {
            got_expires.duration_since(expires)
        } else {
            expires.duration_since(got_expires)
        };
        assert!(drift < Duration::from_secs(10));
    }

    /// A record without publisher and expiry survives a schema round trip unchanged.
    #[test]
    fn empty_publisher_and_ttl_are_preserved() {
        let record = Record {
            key: vec![1, 2, 3].into(),
            value: vec![17],
            publisher: None,
            expires: None,
        };

        let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();

        assert_eq!(got_record, record);
    }
}