// polkadot_node_subsystem_util/nesting_sender.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! ## Background
//!
//! Writing concurrent and even multithreaded code by default is inconvenient and slow: no
//! references, hence lots of needless cloning and data duplication, locks, mutexes, ... We should
//! reach for concurrency and parallelism when there is an actual need, not just because we can and
//! it is reasonably safe in Rust.
//!
//! I very much agree with many points in this blog post, for example:
//!
//! <https://maciej.codes/2022-06-09-local-async.html>
//!
//! Another very good post by Pierre (Tomaka):
//!
//! <https://tomaka.medium.com/a-look-back-at-asynchronous-rust-d54d63934a1c>
//!
//! ## Architecture
//!
//! This module helps with this in part. It does not do away with the multithreaded-by-default
//! approach, but it does do away with the `spawn everything` approach. Once you `spawn` you will
//! still be multithreaded by default, even though for most tasks we spawn (which just wait for the
//! network or for some message to arrive) that is pointless and needless overhead - you will
//! simply spawn less in the first place.
//!
//! By default your code is single threaded, except when actually needed:
//! - you need to wait for long running synchronous IO (a threaded runtime is actually useful here)
//! - you need to wait for some async event (a message to arrive)
//! - you need to do some hefty CPU bound processing (a thread is required here as well)
//!
//! and it is not acceptable to block the main task waiting for the result, because we actually
//! have other things to do, or at least need to stay responsive just in case.
//!
//! With the types and traits in this module you can achieve exactly that: You write modules which
//! just execute logic and can call into the functions of other modules - yes, we are calling
//! normal functions. In case a module you are calling into requires an occasional background
//! task, you provide it with a `NestingSender<M, ChildModuleMessage>` that it can pass to any
//! spawned tasks.
//!
//! This way you don't have to spawn a task for each module just for it to be able to handle
//! asynchronous events. The module relies on the using/enclosing code/module to forward it any
//! asynchronous messages in a structured way.
//!
//! What makes this architecture nice is the separation of concerns - at the top you only have to
//! provide a sender and dispatch received messages to the root module - it is completely
//! irrelevant how complex that module is. It might consist of child modules also having the need
//! to spawn and receive messages, which in turn do the same, yet the root logic stays unchanged.
//! Everything is isolated to the level where it belongs, while we still keep a single task scope
//! in all non-blocking/non-CPU-intensive parts, which allows us to share data via references, for
//! example.
//!
//! Because the wrapping is optional and transparent to the lower modules, each module can also be
//! used at the top directly without any wrapping, e.g. for standalone use or for testing purposes.
//!
//! Check out the documentation of [`NestingSender`][nesting_sender::NestingSender] below for a
//! basic usage example. For real world usage I would like to point you to the dispute-distribution
//! subsystem, which makes use of this architecture.
//!
//! ## Limitations
//!
//! Nothing is ever for free of course: Each level adds an indirect function call to message
//! sending, which should be cheap enough for most applications, but is something to keep in mind.
//! In particular, we avoided the use of async traits, which would have required memory allocations
//! on each send. Also, cloning a [`NestingSender`][nesting_sender::NestingSender] is more
//! expensive than cloning a plain `mpsc::Sender`, but the overhead should be negligible.
//!
//! Further limitations: Because everything is routed to the same channel, it is not possible with
//! this approach to exert back-pressure on only a single source (as all sources are the same). If
//! a module has a task that requires this, it indeed has to spawn a long running task which can do
//! the back-pressure on that message source, or it has to become its own subsystem. This is just
//! one of the situations that justifies the complexity of asynchrony.

use std::{convert::identity, sync::Arc};

use futures::{channel::mpsc, SinkExt};

/// A message sender that supports sending nested messages.
///
/// This sender wraps an `mpsc::Sender` and a conversion function for converting given messages of
/// type `Mnested` to the message type actually supported by the mpsc (`M`).
///
/// Example:
///
/// ```rust
/// # use polkadot_node_subsystem_util::nesting_sender::NestingSender;
///
/// enum RootMessage {
///     Child1Message(ChildMessage),
///     Child2Message(OtherChildMessage),
///     SomeOwnMessage,
/// }
///
/// enum ChildMessage {
///     TaskFinished(u32),
/// }
///
/// enum OtherChildMessage {
///     QueryResult(bool),
/// }
///
/// // We would then pass in a `NestingSender` to our child module of the following type:
/// type ChildSender = NestingSender<RootMessage, ChildMessage>;
///
/// // Types in the child module can (and should) be generic over the root type:
/// struct ChildState<M> {
///     tx: NestingSender<M, ChildMessage>,
/// }
///
/// // Create the root message sender:
/// let (root_sender, receiver) = NestingSender::new_root(1);
/// // Get a sender for the child module based on that root sender:
/// let child_sender = NestingSender::new(root_sender.clone(), RootMessage::Child1Message);
/// // pass `child_sender` to child module ...
/// ```
///
/// `ChildMessage` could itself have a variant wrapping messages of a child module of its own; that
/// child module can then use `NestingSender::new` with this sender and a conversion function to
/// obtain a further nested sender, suitable for its own children.
pub struct NestingSender<M, Mnested> {
	sender: mpsc::Sender<M>,
	conversion: Arc<dyn Fn(Mnested) -> M + 'static + Send + Sync>,
}

impl<M> NestingSender<M, M>
where
	M: 'static,
{
	/// Create a new "root" sender.
	///
	/// This is a sender that directly passes messages to the internal mpsc.
	///
	/// Params: The channel size of the created mpsc.
	/// Returns: The newly constructed `NestingSender` and the corresponding mpsc receiver.
	pub fn new_root(channel_size: usize) -> (Self, mpsc::Receiver<M>) {
		let (sender, receiver) = mpsc::channel(channel_size);
		let s = Self { sender, conversion: Arc::new(identity) };
		(s, receiver)
	}
}

impl<M, Mnested> NestingSender<M, Mnested>
where
	M: 'static,
	Mnested: 'static,
{
	/// Create a new `NestingSender` which wraps a given "parent" sender.
	///
	/// By passing in a necessary conversion from `Mnested` to `Mparent` (the `Mnested` of the
	/// parent sender), we can construct a derived `NestingSender<M, Mnested>` from a
	/// `NestingSender<M, Mparent>`.
	///
	/// The resulting sender performs the following conversion:
	///
	/// ```text
	///    Mnested -> Mparent -> M
	///    Inputs:
	///        F(Mparent) -> M (via parent)
	///        F(Mnested) -> Mparent (via child_conversion)
	///    Result: F(Mnested) -> M
	/// ```
	pub fn new<Mparent>(
		parent: NestingSender<M, Mparent>,
		child_conversion: fn(Mnested) -> Mparent,
	) -> Self
	where
		Mparent: 'static,
	{
		let NestingSender { sender, conversion } = parent;
		Self { sender, conversion: Arc::new(move |x| conversion(child_conversion(x))) }
	}

	/// Send a message via the underlying mpsc.
	///
	/// The necessary conversion is applied.
	pub async fn send_message(&mut self, m: Mnested) -> Result<(), mpsc::SendError> {
		// Flushing on an mpsc means waiting for the receiver to pick up the data - we don't want
		// to wait for that, hence `feed` instead of `send`.
		self.sender.feed((self.conversion)(m)).await
	}
}

// Helper traits and implementations:

impl<M, Mnested> Clone for NestingSender<M, Mnested>
where
	M: 'static,
	Mnested: 'static,
{
	fn clone(&self) -> Self {
		Self { sender: self.sender.clone(), conversion: self.conversion.clone() }
	}
}