sc_network_sync/
pending_responses.rs1use crate::{strategy::StrategyKey, types::PeerRequest, LOG_TARGET};
23use futures::{
24 channel::oneshot,
25 future::BoxFuture,
26 stream::{BoxStream, FusedStream, Stream},
27 FutureExt, StreamExt,
28};
29use log::error;
30
31use sc_network::{request_responses::RequestFailure, types::ProtocolName};
32use sc_network_types::PeerId;
33use sp_runtime::traits::Block as BlockT;
34use std::task::{Context, Poll, Waker};
35use tokio_stream::StreamMap;
36
37type ResponseResult = Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>;
39
40type ResponseFuture = BoxFuture<'static, ResponseResult>;
42
43pub(crate) struct ResponseEvent<B: BlockT> {
45 pub peer_id: PeerId,
46 pub key: StrategyKey,
47 pub request: PeerRequest<B>,
48 pub response: ResponseResult,
49}
50
51pub(crate) struct PendingResponses<B: BlockT> {
53 pending_responses:
55 StreamMap<(PeerId, StrategyKey), BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
56 waker: Option<Waker>,
58}
59
60impl<B: BlockT> PendingResponses<B> {
61 pub fn new() -> Self {
62 Self { pending_responses: StreamMap::new(), waker: None }
63 }
64
65 pub fn insert(
66 &mut self,
67 peer_id: PeerId,
68 key: StrategyKey,
69 request: PeerRequest<B>,
70 response_future: ResponseFuture,
71 ) {
72 let request_type = request.get_type();
73
74 if self
75 .pending_responses
76 .insert(
77 (peer_id, key),
78 Box::pin(async move { (request, response_future.await) }.into_stream()),
79 )
80 .is_some()
81 {
82 error!(
83 target: LOG_TARGET,
84 "Discarded pending response from peer {peer_id}, request type: {request_type:?}.",
85 );
86 debug_assert!(false);
87 }
88
89 if let Some(waker) = self.waker.take() {
90 waker.wake();
91 }
92 }
93
94 pub fn remove(&mut self, peer_id: PeerId, key: StrategyKey) -> bool {
95 self.pending_responses.remove(&(peer_id, key)).is_some()
96 }
97
98 pub fn remove_all(&mut self, peer_id: &PeerId) {
99 let to_remove = self
100 .pending_responses
101 .keys()
102 .filter(|(peer, _key)| peer == peer_id)
103 .cloned()
104 .collect::<Vec<_>>();
105 to_remove.iter().for_each(|k| {
106 self.pending_responses.remove(k);
107 });
108 }
109
110 pub fn len(&self) -> usize {
111 self.pending_responses.len()
112 }
113}
114
115impl<B: BlockT> Stream for PendingResponses<B> {
116 type Item = ResponseEvent<B>;
117
118 fn poll_next(
119 mut self: std::pin::Pin<&mut Self>,
120 cx: &mut Context<'_>,
121 ) -> Poll<Option<Self::Item>> {
122 match self.pending_responses.poll_next_unpin(cx) {
123 Poll::Ready(Some(((peer_id, key), (request, response)))) => {
124 self.pending_responses.remove(&(peer_id, key));
128
129 Poll::Ready(Some(ResponseEvent { peer_id, key, request, response }))
130 },
131 Poll::Ready(None) | Poll::Pending => {
132 self.waker = Some(cx.waker().clone());
133
134 Poll::Pending
135 },
136 }
137 }
138}
139
140impl<B: BlockT> FusedStream for PendingResponses<B> {
142 fn is_terminated(&self) -> bool {
143 false
144 }
145}