001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.LinkedBlockingQueue;
023import java.util.concurrent.RejectedExecutionException;
024import java.util.concurrent.SynchronousQueue;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.util.Threads;
030import org.apache.thrift.TException;
031import org.apache.thrift.TProcessor;
032import org.apache.thrift.protocol.TProtocol;
033import org.apache.thrift.server.TServer;
034import org.apache.thrift.server.TThreadPoolServer;
035import org.apache.thrift.transport.TServerTransport;
036import org.apache.thrift.transport.TSocket;
037import org.apache.thrift.transport.TTransport;
038import org.apache.thrift.transport.TTransportException;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
043
044/**
045 * A bounded thread pool server customized for HBase.
046 */
047@InterfaceAudience.Private
048public class TBoundedThreadPoolServer extends TServer {
049
050  private static final String QUEUE_FULL_MSG =
051      "Queue is full, closing connection";
052
053  /**
054   * The "core size" of the thread pool. New threads are created on every
055   * connection until this many threads are created.
056   */
057  public static final String MIN_WORKER_THREADS_CONF_KEY =
058      "hbase.thrift.minWorkerThreads";
059
060  /**
061   * This default core pool size should be enough for many test scenarios. We
062   * want to override this with a much larger number (e.g. at least 200) for a
063   * large-scale production setup.
064   */
065  public static final int DEFAULT_MIN_WORKER_THREADS = 16;
066
067  /**
068   * The maximum size of the thread pool. When the pending request queue
069   * overflows, new threads are created until their number reaches this number.
070   * After that, the server starts dropping connections.
071   */
072  public static final String MAX_WORKER_THREADS_CONF_KEY =
073      "hbase.thrift.maxWorkerThreads";
074
075  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
076
077  /**
078   * The maximum number of pending connections waiting in the queue. If there
079   * are no idle threads in the pool, the server queues requests. Only when
080   * the queue overflows, new threads are added, up to
081   * hbase.thrift.maxQueuedRequests threads.
082   */
083  public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
084      "hbase.thrift.maxQueuedRequests";
085
086  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
087
088  /**
089   * Default amount of time in seconds to keep a thread alive. Worker threads
090   * are stopped after being idle for this long.
091   */
092  public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
093      "hbase.thrift.threadKeepAliveTimeSec";
094
095  private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
096
097  /**
098   * Time to wait after interrupting all worker threads. This is after a clean
099   * shutdown has been attempted.
100   */
101  public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
102
103  private static final Logger LOG = LoggerFactory.getLogger(
104      TBoundedThreadPoolServer.class.getName());
105
106  private final CallQueue callQueue;
107
108  public static class Args extends TThreadPoolServer.Args {
109    int maxQueuedRequests;
110    int threadKeepAliveTimeSec;
111
112    public Args(TServerTransport transport, Configuration conf) {
113      super(transport);
114      minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
115          DEFAULT_MIN_WORKER_THREADS);
116      maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
117          DEFAULT_MAX_WORKER_THREADS);
118      maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
119          DEFAULT_MAX_QUEUED_REQUESTS);
120      threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
121          DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
122    }
123
124    @Override
125    public String toString() {
126      return "min worker threads=" + minWorkerThreads
127          + ", max worker threads=" + maxWorkerThreads 
128          + ", max queued requests=" + maxQueuedRequests;
129    }
130  }
131
132  /** Executor service for handling client connections */
133  private ThreadPoolExecutor executorService;
134
135  /** Flag for stopping the server */
136  private volatile boolean stopped;
137
138  private Args serverOptions;
139
140  public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
141    super(options);
142
143    int minWorkerThreads = options.minWorkerThreads;
144    int maxWorkerThreads = options.maxWorkerThreads;
145    if (options.maxQueuedRequests > 0) {
146      this.callQueue = new CallQueue(
147          new LinkedBlockingQueue<>(options.maxQueuedRequests), metrics);
148      minWorkerThreads = maxWorkerThreads;
149    } else {
150      this.callQueue = new CallQueue(new SynchronousQueue<>(), metrics);
151    }
152
153    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
154    tfb.setDaemon(true);
155    tfb.setNameFormat("thrift-worker-%d");
156    executorService =
157        new THBaseThreadPoolExecutor(minWorkerThreads,
158            maxWorkerThreads, options.threadKeepAliveTimeSec,
159            TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics);
160    executorService.allowCoreThreadTimeOut(true);
161    serverOptions = options;
162  }
163
164  @Override
165  public void serve() {
166    try {
167      serverTransport_.listen();
168    } catch (TTransportException ttx) {
169      LOG.error("Error occurred during listening.", ttx);
170      return;
171    }
172
173    Runtime.getRuntime().addShutdownHook(
174        new Thread(getClass().getSimpleName() + "-shutdown-hook") {
175          @Override
176          public void run() {
177            TBoundedThreadPoolServer.this.stop();
178          }
179        });
180
181    stopped = false;
182    while (!stopped && !Thread.interrupted()) {
183      TTransport client = null;
184      try {
185        client = serverTransport_.accept();
186      } catch (TTransportException ttx) {
187        if (!stopped) {
188          LOG.warn("Transport error when accepting message", ttx);
189          continue;
190        } else {
191          // The server has been stopped
192          break;
193        }
194      }
195
196      ClientConnnection command = new ClientConnnection(client);
197      try {
198        executorService.execute(command);
199      } catch (RejectedExecutionException rex) {
200        if (client.getClass() == TSocket.class) {
201          // We expect the client to be TSocket.
202          LOG.warn(QUEUE_FULL_MSG + " from " +
203              ((TSocket) client).getSocket().getRemoteSocketAddress());
204        } else {
205          LOG.warn(QUEUE_FULL_MSG, rex);
206        }
207        client.close();
208      }
209    }
210
211    shutdownServer();
212  }
213
214  /**
215   * Loop until {@link ExecutorService#awaitTermination} finally does return
216   * without an interrupted exception. If we don't do this, then we'll shut
217   * down prematurely. We want to let the executor service clear its task
218   * queue, closing client sockets appropriately.
219   */
220  private void shutdownServer() {
221    executorService.shutdown();
222
223    long msLeftToWait =
224        serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
225    long timeMillis = System.currentTimeMillis();
226
227    LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
228        " pending requests");
229    boolean interrupted = false;
230    while (msLeftToWait >= 0) {
231      try {
232        executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
233        break;
234      } catch (InterruptedException ix) {
235        long timePassed = System.currentTimeMillis() - timeMillis;
236        msLeftToWait -= timePassed;
237        timeMillis += timePassed;
238        interrupted = true;
239      }
240    }
241
242    LOG.info("Interrupting all worker threads and waiting for "
243        + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
244
245    // This will interrupt all the threads, even those running a task.
246    executorService.shutdownNow();
247    Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
248
249    // Preserve the interrupted status.
250    if (interrupted) {
251      Thread.currentThread().interrupt();
252    }
253    LOG.info("Thrift server shutdown complete");
254  }
255
256  @Override
257  public void stop() {
258    stopped = true;
259    serverTransport_.interrupt();
260  }
261
262  private class ClientConnnection implements Runnable {
263
264    private TTransport client;
265
266    /**
267     * Default constructor.
268     *
269     * @param client Transport to process
270     */
271    private ClientConnnection(TTransport client) {
272      this.client = client;
273    }
274
275    /**
276     * Loops on processing a client forever
277     */
278    @Override
279    public void run() {
280      TProcessor processor = null;
281      TTransport inputTransport = null;
282      TTransport outputTransport = null;
283      TProtocol inputProtocol = null;
284      TProtocol outputProtocol = null;
285      try {
286        processor = processorFactory_.getProcessor(client);
287        inputTransport = inputTransportFactory_.getTransport(client);
288        outputTransport = outputTransportFactory_.getTransport(client);
289        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
290        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
291        // we check stopped_ first to make sure we're not supposed to be shutting
292        // down. this is necessary for graceful shutdown.
293        while (true) {
294          if (stopped) {
295            break;
296          }
297          processor.process(inputProtocol, outputProtocol);
298        }
299      } catch (TTransportException ttx) {
300        // Assume the client died and continue silently
301      } catch (TException tx) {
302        LOG.error("Thrift error occurred during processing of message.", tx);
303      } catch (Exception x) {
304        LOG.error("Error occurred during processing of message.", x);
305      }
306
307      if (inputTransport != null) {
308        inputTransport.close();
309      }
310
311      if (outputTransport != null) {
312        outputTransport.close();
313      }
314    }
315  }
316}