1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! [`PendingResponses`] is responsible for keeping track of pending responses and
//! polling them. [`Stream`] implemented by [`PendingResponses`] never terminates.

use crate::{strategy::StrategyKey, types::PeerRequest, LOG_TARGET};
use futures::{
	channel::oneshot,
	future::BoxFuture,
	stream::{BoxStream, FusedStream, Stream},
	FutureExt, StreamExt,
};
use log::error;

use sc_network::{request_responses::RequestFailure, types::ProtocolName};
use sc_network_types::PeerId;
use sp_runtime::traits::Block as BlockT;
use std::task::{Context, Poll, Waker};
use tokio_stream::StreamMap;

/// Response result, as produced by a [`ResponseFuture`].
///
/// The layers, from the outside in:
/// - `Err(oneshot::Canceled)`: the oneshot channel carrying the result was canceled before
///   anything was sent over it.
/// - `Ok(Err(RequestFailure))`: the request itself failed.
/// - `Ok(Ok((bytes, protocol)))`: a response was received. Presumably `bytes` is the raw
///   response payload and `protocol` names the protocol it arrived on -- confirm against
///   `sc_network`.
type ResponseResult = Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>;

/// A boxed, `'static` future yielding [`ResponseResult`].
type ResponseFuture = BoxFuture<'static, ResponseResult>;

/// An event we receive once a pending response future resolves.
///
/// Yielded by the [`Stream`] implementation of [`PendingResponses`]; it bundles the outcome
/// with everything that was supplied when the response was registered.
pub(crate) struct ResponseEvent<B: BlockT> {
	/// Peer the pending response was registered for.
	pub peer_id: PeerId,
	/// Strategy key the pending response was registered under.
	pub key: StrategyKey,
	/// The request that was handed over together with the response future.
	pub request: PeerRequest<B>,
	/// Whatever the response future resolved to.
	pub response: ResponseResult,
}

/// Stream taking care of polling pending responses.
///
/// Responses are tracked per `(PeerId, StrategyKey)` pair. Polling this stream yields a
/// [`ResponseEvent`] for every response future that has resolved and never yields `None`.
pub(crate) struct PendingResponses<B: BlockT> {
	/// Pending responses, keyed by the peer and strategy they belong to. Every entry is a
	/// stream wrapping a single response future, paired with the request it answers.
	pending_responses:
		StreamMap<(PeerId, StrategyKey), BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
	/// Waker of the task that last polled us without getting an event. It is woken when a
	/// new response is inserted, which is what lets the stream stay pending instead of
	/// terminating.
	waker: Option<Waker>,
}

impl<B: BlockT> PendingResponses<B> {
	pub fn new() -> Self {
		Self { pending_responses: StreamMap::new(), waker: None }
	}

	pub fn insert(
		&mut self,
		peer_id: PeerId,
		key: StrategyKey,
		request: PeerRequest<B>,
		response_future: ResponseFuture,
	) {
		let request_type = request.get_type();

		if self
			.pending_responses
			.insert(
				(peer_id, key),
				Box::pin(async move { (request, response_future.await) }.into_stream()),
			)
			.is_some()
		{
			error!(
				target: LOG_TARGET,
				"Discarded pending response from peer {peer_id}, request type: {request_type:?}.",
			);
			debug_assert!(false);
		}

		if let Some(waker) = self.waker.take() {
			waker.wake();
		}
	}

	pub fn remove(&mut self, peer_id: PeerId, key: StrategyKey) -> bool {
		self.pending_responses.remove(&(peer_id, key)).is_some()
	}

	pub fn remove_all(&mut self, peer_id: &PeerId) {
		let to_remove = self
			.pending_responses
			.keys()
			.filter(|(peer, _key)| peer == peer_id)
			.cloned()
			.collect::<Vec<_>>();
		to_remove.iter().for_each(|k| {
			self.pending_responses.remove(k);
		});
	}

	pub fn len(&self) -> usize {
		self.pending_responses.len()
	}
}

impl<B: BlockT> Stream for PendingResponses<B> {
	type Item = ResponseEvent<B>;

	/// Poll the tracked responses and report the next one that has resolved.
	///
	/// This never returns `Poll::Ready(None)`: when no response is ready, or none is tracked
	/// at all, the waker of the current task is stored and `Poll::Pending` is returned.
	fn poll_next(
		mut self: std::pin::Pin<&mut Self>,
		cx: &mut Context<'_>,
	) -> Poll<Option<Self::Item>> {
		let polled = self.pending_responses.poll_next_unpin(cx);

		if let Poll::Ready(Some((entry, (request, response)))) = polled {
			// The inner stream has produced its only item, but `StreamMap` only learns that
			// once it sees `None` from it. Drop the entry by hand so it can't collide with
			// the next request registered under the same peer and key.
			self.pending_responses.remove(&entry);

			let (peer_id, key) = entry;
			return Poll::Ready(Some(ResponseEvent { peer_id, key, request, response }));
		}

		// Either the map is empty or nothing has resolved yet. In both cases stay pending
		// and remember who to wake up when a new response gets inserted.
		self.waker = Some(cx.waker().clone());

		Poll::Pending
	}
}

// As [`PendingResponses`] never terminates (its `poll_next` returns `Poll::Pending` instead of
// `Poll::Ready(None)` when there is nothing to yield), we can easily implement [`FusedStream`]
// for it.
impl<B: BlockT> FusedStream for PendingResponses<B> {
	/// Always `false`: the stream never reports termination.
	fn is_terminated(&self) -> bool {
		false
	}
}