sc_network_sync/
pending_responses.rs1use crate::{strategy::StrategyKey, LOG_TARGET};
23use futures::{
24 channel::oneshot,
25 future::BoxFuture,
26 stream::{BoxStream, FusedStream, Stream},
27 FutureExt, StreamExt,
28};
29use log::error;
30use std::any::Any;
31
32use sc_network::{request_responses::RequestFailure, types::ProtocolName};
33use sc_network_types::PeerId;
34use std::task::{Context, Poll, Waker};
35use tokio_stream::StreamMap;
36
37type ResponseResult =
39 Result<Result<(Box<dyn Any + Send>, ProtocolName), RequestFailure>, oneshot::Canceled>;
40
41pub(crate) type ResponseFuture = BoxFuture<'static, ResponseResult>;
43
44pub(crate) struct ResponseEvent {
46 pub peer_id: PeerId,
47 pub key: StrategyKey,
48 pub response: ResponseResult,
49}
50
51pub(crate) struct PendingResponses {
53 pending_responses: StreamMap<(PeerId, StrategyKey), BoxStream<'static, ResponseResult>>,
55 waker: Option<Waker>,
57}
58
59impl PendingResponses {
60 pub fn new() -> Self {
61 Self { pending_responses: StreamMap::new(), waker: None }
62 }
63
64 pub fn insert(&mut self, peer_id: PeerId, key: StrategyKey, response_future: ResponseFuture) {
65 if self
66 .pending_responses
67 .insert((peer_id, key), Box::pin(response_future.into_stream()))
68 .is_some()
69 {
70 error!(
71 target: LOG_TARGET,
72 "Discarded pending response from peer {peer_id}, strategy key: {key:?}.",
73 );
74 debug_assert!(false);
75 }
76
77 if let Some(waker) = self.waker.take() {
78 waker.wake();
79 }
80 }
81
82 pub fn remove(&mut self, peer_id: PeerId, key: StrategyKey) -> bool {
83 self.pending_responses.remove(&(peer_id, key)).is_some()
84 }
85
86 pub fn remove_all(&mut self, peer_id: &PeerId) {
87 let to_remove = self
88 .pending_responses
89 .keys()
90 .filter(|(peer, _key)| peer == peer_id)
91 .cloned()
92 .collect::<Vec<_>>();
93 to_remove.iter().for_each(|k| {
94 self.pending_responses.remove(k);
95 });
96 }
97
98 pub fn len(&self) -> usize {
99 self.pending_responses.len()
100 }
101}
102
103impl Stream for PendingResponses {
104 type Item = ResponseEvent;
105
106 fn poll_next(
107 mut self: std::pin::Pin<&mut Self>,
108 cx: &mut Context<'_>,
109 ) -> Poll<Option<Self::Item>> {
110 match self.pending_responses.poll_next_unpin(cx) {
111 Poll::Ready(Some(((peer_id, key), response))) => {
112 self.pending_responses.remove(&(peer_id, key));
116
117 Poll::Ready(Some(ResponseEvent { peer_id, key, response }))
118 },
119 Poll::Ready(None) | Poll::Pending => {
120 self.waker = Some(cx.waker().clone());
121
122 Poll::Pending
123 },
124 }
125 }
126}
127
128impl FusedStream for PendingResponses {
130 fn is_terminated(&self) -> bool {
131 false
132 }
133}