001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.LinkedBlockingQueue;
022import java.util.concurrent.RejectedExecutionException;
023import java.util.concurrent.SynchronousQueue;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.hadoop.hbase.util.Threads;
029import org.apache.thrift.TException;
030import org.apache.thrift.TProcessor;
031import org.apache.thrift.protocol.TProtocol;
032import org.apache.thrift.server.TServer;
033import org.apache.thrift.server.TThreadPoolServer;
034import org.apache.thrift.transport.TServerTransport;
035import org.apache.thrift.transport.TSocket;
036import org.apache.thrift.transport.TTransport;
037import org.apache.thrift.transport.TTransportException;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
043
044/**
045 * A bounded thread pool server customized for HBase.
046 */
047@InterfaceAudience.Private
048public class TBoundedThreadPoolServer extends TServer {
049
050  private static final String QUEUE_FULL_MSG = "Queue is full, closing connection";
051
052  /**
053   * The "core size" of the thread pool. New threads are created on every connection until this many
054   * threads are created.
055   */
056  public static final String MIN_WORKER_THREADS_CONF_KEY = "hbase.thrift.minWorkerThreads";
057
058  /**
059   * This default core pool size should be enough for many test scenarios. We want to override this
060   * with a much larger number (e.g. at least 200) for a large-scale production setup.
061   */
062  public static final int DEFAULT_MIN_WORKER_THREADS = 16;
063
064  /**
065   * The maximum size of the thread pool. When the pending request queue overflows, new threads are
066   * created until their number reaches this number. After that, the server starts dropping
067   * connections.
068   */
069  public static final String MAX_WORKER_THREADS_CONF_KEY = "hbase.thrift.maxWorkerThreads";
070
071  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
072
073  /**
074   * The maximum number of pending connections waiting in the queue. If there are no idle threads in
075   * the pool, the server queues requests. Only when the queue overflows, new threads are added, up
076   * to hbase.thrift.maxQueuedRequests threads.
077   */
078  public static final String MAX_QUEUED_REQUESTS_CONF_KEY = "hbase.thrift.maxQueuedRequests";
079
080  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
081
082  /**
083   * Default amount of time in seconds to keep a thread alive. Worker threads are stopped after
084   * being idle for this long.
085   */
086  public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
087    "hbase.thrift.threadKeepAliveTimeSec";
088
089  private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
090
091  /**
092   * Time to wait after interrupting all worker threads. This is after a clean shutdown has been
093   * attempted.
094   */
095  public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
096
097  private static final Logger LOG =
098    LoggerFactory.getLogger(TBoundedThreadPoolServer.class.getName());
099
100  private final CallQueue callQueue;
101
102  public static class Args extends TThreadPoolServer.Args {
103    int maxQueuedRequests;
104    int threadKeepAliveTimeSec;
105
106    public Args(TServerTransport transport, Configuration conf) {
107      super(transport);
108      minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY, DEFAULT_MIN_WORKER_THREADS);
109      maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY, DEFAULT_MAX_WORKER_THREADS);
110      maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY, DEFAULT_MAX_QUEUED_REQUESTS);
111      threadKeepAliveTimeSec =
112        conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY, DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
113    }
114
115    @Override
116    public String toString() {
117      return "min worker threads=" + minWorkerThreads + ", max worker threads=" + maxWorkerThreads
118        + ", max queued requests=" + maxQueuedRequests;
119    }
120  }
121
122  /** Executor service for handling client connections */
123  private ThreadPoolExecutor executorService;
124
125  /** Flag for stopping the server */
126  private volatile boolean stopped;
127
128  private Args serverOptions;
129
130  public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
131    super(options);
132
133    int minWorkerThreads = options.minWorkerThreads;
134    int maxWorkerThreads = options.maxWorkerThreads;
135    if (options.maxQueuedRequests > 0) {
136      this.callQueue = new CallQueue(new LinkedBlockingQueue<>(options.maxQueuedRequests), metrics);
137      minWorkerThreads = maxWorkerThreads;
138    } else {
139      this.callQueue = new CallQueue(new SynchronousQueue<>(), metrics);
140    }
141
142    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
143    tfb.setDaemon(true);
144    tfb.setNameFormat("thrift-worker-%d");
145    executorService = new THBaseThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
146      options.threadKeepAliveTimeSec, TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics);
147    executorService.allowCoreThreadTimeOut(true);
148    serverOptions = options;
149  }
150
151  @Override
152  public void serve() {
153    try {
154      serverTransport_.listen();
155    } catch (TTransportException ttx) {
156      LOG.error("Error occurred during listening.", ttx);
157      return;
158    }
159
160    Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook") {
161      @Override
162      public void run() {
163        TBoundedThreadPoolServer.this.stop();
164      }
165    });
166
167    stopped = false;
168    while (!stopped && !Thread.interrupted()) {
169      TTransport client = null;
170      try {
171        client = serverTransport_.accept();
172      } catch (TTransportException ttx) {
173        if (!stopped) {
174          LOG.warn("Transport error when accepting message", ttx);
175          continue;
176        } else {
177          // The server has been stopped
178          break;
179        }
180      }
181
182      ClientConnnection command = new ClientConnnection(client);
183      try {
184        executorService.execute(command);
185      } catch (RejectedExecutionException rex) {
186        if (client.getClass() == TSocket.class) {
187          // We expect the client to be TSocket.
188          LOG.warn(
189            QUEUE_FULL_MSG + " from " + ((TSocket) client).getSocket().getRemoteSocketAddress());
190        } else {
191          LOG.warn(QUEUE_FULL_MSG, rex);
192        }
193        client.close();
194      }
195    }
196
197    shutdownServer();
198  }
199
200  /**
201   * Loop until {@link ExecutorService#awaitTermination} finally does return without an interrupted
202   * exception. If we don't do this, then we'll shut down prematurely. We want to let the executor
203   * service clear its task queue, closing client sockets appropriately.
204   */
205  private void shutdownServer() {
206    executorService.shutdown();
207
208    long msLeftToWait = serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
209    long timeMillis = EnvironmentEdgeManager.currentTime();
210
211    LOG
212      .info("Waiting for up to " + msLeftToWait + " ms to finish processing" + " pending requests");
213    boolean interrupted = false;
214    while (msLeftToWait >= 0) {
215      try {
216        executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
217        break;
218      } catch (InterruptedException ix) {
219        long timePassed = EnvironmentEdgeManager.currentTime() - timeMillis;
220        msLeftToWait -= timePassed;
221        timeMillis += timePassed;
222        interrupted = true;
223      }
224    }
225
226    LOG.info("Interrupting all worker threads and waiting for " + TIME_TO_WAIT_AFTER_SHUTDOWN_MS
227      + " ms longer");
228
229    // This will interrupt all the threads, even those running a task.
230    executorService.shutdownNow();
231    Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
232
233    // Preserve the interrupted status.
234    if (interrupted) {
235      Thread.currentThread().interrupt();
236    }
237    LOG.info("Thrift server shutdown complete");
238  }
239
240  @Override
241  public void stop() {
242    stopped = true;
243    serverTransport_.interrupt();
244  }
245
246  private class ClientConnnection implements Runnable {
247
248    private TTransport client;
249
250    /**
251     * Default constructor.
252     * @param client Transport to process
253     */
254    private ClientConnnection(TTransport client) {
255      this.client = client;
256    }
257
258    /**
259     * Loops on processing a client forever
260     */
261    @Override
262    public void run() {
263      TProcessor processor = null;
264      TTransport inputTransport = null;
265      TTransport outputTransport = null;
266      TProtocol inputProtocol = null;
267      TProtocol outputProtocol = null;
268      try {
269        processor = processorFactory_.getProcessor(client);
270        inputTransport = inputTransportFactory_.getTransport(client);
271        outputTransport = outputTransportFactory_.getTransport(client);
272        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
273        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
274        // we check stopped_ first to make sure we're not supposed to be shutting
275        // down. this is necessary for graceful shutdown.
276        while (true) {
277          if (stopped) {
278            break;
279          }
280          processor.process(inputProtocol, outputProtocol);
281        }
282      } catch (TTransportException ttx) {
283        // Assume the client died and continue silently
284      } catch (TException tx) {
285        LOG.error("Thrift error occurred during processing of message.", tx);
286      } catch (Exception x) {
287        LOG.error("Error occurred during processing of message.", x);
288      }
289
290      if (inputTransport != null) {
291        inputTransport.close();
292      }
293
294      if (outputTransport != null) {
295        outputTransport.close();
296      }
297    }
298  }
299}