/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.thrift;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A bounded thread pool server customized for HBase.
 * <p>
 * Accepted connections are handed to a {@link ThreadPoolExecutor} whose work queue is either a
 * bounded {@link LinkedBlockingQueue} or a {@link SynchronousQueue}, depending on
 * {@link #MAX_QUEUED_REQUESTS_CONF_KEY}. A connection the executor rejects is closed immediately.
 */
@InterfaceAudience.Private
public class TBoundedThreadPoolServer extends TServer {

  private static final String QUEUE_FULL_MSG = "Queue is full, closing connection";

  /**
   * The "core size" of the thread pool. New threads are created on every connection until this many
   * threads are created. Note that the constructor only uses this value as the core size when
   * request queueing is disabled; with a bounded queue the core size is set to the maximum size.
   */
  public static final String MIN_WORKER_THREADS_CONF_KEY = "hbase.thrift.minWorkerThreads";

  /**
   * This default core pool size should be enough for many test scenarios. We want to override this
   * with a much larger number (e.g. at least 200) for a large-scale production setup.
   */
  public static final int DEFAULT_MIN_WORKER_THREADS = 16;

  /**
   * The maximum size of the thread pool. Once the executor can neither start another worker nor
   * queue the connection, the server starts dropping connections.
   */
  public static final String MAX_WORKER_THREADS_CONF_KEY = "hbase.thrift.maxWorkerThreads";

  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;

  /**
   * The maximum number of pending connections waiting in the queue. A value that is not positive
   * disables queueing: connections are then handed directly to a worker thread through a
   * {@link SynchronousQueue}. The number of worker threads is bounded by
   * {@link #MAX_WORKER_THREADS_CONF_KEY}, not by this setting.
   */
  public static final String MAX_QUEUED_REQUESTS_CONF_KEY = "hbase.thrift.maxQueuedRequests";

  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;

  /**
   * Configuration key for the amount of time, in seconds, to keep a thread alive. Worker threads
   * are stopped after being idle for this long.
   */
  public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
    "hbase.thrift.threadKeepAliveTimeSec";

  private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;

  /**
   * Time to wait after interrupting all worker threads. This is after a clean shutdown has been
   * attempted.
   */
  public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;

  private static final Logger LOG =
    LoggerFactory.getLogger(TBoundedThreadPoolServer.class.getName());

  private final CallQueue callQueue;

  /**
   * Server arguments populated from a {@link Configuration}: worker thread bounds, queue capacity
   * and idle-thread keep-alive time.
   */
  public static class Args extends TThreadPoolServer.Args {
    int maxQueuedRequests;
    int threadKeepAliveTimeSec;

    /**
     * @param transport the server transport to accept connections on
     * @param conf      configuration the thread pool settings are read from; the defaults declared
     *                  on {@link TBoundedThreadPoolServer} apply to absent keys
     */
    public Args(TServerTransport transport, Configuration conf) {
      super(transport);
      minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY, DEFAULT_MIN_WORKER_THREADS);
      maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY, DEFAULT_MAX_WORKER_THREADS);
      maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY, DEFAULT_MAX_QUEUED_REQUESTS);
      threadKeepAliveTimeSec =
        conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY, DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
    }

    @Override
    public String toString() {
      return "min worker threads=" + minWorkerThreads + ", max worker threads=" + maxWorkerThreads
        + ", max queued requests=" + maxQueuedRequests;
    }
  }

  /** Executor service for handling client connections */
  private final ThreadPoolExecutor executorService;

  /** Flag for stopping the server */
  private volatile boolean stopped;

  private final Args serverOptions;

  /**
   * Creates the server and its worker pool; no socket is opened until {@link #serve()} is called.
   * @param options thread pool and transport settings
   * @param metrics metrics sink passed on to the call queue and the executor
   */
  public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
    super(options);

    int minWorkerThreads = options.minWorkerThreads;
    int maxWorkerThreads = options.maxWorkerThreads;
    if (options.maxQueuedRequests > 0) {
      this.callQueue = new CallQueue(new LinkedBlockingQueue<>(options.maxQueuedRequests), metrics);
      // With a bounded queue the core size is raised to the maximum. A ThreadPoolExecutor prefers
      // queueing over starting threads beyond the core size, so this lets the pool grow to
      // maxWorkerThreads before connections start to queue.
      minWorkerThreads = maxWorkerThreads;
    } else {
      this.callQueue = new CallQueue(new SynchronousQueue<>(), metrics);
    }

    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setDaemon(true);
    tfb.setNameFormat("thrift-worker-%d");
    executorService = new THBaseThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
      options.threadKeepAliveTimeSec, TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics);
    // Idle core threads are allowed to time out as well, so the pool can shrink again.
    executorService.allowCoreThreadTimeOut(true);
    serverOptions = options;
  }

  /**
   * Starts listening and accepts connections until {@link #stop()} is called or the calling thread
   * is interrupted, then shuts the worker pool down. Returns immediately (after logging) if the
   * transport cannot start listening. Each call registers a JVM shutdown hook that invokes
   * {@link #stop()}.
   */
  @Override
  public void serve() {
    try {
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOG.error("Error occurred during listening.", ttx);
      return;
    }

    Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook") {
      @Override
      public void run() {
        TBoundedThreadPoolServer.this.stop();
      }
    });

    stopped = false;
    while (!stopped && !Thread.interrupted()) {
      TTransport client = null;
      try {
        client = serverTransport_.accept();
      } catch (TTransportException ttx) {
        if (!stopped) {
          LOG.warn("Transport error when accepting message", ttx);
          continue;
        } else {
          // The server has been stopped
          break;
        }
      }

      ClientConnection command = new ClientConnection(client);
      try {
        executorService.execute(command);
      } catch (RejectedExecutionException rex) {
        if (client.getClass() == TSocket.class) {
          // We expect the client to be TSocket.
          LOG.warn("{} from {}", QUEUE_FULL_MSG,
            ((TSocket) client).getSocket().getRemoteSocketAddress());
        } else {
          LOG.warn(QUEUE_FULL_MSG, rex);
        }
        // The connection was never handed to a worker, so it has to be closed here.
        client.close();
      }
    }

    shutdownServer();
  }

  /**
   * Loop until {@link ExecutorService#awaitTermination} finally does return without an interrupted
   * exception. If we don't do this, then we'll shut down prematurely. We want to let the executor
   * service clear its task queue, closing client sockets appropriately.
   * <p>
   * Afterwards all worker threads are interrupted and this method sleeps for
   * {@link #TIME_TO_WAIT_AFTER_SHUTDOWN_MS} before returning. The caller's interrupted status is
   * restored if an interrupt was received while waiting.
   */
  private void shutdownServer() {
    executorService.shutdown();

    long msLeftToWait = serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
    long timeMillis = EnvironmentEdgeManager.currentTime();

    LOG.info("Waiting for up to {} ms to finish processing pending requests", msLeftToWait);
    boolean interrupted = false;
    while (msLeftToWait >= 0) {
      try {
        executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        // Deduct the time already spent waiting and retry with the remainder.
        long timePassed = EnvironmentEdgeManager.currentTime() - timeMillis;
        msLeftToWait -= timePassed;
        timeMillis += timePassed;
        interrupted = true;
      }
    }

    LOG.info("Interrupting all worker threads and waiting for {} ms longer",
      TIME_TO_WAIT_AFTER_SHUTDOWN_MS);

    // This will interrupt all the threads, even those running a task.
    executorService.shutdownNow();
    Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);

    // Preserve the interrupted status.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    LOG.info("Thrift server shutdown complete");
  }

  /**
   * Requests the accept loop in {@link #serve()} to end by setting the stop flag and interrupting
   * the server transport.
   */
  @Override
  public void stop() {
    stopped = true;
    serverTransport_.interrupt();
  }

  /** Worker task that serves a single accepted client connection. */
  private final class ClientConnection implements Runnable {

    private final TTransport client;

    /**
     * Default constructor.
     * @param client Transport to process
     */
    private ClientConnection(TTransport client) {
      this.client = client;
    }

    /**
     * Processes requests from the client until the server is stopped or processing fails, then
     * closes the connection.
     */
    @Override
    public void run() {
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      try {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        // we check stopped first to make sure we're not supposed to be shutting
        // down. this is necessary for graceful shutdown.
        while (!stopped) {
          processor.process(inputProtocol, outputProtocol);
        }
      } catch (TTransportException ignored) {
        // Assume the client died and continue silently
      } catch (TException tx) {
        LOG.error("Thrift error occurred during processing of message.", tx);
      } catch (Exception x) {
        LOG.error("Error occurred during processing of message.", x);
      } finally {
        // Release the connection on every exit path, including an Error escaping the loop.
        if (inputTransport != null) {
          inputTransport.close();
        }

        if (outputTransport != null) {
          outputTransport.close();
        }

        // If setup failed before a wrapping transport existed, nothing above has closed the
        // accepted connection; close it directly so the socket is not leaked.
        if (client.isOpen()) {
          client.close();
        }
      }
    }
  }
}