sc_network_sync/
pending_responses.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PendingResponses`] is responsible for keeping track of pending responses and
20//! polling them. [`Stream`] implemented by [`PendingResponses`] never terminates.
21
22use crate::{strategy::StrategyKey, types::PeerRequest, LOG_TARGET};
23use futures::{
24	channel::oneshot,
25	future::BoxFuture,
26	stream::{BoxStream, FusedStream, Stream},
27	FutureExt, StreamExt,
28};
29use log::error;
30
31use sc_network::{request_responses::RequestFailure, types::ProtocolName};
32use sc_network_types::PeerId;
33use sp_runtime::traits::Block as BlockT;
34use std::task::{Context, Poll, Waker};
35use tokio_stream::StreamMap;
36
37/// Response result.
38type ResponseResult = Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>;
39
40/// A future yielding [`ResponseResult`].
41type ResponseFuture = BoxFuture<'static, ResponseResult>;
42
43/// An event we receive once a pending response future resolves.
44pub(crate) struct ResponseEvent<B: BlockT> {
45	pub peer_id: PeerId,
46	pub key: StrategyKey,
47	pub request: PeerRequest<B>,
48	pub response: ResponseResult,
49}
50
51/// Stream taking care of polling pending responses.
52pub(crate) struct PendingResponses<B: BlockT> {
53	/// Pending responses
54	pending_responses:
55		StreamMap<(PeerId, StrategyKey), BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
56	/// Waker to implement never terminating stream
57	waker: Option<Waker>,
58}
59
60impl<B: BlockT> PendingResponses<B> {
61	pub fn new() -> Self {
62		Self { pending_responses: StreamMap::new(), waker: None }
63	}
64
65	pub fn insert(
66		&mut self,
67		peer_id: PeerId,
68		key: StrategyKey,
69		request: PeerRequest<B>,
70		response_future: ResponseFuture,
71	) {
72		let request_type = request.get_type();
73
74		if self
75			.pending_responses
76			.insert(
77				(peer_id, key),
78				Box::pin(async move { (request, response_future.await) }.into_stream()),
79			)
80			.is_some()
81		{
82			error!(
83				target: LOG_TARGET,
84				"Discarded pending response from peer {peer_id}, request type: {request_type:?}.",
85			);
86			debug_assert!(false);
87		}
88
89		if let Some(waker) = self.waker.take() {
90			waker.wake();
91		}
92	}
93
94	pub fn remove(&mut self, peer_id: PeerId, key: StrategyKey) -> bool {
95		self.pending_responses.remove(&(peer_id, key)).is_some()
96	}
97
98	pub fn remove_all(&mut self, peer_id: &PeerId) {
99		let to_remove = self
100			.pending_responses
101			.keys()
102			.filter(|(peer, _key)| peer == peer_id)
103			.cloned()
104			.collect::<Vec<_>>();
105		to_remove.iter().for_each(|k| {
106			self.pending_responses.remove(k);
107		});
108	}
109
110	pub fn len(&self) -> usize {
111		self.pending_responses.len()
112	}
113}
114
115impl<B: BlockT> Stream for PendingResponses<B> {
116	type Item = ResponseEvent<B>;
117
118	fn poll_next(
119		mut self: std::pin::Pin<&mut Self>,
120		cx: &mut Context<'_>,
121	) -> Poll<Option<Self::Item>> {
122		match self.pending_responses.poll_next_unpin(cx) {
123			Poll::Ready(Some(((peer_id, key), (request, response)))) => {
124				// We need to manually remove the stream, because `StreamMap` doesn't know yet that
125				// it's going to yield `None`, so may not remove it before the next request is made
126				// to the same peer.
127				self.pending_responses.remove(&(peer_id, key));
128
129				Poll::Ready(Some(ResponseEvent { peer_id, key, request, response }))
130			},
131			Poll::Ready(None) | Poll::Pending => {
132				self.waker = Some(cx.waker().clone());
133
134				Poll::Pending
135			},
136		}
137	}
138}
139
140// As [`PendingResponses`] never terminates, we can easily implement [`FusedStream`] for it.
141impl<B: BlockT> FusedStream for PendingResponses<B> {
142	fn is_terminated(&self) -> bool {
143		false
144	}
145}