View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import java.util.concurrent.ExecutorService;
22  import java.util.concurrent.LinkedBlockingQueue;
23  import java.util.concurrent.RejectedExecutionException;
24  import java.util.concurrent.SynchronousQueue;
25  import java.util.concurrent.ThreadPoolExecutor;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
33  import org.apache.hadoop.hbase.util.Threads;
34  import org.apache.thrift.TException;
35  import org.apache.thrift.TProcessor;
36  import org.apache.thrift.protocol.TProtocol;
37  import org.apache.thrift.server.TServer;
38  import org.apache.thrift.server.TThreadPoolServer;
39  import org.apache.thrift.transport.TServerTransport;
40  import org.apache.thrift.transport.TSocket;
41  import org.apache.thrift.transport.TTransport;
42  import org.apache.thrift.transport.TTransportException;
43  
44  import com.google.common.util.concurrent.ThreadFactoryBuilder;
45  
46  /**
47   * A bounded thread pool server customized for HBase.
48   */
49  @InterfaceAudience.Private
50  public class TBoundedThreadPoolServer extends TServer {
51  
52    private static final String QUEUE_FULL_MSG =
53        "Queue is full, closing connection";
54  
55    /**
56     * The "core size" of the thread pool. New threads are created on every
57     * connection until this many threads are created.
58     */
59    public static final String MIN_WORKER_THREADS_CONF_KEY =
60        "hbase.thrift.minWorkerThreads";
61  
62    /**
63     * This default core pool size should be enough for many test scenarios. We
64     * want to override this with a much larger number (e.g. at least 200) for a
65     * large-scale production setup.
66     */
67    public static final int DEFAULT_MIN_WORKER_THREADS = 16;
68  
69    /**
70     * The maximum size of the thread pool. When the pending request queue
71     * overflows, new threads are created until their number reaches this number.
72     * After that, the server starts dropping connections.
73     */
74    public static final String MAX_WORKER_THREADS_CONF_KEY =
75        "hbase.thrift.maxWorkerThreads";
76  
77    public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
78  
79    /**
80     * The maximum number of pending connections waiting in the queue. If there
81     * are no idle threads in the pool, the server queues requests. Only when
82     * the queue overflows, new threads are added, up to
83     * hbase.thrift.maxWorkerThreads threads.
84     */
85    public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
86        "hbase.thrift.maxQueuedRequests";
87  
88    public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
89  
90    /**
91     * Configuration key for the time in seconds to keep an idle thread alive.
92     * Worker threads are stopped after being idle for this long.
93     */
94    public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
95        "hbase.thrift.threadKeepAliveTimeSec";
96  
97    private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
98  
99    /**
100    * Time to wait after interrupting all worker threads. This is after a clean
101    * shutdown has been attempted.
102    */
103   public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
104 
105   private static final Log LOG = LogFactory.getLog(
106       TBoundedThreadPoolServer.class.getName());
107 
108   private final CallQueue callQueue;
109 
110   public static class Args extends TThreadPoolServer.Args {
111     int maxQueuedRequests;
112     int threadKeepAliveTimeSec;
113 
114     public Args(TServerTransport transport, Configuration conf) {
115       super(transport);
116       minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
117           DEFAULT_MIN_WORKER_THREADS);
118       maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
119           DEFAULT_MAX_WORKER_THREADS);
120       maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
121           DEFAULT_MAX_QUEUED_REQUESTS);
122       threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
123           DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
124     }
125 
126     @Override
127     public String toString() {
128       return "min worker threads=" + minWorkerThreads
129           + ", max worker threads=" + maxWorkerThreads 
130           + ", max queued requests=" + maxQueuedRequests;
131     }
132   }
133 
134   /** Executor service for handling client connections */
135   private ThreadPoolExecutor executorService;
136 
137   /** Flag for stopping the server */
138   private volatile boolean stopped;
139 
140   private Args serverOptions;
141 
142   public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
143     super(options);
144 
145     int minWorkerThreads = options.minWorkerThreads;
146     int maxWorkerThreads = options.maxWorkerThreads;
147     if (options.maxQueuedRequests > 0) {
148       this.callQueue = new CallQueue(
149           new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
150       minWorkerThreads = maxWorkerThreads;
151     } else {
152       this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
153     }
154
155     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
156     tfb.setDaemon(true);
157     tfb.setNameFormat("thrift-worker-%d");
158     executorService =
159         new ThreadPoolExecutor(minWorkerThreads,
160             maxWorkerThreads, options.threadKeepAliveTimeSec,
161             TimeUnit.SECONDS, this.callQueue, tfb.build());
162     executorService.allowCoreThreadTimeOut(true);
163     serverOptions = options;
164   }
165
166   public void serve() {
167     try {
168       serverTransport_.listen();
169     } catch (TTransportException ttx) {
170       LOG.error("Error occurred during listening.", ttx);
171       return;
172     }
173
174     Runtime.getRuntime().addShutdownHook(
175         new Thread(getClass().getSimpleName() + "-shutdown-hook") {
176           @Override
177           public void run() {
178             TBoundedThreadPoolServer.this.stop();
179           }
180         });
181
182     stopped = false;
183     while (!stopped && !Thread.interrupted()) {
184       TTransport client = null;
185       try {
186         client = serverTransport_.accept();
187       } catch (TTransportException ttx) {
188         if (!stopped) {
189           LOG.warn("Transport error when accepting message", ttx);
190           continue;
191         } else {
192           // The server has been stopped
193           break;
194         }
195       }
196
197       ClientConnnection command = new ClientConnnection(client);
198       try {
199         executorService.execute(command);
200       } catch (RejectedExecutionException rex) {
201         if (client.getClass() == TSocket.class) {
202           // We expect the client to be TSocket.
203           LOG.warn(QUEUE_FULL_MSG + " from " +
204               ((TSocket) client).getSocket().getRemoteSocketAddress());
205         } else {
206           LOG.warn(QUEUE_FULL_MSG, rex);
207         }
208         client.close();
209       }
210     }
211
212     shutdownServer();
213   }
214
215   /**
216    * Loop until {@link ExecutorService#awaitTermination} finally does return
217    * without an interrupted exception. If we don't do this, then we'll shut
218    * down prematurely. We want to let the executor service clear its task
219    * queue, closing client sockets appropriately.
220    */
221   private void shutdownServer() {
222     executorService.shutdown();
223 
224     long msLeftToWait =
225         serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
226     long timeMillis = System.currentTimeMillis();
227
228     LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
229         " pending requests");
230     boolean interrupted = false;
231     while (msLeftToWait >= 0) {
232       try {
233         executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
234         break;
235       } catch (InterruptedException ix) {
236         long timePassed = System.currentTimeMillis() - timeMillis;
237         msLeftToWait -= timePassed;
238         timeMillis += timePassed;
239         interrupted = true;
240       }
241     }
242
243     LOG.info("Interrupting all worker threads and waiting for "
244         + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
245 
246     // This will interrupt all the threads, even those running a task.
247     executorService.shutdownNow();
248     Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
249
250     // Preserve the interrupted status.
251     if (interrupted) {
252       Thread.currentThread().interrupt();
253     }
254     LOG.info("Thrift server shutdown complete");
255   }
256
257   @Override
258   public void stop() {
259     stopped = true;
260     serverTransport_.interrupt();
261   }
262 
263   private class ClientConnnection implements Runnable {
264
265     private TTransport client;
266
267     /**
268      * Default constructor.
269      *
270      * @param client Transport to process
271      */
272     private ClientConnnection(TTransport client) {
273       this.client = client;
274     }
275
276     /**
277      * Loops on processing a client forever
278      */
279     public void run() {
280       TProcessor processor = null;
281       TTransport inputTransport = null;
282       TTransport outputTransport = null;
283       TProtocol inputProtocol = null;
284       TProtocol outputProtocol = null;
285       try {
286         processor = processorFactory_.getProcessor(client);
287         inputTransport = inputTransportFactory_.getTransport(client);
288         outputTransport = outputTransportFactory_.getTransport(client);
289         inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
290         outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
291         // we check stopped_ first to make sure we're not supposed to be shutting
292         // down. this is necessary for graceful shutdown.
293         while (!stopped && processor.process(inputProtocol, outputProtocol)) {}
294       } catch (TTransportException ttx) {
295         // Assume the client died and continue silently
296       } catch (TException tx) {
297         LOG.error("Thrift error occurred during processing of message.", tx);
298       } catch (Exception x) {
299         LOG.error("Error occurred during processing of message.", x);
300       }
301 
302       if (inputTransport != null) {
303         inputTransport.close();
304       }
305
306       if (outputTransport != null) {
307         outputTransport.close();
308       }
309     }
310   }
311 }