1use futures::{
4 channel::mpsc::{unbounded, UnboundedSender},
5 Stream,
6};
7use netlink_packet_core::NetlinkMessage;
8use std::fmt::Debug;
9
10use crate::{errors::Error, sys::SocketAddr, Request};
11
12#[derive(Clone, Debug)]
14pub struct ConnectionHandle<T>
15where
16 T: Debug,
17{
18 requests_tx: UnboundedSender<Request<T>>,
19}
20
21impl<T> ConnectionHandle<T>
22where
23 T: Debug,
24{
25 pub(crate) fn new(requests_tx: UnboundedSender<Request<T>>) -> Self {
26 ConnectionHandle { requests_tx }
27 }
28
29 pub fn request(
36 &mut self,
37 message: NetlinkMessage<T>,
38 destination: SocketAddr,
39 ) -> Result<impl Stream<Item = NetlinkMessage<T>>, Error<T>> {
40 let (tx, rx) = unbounded::<NetlinkMessage<T>>();
41 let request = Request::from((message, destination, tx));
42 debug!("handle: forwarding new request to connection");
43 UnboundedSender::unbounded_send(&self.requests_tx, request).map_err(|e| {
44 if e.is_full() {
47 panic!("internal error: unbounded channel full?!");
48 } else if e.is_disconnected() {
49 Error::ConnectionClosed
50 } else {
51 panic!("unknown error: {:?}", e);
52 }
53 })?;
54 Ok(rx)
55 }
56
57 pub fn notify(
58 &mut self,
59 message: NetlinkMessage<T>,
60 destination: SocketAddr,
61 ) -> Result<(), Error<T>> {
62 let (tx, _rx) = unbounded::<NetlinkMessage<T>>();
63 let request = Request::from((message, destination, tx));
64 debug!("handle: forwarding new request to connection");
65 UnboundedSender::unbounded_send(&self.requests_tx, request)
66 .map_err(|_| Error::ConnectionClosed)
67 }
68}