referrerpolicy=no-referrer-when-downgrade

sc_network_sync/
pending_responses.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PendingResponses`] is responsible for keeping track of pending responses and
20//! polling them. [`Stream`] implemented by [`PendingResponses`] never terminates.
21
22use crate::{strategy::StrategyKey, LOG_TARGET};
23use futures::{
24	channel::oneshot,
25	future::BoxFuture,
26	stream::{BoxStream, FusedStream, Stream},
27	FutureExt, StreamExt,
28};
29use log::error;
30use std::any::Any;
31
32use sc_network::{request_responses::RequestFailure, types::ProtocolName};
33use sc_network_types::PeerId;
34use std::task::{Context, Poll, Waker};
35use tokio_stream::StreamMap;
36
37/// Response result.
38type ResponseResult =
39	Result<Result<(Box<dyn Any + Send>, ProtocolName), RequestFailure>, oneshot::Canceled>;
40
41/// A future yielding [`ResponseResult`].
42pub(crate) type ResponseFuture = BoxFuture<'static, ResponseResult>;
43
44/// An event we receive once a pending response future resolves.
45pub(crate) struct ResponseEvent {
46	pub peer_id: PeerId,
47	pub key: StrategyKey,
48	pub response: ResponseResult,
49}
50
51/// Stream taking care of polling pending responses.
52pub(crate) struct PendingResponses {
53	/// Pending responses
54	pending_responses: StreamMap<(PeerId, StrategyKey), BoxStream<'static, ResponseResult>>,
55	/// Waker to implement never terminating stream
56	waker: Option<Waker>,
57}
58
59impl PendingResponses {
60	pub fn new() -> Self {
61		Self { pending_responses: StreamMap::new(), waker: None }
62	}
63
64	pub fn insert(&mut self, peer_id: PeerId, key: StrategyKey, response_future: ResponseFuture) {
65		if self
66			.pending_responses
67			.insert((peer_id, key), Box::pin(response_future.into_stream()))
68			.is_some()
69		{
70			error!(
71				target: LOG_TARGET,
72				"Discarded pending response from peer {peer_id}, strategy key: {key:?}.",
73			);
74			debug_assert!(false);
75		}
76
77		if let Some(waker) = self.waker.take() {
78			waker.wake();
79		}
80	}
81
82	pub fn remove(&mut self, peer_id: PeerId, key: StrategyKey) -> bool {
83		self.pending_responses.remove(&(peer_id, key)).is_some()
84	}
85
86	pub fn remove_all(&mut self, peer_id: &PeerId) {
87		let to_remove = self
88			.pending_responses
89			.keys()
90			.filter(|(peer, _key)| peer == peer_id)
91			.cloned()
92			.collect::<Vec<_>>();
93		to_remove.iter().for_each(|k| {
94			self.pending_responses.remove(k);
95		});
96	}
97
98	pub fn len(&self) -> usize {
99		self.pending_responses.len()
100	}
101}
102
103impl Stream for PendingResponses {
104	type Item = ResponseEvent;
105
106	fn poll_next(
107		mut self: std::pin::Pin<&mut Self>,
108		cx: &mut Context<'_>,
109	) -> Poll<Option<Self::Item>> {
110		match self.pending_responses.poll_next_unpin(cx) {
111			Poll::Ready(Some(((peer_id, key), response))) => {
112				// We need to manually remove the stream, because `StreamMap` doesn't know yet that
113				// it's going to yield `None`, so may not remove it before the next request is made
114				// to the same peer.
115				self.pending_responses.remove(&(peer_id, key));
116
117				Poll::Ready(Some(ResponseEvent { peer_id, key, response }))
118			},
119			Poll::Ready(None) | Poll::Pending => {
120				self.waker = Some(cx.waker().clone());
121
122				Poll::Pending
123			},
124		}
125	}
126}
127
128// As [`PendingResponses`] never terminates, we can easily implement [`FusedStream`] for it.
129impl FusedStream for PendingResponses {
130	fn is_terminated(&self) -> bool {
131		false
132	}
133}