thrift/server/
threaded.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use log::warn;
19
20use std::net::{TcpListener, TcpStream, ToSocketAddrs};
21use std::sync::Arc;
22use threadpool::ThreadPool;
23
24use crate::protocol::{
25    TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
26};
27use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
28use crate::{ApplicationError, ApplicationErrorKind};
29
30use super::TProcessor;
31use crate::TransportErrorKind;
32
33/// Fixed-size thread-pool blocking Thrift server.
34///
35/// A `TServer` listens on a given address and submits accepted connections
36/// to an **unbounded** queue. Connections from this queue are serviced by
37/// the first available worker thread from a **fixed-size** thread pool. Each
38/// accepted connection is handled by that worker thread, and communication
39/// over this thread occurs sequentially and synchronously (i.e. calls block).
40/// Accepted connections have an input half and an output half, each of which
41/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
42/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
43/// and `TTransport` may be used.
44///
45/// # Examples
46///
47/// Creating and running a `TServer` using Thrift-compiler-generated
48/// service code.
49///
50/// ```no_run
51/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
52/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
53/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
54/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
55///                         TReadTransportFactory, TWriteTransportFactory};
56/// use thrift::server::{TProcessor, TServer};
57///
58/// //
59/// // auto-generated
60/// //
61///
62/// // processor for `SimpleService`
63/// struct SimpleServiceSyncProcessor;
64/// impl SimpleServiceSyncProcessor {
65///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
66///         unimplemented!();
67///     }
68/// }
69///
70/// // `TProcessor` implementation for `SimpleService`
71/// impl TProcessor for SimpleServiceSyncProcessor {
72///     fn process(&self, i: &mut dyn TInputProtocol, o: &mut dyn TOutputProtocol) -> thrift::Result<()> {
73///         unimplemented!();
74///     }
75/// }
76///
77/// // service functions for SimpleService
78/// trait SimpleServiceSyncHandler {
79///     fn service_call(&self) -> thrift::Result<()>;
80/// }
81///
82/// //
83/// // user-code follows
84/// //
85///
86/// // define a handler that will be invoked when `service_call` is received
87/// struct SimpleServiceHandlerImpl;
88/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
89///     fn service_call(&self) -> thrift::Result<()> {
90///         unimplemented!();
91///     }
92/// }
93///
94/// // instantiate the processor
95/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
96///
97/// // instantiate the server
98/// let i_tr_fact: Box<dyn TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
99/// let i_pr_fact: Box<dyn TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
100/// let o_tr_fact: Box<dyn TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
101/// let o_pr_fact: Box<dyn TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
102///
103/// let mut server = TServer::new(
104///     i_tr_fact,
105///     i_pr_fact,
106///     o_tr_fact,
107///     o_pr_fact,
108///     processor,
109///     10
110/// );
111///
112/// // start listening for incoming connections
113/// match server.listen("127.0.0.1:8080") {
114///   Ok(_)  => println!("listen completed"),
115///   Err(e) => println!("listen failed with error {:?}", e),
116/// }
117/// ```
118#[derive(Debug)]
119pub struct TServer<PRC, RTF, IPF, WTF, OPF>
120where
121    PRC: TProcessor + Send + Sync + 'static,
122    RTF: TReadTransportFactory + 'static,
123    IPF: TInputProtocolFactory + 'static,
124    WTF: TWriteTransportFactory + 'static,
125    OPF: TOutputProtocolFactory + 'static,
126{
127    r_trans_factory: RTF,
128    i_proto_factory: IPF,
129    w_trans_factory: WTF,
130    o_proto_factory: OPF,
131    processor: Arc<PRC>,
132    worker_pool: ThreadPool,
133}
134
135impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
136where
137    PRC: TProcessor + Send + Sync + 'static,
138    RTF: TReadTransportFactory + 'static,
139    IPF: TInputProtocolFactory + 'static,
140    WTF: TWriteTransportFactory + 'static,
141    OPF: TOutputProtocolFactory + 'static,
142{
143    /// Create a `TServer`.
144    ///
145    /// Each accepted connection has an input and output half, each of which
146    /// requires a `TTransport` and `TProtocol`. `TServer` uses
147    /// `read_transport_factory` and `input_protocol_factory` to create
148    /// implementations for the input, and `write_transport_factory` and
149    /// `output_protocol_factory` to create implementations for the output.
150    pub fn new(
151        read_transport_factory: RTF,
152        input_protocol_factory: IPF,
153        write_transport_factory: WTF,
154        output_protocol_factory: OPF,
155        processor: PRC,
156        num_workers: usize,
157    ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
158        TServer {
159            r_trans_factory: read_transport_factory,
160            i_proto_factory: input_protocol_factory,
161            w_trans_factory: write_transport_factory,
162            o_proto_factory: output_protocol_factory,
163            processor: Arc::new(processor),
164            worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
165        }
166    }
167
168    /// Listen for incoming connections on `listen_address`.
169    ///
170    /// `listen_address` should implement `ToSocketAddrs` trait.
171    ///
172    /// Return `()` if successful.
173    ///
174    /// Return `Err` when the server cannot bind to `listen_address` or there
175    /// is an unrecoverable error.
176    pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> {
177        let listener = TcpListener::bind(listen_address)?;
178        for stream in listener.incoming() {
179            match stream {
180                Ok(s) => {
181                    let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
182                    let processor = self.processor.clone();
183                    self.worker_pool
184                        .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
185                }
186                Err(e) => {
187                    warn!("failed to accept remote connection with error {:?}", e);
188                }
189            }
190        }
191
192        Err(crate::Error::Application(ApplicationError {
193            kind: ApplicationErrorKind::Unknown,
194            message: "aborted listen loop".into(),
195        }))
196    }
197
198    fn new_protocols_for_connection(
199        &mut self,
200        stream: TcpStream,
201    ) -> crate::Result<(
202        Box<dyn TInputProtocol + Send>,
203        Box<dyn TOutputProtocol + Send>,
204    )> {
205        // create the shared tcp stream
206        let channel = TTcpChannel::with_stream(stream);
207
208        // split it into two - one to be owned by the
209        // input tran/proto and the other by the output
210        let (r_chan, w_chan) = channel.split()?;
211
212        // input protocol and transport
213        let r_tran = self.r_trans_factory.create(Box::new(r_chan));
214        let i_prot = self.i_proto_factory.create(r_tran);
215
216        // output protocol and transport
217        let w_tran = self.w_trans_factory.create(Box::new(w_chan));
218        let o_prot = self.o_proto_factory.create(w_tran);
219
220        Ok((i_prot, o_prot))
221    }
222}
223
224fn handle_incoming_connection<PRC>(
225    processor: Arc<PRC>,
226    i_prot: Box<dyn TInputProtocol>,
227    o_prot: Box<dyn TOutputProtocol>,
228) where
229    PRC: TProcessor,
230{
231    let mut i_prot = i_prot;
232    let mut o_prot = o_prot;
233    loop {
234        match processor.process(&mut *i_prot, &mut *o_prot) {
235            Ok(()) => {}
236            Err(err) => {
237                match err {
238                    crate::Error::Transport(ref transport_err)
239                        if transport_err.kind == TransportErrorKind::EndOfFile => {}
240                    other => warn!("processor completed with error: {:?}", other),
241                }
242                break;
243            }
244        }
245    }
246}