/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.thrift;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A bounded thread pool server customized for HBase.
 *
 * <p>Each accepted connection is handed to a {@link ThreadPoolExecutor} whose
 * work queue is a {@link CallQueue}. When the executor rejects a connection
 * the connection is closed immediately instead of being left to wait.
 */
@InterfaceAudience.Private
public class TBoundedThreadPoolServer extends TServer {

  private static final String QUEUE_FULL_MSG =
      "Queue is full, closing connection";

  /**
   * The "core size" of the thread pool. New threads are created on every
   * connection until this many threads are created.
   */
  public static final String MIN_WORKER_THREADS_CONF_KEY =
      "hbase.thrift.minWorkerThreads";

  /**
   * This default core pool size should be enough for many test scenarios. We
   * want to override this with a much larger number (e.g. at least 200) for a
   * large-scale production setup.
   */
  public static final int DEFAULT_MIN_WORKER_THREADS = 16;

  /**
   * The maximum size of the thread pool. When the pending request queue
   * overflows, new threads are created until their number reaches this number.
   * After that, the server starts dropping connections.
   */
  public static final String MAX_WORKER_THREADS_CONF_KEY =
      "hbase.thrift.maxWorkerThreads";

  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;

  /**
   * The maximum number of pending connections waiting in the queue. If there
   * are no idle threads in the pool, the server queues requests. Only when
   * the queue overflows, new threads are added, up to
   * hbase.thrift.maxWorkerThreads threads.
   */
  public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
      "hbase.thrift.maxQueuedRequests";

  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;

  /**
   * Configuration key for the amount of time, in seconds, to keep a thread
   * alive. Worker threads are stopped after being idle for this long.
   */
  public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
      "hbase.thrift.threadKeepAliveTimeSec";

  private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;

  /**
   * Time to wait after interrupting all worker threads. This is after a clean
   * shutdown has been attempted.
   */
  public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;

  private static final Log LOG = LogFactory.getLog(
      TBoundedThreadPoolServer.class.getName());

  private final CallQueue callQueue;

  /**
   * Server arguments: the stock thread pool server arguments plus the queue
   * bound and the worker keep-alive time, all read from a
   * {@link Configuration}.
   */
  public static class Args extends TThreadPoolServer.Args {
    int maxQueuedRequests;
    int threadKeepAliveTimeSec;

    /**
     * @param transport server transport to accept connections on
     * @param conf configuration supplying the pool and queue sizes; the
     *     DEFAULT_* constants of the enclosing class are used for unset keys
     */
    public Args(TServerTransport transport, Configuration conf) {
      super(transport);
      minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
          DEFAULT_MIN_WORKER_THREADS);
      maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
          DEFAULT_MAX_WORKER_THREADS);
      maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
          DEFAULT_MAX_QUEUED_REQUESTS);
      threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
          DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
    }

    @Override
    public String toString() {
      return "min worker threads=" + minWorkerThreads
          + ", max worker threads=" + maxWorkerThreads
          + ", max queued requests=" + maxQueuedRequests;
    }
  }

  /** Executor service for handling client connections */
  private final ExecutorService executorService;

  /** Flag for stopping the server */
  private volatile boolean stopped;

  private final Args serverOptions;

  /**
   * Creates the server and its worker pool.
   *
   * @param options pool sizes, queue bound and keep-alive time
   * @param metrics metrics object passed through to the {@link CallQueue}
   */
  public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
    super(options);

    if (options.maxQueuedRequests > 0) {
      this.callQueue = new CallQueue(
          new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
    } else {
      // No queueing requested: every connection is handed straight to a
      // worker thread or rejected.
      this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
    }

    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setDaemon(true);
    tfb.setNameFormat("thrift-worker-%d");
    executorService =
        new ThreadPoolExecutor(options.minWorkerThreads,
            options.maxWorkerThreads, options.threadKeepAliveTimeSec,
            TimeUnit.SECONDS, this.callQueue, tfb.build());
    serverOptions = options;
  }

  /**
   * Accepts connections and dispatches them to the worker pool until
   * {@link #stop()} is called or the calling thread is interrupted, then shuts
   * the worker pool down. If the accept loop ended because of an interrupt,
   * the calling thread's interrupted status is set again before returning.
   */
  @Override
  public void serve() {
    try {
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOG.error("Error occurred during listening.", ttx);
      return;
    }

    Runtime.getRuntime().addShutdownHook(
        new Thread(getClass().getSimpleName() + "-shutdown-hook") {
          @Override
          public void run() {
            TBoundedThreadPoolServer.this.stop();
          }
        });

    stopped = false;
    boolean interrupted = false;
    while (!stopped) {
      // Thread.interrupted() clears the flag, so remember that it was set in
      // order to restore it once shutdown has completed.
      if (Thread.interrupted()) {
        interrupted = true;
        break;
      }

      TTransport client = null;
      try {
        client = serverTransport_.accept();
      } catch (TTransportException ttx) {
        if (!stopped) {
          LOG.warn("Transport error when accepting message", ttx);
          continue;
        } else {
          // The server has been stopped
          break;
        }
      }

      ClientConnection command = new ClientConnection(client);
      try {
        executorService.execute(command);
      } catch (RejectedExecutionException rex) {
        if (client instanceof TSocket) {
          // We expect the client to be TSocket.
          LOG.warn(QUEUE_FULL_MSG + " from " +
              ((TSocket) client).getSocket().getRemoteSocketAddress());
        } else {
          LOG.warn(QUEUE_FULL_MSG, rex);
        }
        client.close();
      }
    }

    shutdownServer();

    // Restored only after shutdown so that the waits in shutdownServer() are
    // not cut short by an interrupt that has already been acted upon.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Loop until {@link ExecutorService#awaitTermination} finally does return
   * without an interrupted exception. If we don't do this, then we'll shut
   * down prematurely. We want to let the executor service clear its task
   * queue, closing client sockets appropriately.
   */
  private void shutdownServer() {
    executorService.shutdown();

    long msLeftToWait =
        serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
    long timeMillis = System.currentTimeMillis();

    LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
        " pending requests");
    boolean interrupted = false;
    while (msLeftToWait >= 0) {
      try {
        executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        // Deduct the time already spent waiting and retry with the remainder.
        long timePassed = System.currentTimeMillis() - timeMillis;
        msLeftToWait -= timePassed;
        timeMillis += timePassed;
        interrupted = true;
      }
    }

    LOG.info("Interrupting all worker threads and waiting for "
        + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");

    // This will interrupt all the threads, even those running a task.
    executorService.shutdownNow();
    Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);

    // Preserve the interrupted status.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    LOG.info("Thrift server shutdown complete");
  }

  /**
   * Requests that the accept loop in {@link #serve()} exit: sets the stop flag
   * and interrupts the server transport.
   */
  @Override
  public void stop() {
    stopped = true;
    serverTransport_.interrupt();
  }

  /** Task that serves a single accepted client connection. */
  private class ClientConnection implements Runnable {

    private final TTransport client;

    /**
     * @param client Transport to process
     */
    private ClientConnection(TTransport client) {
      this.client = client;
    }

    /**
     * Processes requests from the client until the processor reports that it
     * is done, an exception is thrown, or the server is stopped. The
     * connection is always closed on the way out.
     */
    @Override
    public void run() {
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      try {
        TProcessor processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        TProtocol inputProtocol =
            inputProtocolFactory_.getProtocol(inputTransport);
        TProtocol outputProtocol =
            outputProtocolFactory_.getProtocol(outputTransport);
        // we check stopped first to make sure we're not supposed to be
        // shutting down. this is necessary for graceful shutdown.
        while (!stopped && processor.process(inputProtocol, outputProtocol)) {
          // keep serving requests on this connection
        }
      } catch (TTransportException ttx) {
        // Assume the client died and continue silently
      } catch (TException tx) {
        LOG.error("Thrift error occurred during processing of message.", tx);
      } catch (Exception x) {
        LOG.error("Error occurred during processing of message.", x);
      } finally {
        // Runs even when an Error propagates, so the connection is not leaked.
        if (inputTransport != null) {
          inputTransport.close();
        }

        if (outputTransport != null) {
          outputTransport.close();
        }

        if (inputTransport == null && outputTransport == null) {
          // Setup failed before any wrapping transport was created, so
          // nothing above has released the accepted connection.
          client.close();
        }
      }
    }
  }
}