View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import java.util.concurrent.ExecutorService;
22  import java.util.concurrent.LinkedBlockingQueue;
23  import java.util.concurrent.RejectedExecutionException;
24  import java.util.concurrent.SynchronousQueue;
25  import java.util.concurrent.ThreadPoolExecutor;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
32  import org.apache.hadoop.hbase.util.Threads;
33  import org.apache.thrift.TException;
34  import org.apache.thrift.TProcessor;
35  import org.apache.thrift.protocol.TProtocol;
36  import org.apache.thrift.server.TServer;
37  import org.apache.thrift.server.TThreadPoolServer;
38  import org.apache.thrift.transport.TServerTransport;
39  import org.apache.thrift.transport.TSocket;
40  import org.apache.thrift.transport.TTransport;
41  import org.apache.thrift.transport.TTransportException;
42  
43  import com.google.common.util.concurrent.ThreadFactoryBuilder;
44  
45  /**
46   * A bounded thread pool server customized for HBase.
47   */
48  public class TBoundedThreadPoolServer extends TServer {
49  
50    private static final String QUEUE_FULL_MSG =
51        "Queue is full, closing connection";
52  
53    /**
54     * The "core size" of the thread pool. New threads are created on every
55     * connection until this many threads are created.
56     */
57    public static final String MIN_WORKER_THREADS_CONF_KEY =
58        "hbase.thrift.minWorkerThreads";
59  
60    /**
61     * This default core pool size should be enough for many test scenarios. We
62     * want to override this with a much larger number (e.g. at least 200) for a
63     * large-scale production setup.
64     */
65    public static final int DEFAULT_MIN_WORKER_THREADS = 16;
66  
67    /**
68     * The maximum size of the thread pool. When the pending request queue
69     * overflows, new threads are created until their number reaches this number.
70     * After that, the server starts dropping connections.
71     */
72    public static final String MAX_WORKER_THREADS_CONF_KEY =
73        "hbase.thrift.maxWorkerThreads";
74  
75    public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
76  
77    /**
78     * The maximum number of pending connections waiting in the queue. If there
79     * are no idle threads in the pool, the server queues requests. Only when
80     * the queue overflows, new threads are added, up to
81     * hbase.thrift.maxQueuedRequests threads.
82     */
83    public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
84        "hbase.thrift.maxQueuedRequests";
85  
86    public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
87  
88    /**
89     * Default amount of time in seconds to keep a thread alive. Worker threads
90     * are stopped after being idle for this long.
91     */
92    public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
93        "hbase.thrift.threadKeepAliveTimeSec";
94  
95    private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
96  
97    /**
98     * Time to wait after interrupting all worker threads. This is after a clean
99     * shutdown has been attempted.
100    */
101   public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
102 
103   private static final Log LOG = LogFactory.getLog(
104       TBoundedThreadPoolServer.class.getName());
105 
106   private final CallQueue callQueue;
107 
108   public static class Args extends TThreadPoolServer.Args {
109     int maxQueuedRequests;
110     int threadKeepAliveTimeSec;
111 
112     public Args(TServerTransport transport, Configuration conf) {
113       super(transport);
114       minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
115           DEFAULT_MIN_WORKER_THREADS);
116       maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
117           DEFAULT_MAX_WORKER_THREADS);
118       maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
119           DEFAULT_MAX_QUEUED_REQUESTS);
120       threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
121           DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
122     }
123 
124     @Override
125     public String toString() {
126       return "min worker threads=" + minWorkerThreads
127           + ", max worker threads=" + maxWorkerThreads 
128           + ", max queued requests=" + maxQueuedRequests;
129     }
130   }
131 
132   /** Executor service for handling client connections */
133   private ExecutorService executorService;
134 
135   /** Flag for stopping the server */
136   private volatile boolean stopped;
137 
138   private Args serverOptions;
139 
140   public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
141     super(options);
142 
143     if (options.maxQueuedRequests > 0) {
144       this.callQueue = new CallQueue(
145           new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
146     } else {
147       this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
148     }
149 
150     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
151     tfb.setDaemon(true);
152     tfb.setNameFormat("thrift-worker-%d");
153     executorService =
154         new ThreadPoolExecutor(options.minWorkerThreads,
155             options.maxWorkerThreads, options.threadKeepAliveTimeSec,
156             TimeUnit.SECONDS, this.callQueue, tfb.build());
157     serverOptions = options;
158   }
159 
160   public void serve() {
161     try {
162       serverTransport_.listen();
163     } catch (TTransportException ttx) {
164       LOG.error("Error occurred during listening.", ttx);
165       return;
166     }
167 
168     Runtime.getRuntime().addShutdownHook(
169         new Thread(getClass().getSimpleName() + "-shutdown-hook") {
170           @Override
171           public void run() {
172             TBoundedThreadPoolServer.this.stop();
173           }
174         });
175 
176     stopped = false;
177     while (!stopped && !Thread.interrupted()) {
178       TTransport client = null;
179       try {
180         client = serverTransport_.accept();
181       } catch (TTransportException ttx) {
182         if (!stopped) {
183           LOG.warn("Transport error when accepting message", ttx);
184           continue;
185         } else {
186           // The server has been stopped
187           break;
188         }
189       }
190 
191       ClientConnnection command = new ClientConnnection(client);
192       try {
193         executorService.execute(command);
194       } catch (RejectedExecutionException rex) {
195         if (client.getClass() == TSocket.class) {
196           // We expect the client to be TSocket.
197           LOG.warn(QUEUE_FULL_MSG + " from " +
198               ((TSocket) client).getSocket().getRemoteSocketAddress());
199         } else {
200           LOG.warn(QUEUE_FULL_MSG, rex);
201         }
202         client.close();
203       }
204     }
205 
206     shutdownServer();
207   }
208 
209   /**
210    * Loop until {@link ExecutorService#awaitTermination} finally does return
211    * without an interrupted exception. If we don't do this, then we'll shut
212    * down prematurely. We want to let the executor service clear its task
213    * queue, closing client sockets appropriately.
214    */
215   private void shutdownServer() {
216     executorService.shutdown();
217 
218     long msLeftToWait =
219         serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
220     long timeMillis = System.currentTimeMillis();
221 
222     LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
223         " pending requests");
224     boolean interrupted = false;
225     while (msLeftToWait >= 0) {
226       try {
227         executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
228         break;
229       } catch (InterruptedException ix) {
230         long timePassed = System.currentTimeMillis() - timeMillis;
231         msLeftToWait -= timePassed;
232         timeMillis += timePassed;
233         interrupted = true;
234       }
235     }
236 
237     LOG.info("Interrupting all worker threads and waiting for "
238         + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
239 
240     // This will interrupt all the threads, even those running a task.
241     executorService.shutdownNow();
242     Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
243 
244     // Preserve the interrupted status.
245     if (interrupted) {
246       Thread.currentThread().interrupt();
247     }
248     LOG.info("Thrift server shutdown complete");
249   }
250 
251   @Override
252   public void stop() {
253     stopped = true;
254     serverTransport_.interrupt();
255   }
256 
257   private class ClientConnnection implements Runnable {
258 
259     private TTransport client;
260 
261     /**
262      * Default constructor.
263      *
264      * @param client Transport to process
265      */
266     private ClientConnnection(TTransport client) {
267       this.client = client;
268     }
269 
270     /**
271      * Loops on processing a client forever
272      */
273     public void run() {
274       TProcessor processor = null;
275       TTransport inputTransport = null;
276       TTransport outputTransport = null;
277       TProtocol inputProtocol = null;
278       TProtocol outputProtocol = null;
279       try {
280         processor = processorFactory_.getProcessor(client);
281         inputTransport = inputTransportFactory_.getTransport(client);
282         outputTransport = outputTransportFactory_.getTransport(client);
283         inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
284         outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
285         // we check stopped_ first to make sure we're not supposed to be shutting
286         // down. this is necessary for graceful shutdown.
287         while (!stopped && processor.process(inputProtocol, outputProtocol)) {
288            // message limit is reset for every request
289            // see THRIFT-601, working on thrift 0.8.0
290            // NOTE that THRIFT-820 breaks this again in thrift 0.9.0
291            // and TFramedTransport needs to be used from this version onwards
292            // to avoid the buffer overflow
293            inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
294            outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
295         }
296       } catch (TTransportException ttx) {
297         // Assume the client died and continue silently
298       } catch (TException tx) {
299         LOG.error("Thrift error occurred during processing of message.", tx);
300       } catch (Exception x) {
301         LOG.error("Error occurred during processing of message.", x);
302       }
303 
304       if (inputTransport != null) {
305         inputTransport.close();
306       }
307 
308       if (outputTransport != null) {
309         outputTransport.close();
310       }
311     }
312   }
313 }