001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.LinkedBlockingQueue; 023import java.util.concurrent.RejectedExecutionException; 024import java.util.concurrent.SynchronousQueue; 025import java.util.concurrent.ThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.util.Threads; 030import org.apache.thrift.TException; 031import org.apache.thrift.TProcessor; 032import org.apache.thrift.protocol.TProtocol; 033import org.apache.thrift.server.TServer; 034import org.apache.thrift.server.TThreadPoolServer; 035import org.apache.thrift.transport.TServerTransport; 036import org.apache.thrift.transport.TSocket; 037import org.apache.thrift.transport.TTransport; 038import org.apache.thrift.transport.TTransportException; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 043 044/** 045 * A bounded thread pool server customized for HBase. 046 */ 047@InterfaceAudience.Private 048public class TBoundedThreadPoolServer extends TServer { 049 050 private static final String QUEUE_FULL_MSG = 051 "Queue is full, closing connection"; 052 053 /** 054 * The "core size" of the thread pool. New threads are created on every 055 * connection until this many threads are created. 056 */ 057 public static final String MIN_WORKER_THREADS_CONF_KEY = 058 "hbase.thrift.minWorkerThreads"; 059 060 /** 061 * This default core pool size should be enough for many test scenarios. We 062 * want to override this with a much larger number (e.g. at least 200) for a 063 * large-scale production setup. 064 */ 065 public static final int DEFAULT_MIN_WORKER_THREADS = 16; 066 067 /** 068 * The maximum size of the thread pool. When the pending request queue 069 * overflows, new threads are created until their number reaches this number. 070 * After that, the server starts dropping connections. 071 */ 072 public static final String MAX_WORKER_THREADS_CONF_KEY = 073 "hbase.thrift.maxWorkerThreads"; 074 075 public static final int DEFAULT_MAX_WORKER_THREADS = 1000; 076 077 /** 078 * The maximum number of pending connections waiting in the queue. If there 079 * are no idle threads in the pool, the server queues requests. Only when 080 * the queue overflows, new threads are added, up to 081 * hbase.thrift.maxQueuedRequests threads. 082 */ 083 public static final String MAX_QUEUED_REQUESTS_CONF_KEY = 084 "hbase.thrift.maxQueuedRequests"; 085 086 public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000; 087 088 /** 089 * Default amount of time in seconds to keep a thread alive. Worker threads 090 * are stopped after being idle for this long. 091 */ 092 public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY = 093 "hbase.thrift.threadKeepAliveTimeSec"; 094 095 private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60; 096 097 /** 098 * Time to wait after interrupting all worker threads. This is after a clean 099 * shutdown has been attempted. 100 */ 101 public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000; 102 103 private static final Logger LOG = LoggerFactory.getLogger( 104 TBoundedThreadPoolServer.class.getName()); 105 106 private final CallQueue callQueue; 107 108 public static class Args extends TThreadPoolServer.Args { 109 int maxQueuedRequests; 110 int threadKeepAliveTimeSec; 111 112 public Args(TServerTransport transport, Configuration conf) { 113 super(transport); 114 minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY, 115 DEFAULT_MIN_WORKER_THREADS); 116 maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY, 117 DEFAULT_MAX_WORKER_THREADS); 118 maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY, 119 DEFAULT_MAX_QUEUED_REQUESTS); 120 threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY, 121 DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC); 122 } 123 124 @Override 125 public String toString() { 126 return "min worker threads=" + minWorkerThreads 127 + ", max worker threads=" + maxWorkerThreads 128 + ", max queued requests=" + maxQueuedRequests; 129 } 130 } 131 132 /** Executor service for handling client connections */ 133 private ThreadPoolExecutor executorService; 134 135 /** Flag for stopping the server */ 136 private volatile boolean stopped; 137 138 private Args serverOptions; 139 140 public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) { 141 super(options); 142 143 int minWorkerThreads = options.minWorkerThreads; 144 int maxWorkerThreads = options.maxWorkerThreads; 145 if (options.maxQueuedRequests > 0) { 146 this.callQueue = new CallQueue( 147 new LinkedBlockingQueue<>(options.maxQueuedRequests), metrics); 148 minWorkerThreads = maxWorkerThreads; 149 } else { 150 this.callQueue = new CallQueue(new SynchronousQueue<>(), metrics); 151 } 152 153 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 154 tfb.setDaemon(true); 155 tfb.setNameFormat("thrift-worker-%d"); 156 executorService = 157 new THBaseThreadPoolExecutor(minWorkerThreads, 158 maxWorkerThreads, options.threadKeepAliveTimeSec, 159 TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics); 160 executorService.allowCoreThreadTimeOut(true); 161 serverOptions = options; 162 } 163 164 @Override 165 public void serve() { 166 try { 167 serverTransport_.listen(); 168 } catch (TTransportException ttx) { 169 LOG.error("Error occurred during listening.", ttx); 170 return; 171 } 172 173 Runtime.getRuntime().addShutdownHook( 174 new Thread(getClass().getSimpleName() + "-shutdown-hook") { 175 @Override 176 public void run() { 177 TBoundedThreadPoolServer.this.stop(); 178 } 179 }); 180 181 stopped = false; 182 while (!stopped && !Thread.interrupted()) { 183 TTransport client = null; 184 try { 185 client = serverTransport_.accept(); 186 } catch (TTransportException ttx) { 187 if (!stopped) { 188 LOG.warn("Transport error when accepting message", ttx); 189 continue; 190 } else { 191 // The server has been stopped 192 break; 193 } 194 } 195 196 ClientConnnection command = new ClientConnnection(client); 197 try { 198 executorService.execute(command); 199 } catch (RejectedExecutionException rex) { 200 if (client.getClass() == TSocket.class) { 201 // We expect the client to be TSocket. 202 LOG.warn(QUEUE_FULL_MSG + " from " + 203 ((TSocket) client).getSocket().getRemoteSocketAddress()); 204 } else { 205 LOG.warn(QUEUE_FULL_MSG, rex); 206 } 207 client.close(); 208 } 209 } 210 211 shutdownServer(); 212 } 213 214 /** 215 * Loop until {@link ExecutorService#awaitTermination} finally does return 216 * without an interrupted exception. If we don't do this, then we'll shut 217 * down prematurely. We want to let the executor service clear its task 218 * queue, closing client sockets appropriately. 219 */ 220 private void shutdownServer() { 221 executorService.shutdown(); 222 223 long msLeftToWait = 224 serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal); 225 long timeMillis = System.currentTimeMillis(); 226 227 LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" + 228 " pending requests"); 229 boolean interrupted = false; 230 while (msLeftToWait >= 0) { 231 try { 232 executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS); 233 break; 234 } catch (InterruptedException ix) { 235 long timePassed = System.currentTimeMillis() - timeMillis; 236 msLeftToWait -= timePassed; 237 timeMillis += timePassed; 238 interrupted = true; 239 } 240 } 241 242 LOG.info("Interrupting all worker threads and waiting for " 243 + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer"); 244 245 // This will interrupt all the threads, even those running a task. 246 executorService.shutdownNow(); 247 Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS); 248 249 // Preserve the interrupted status. 250 if (interrupted) { 251 Thread.currentThread().interrupt(); 252 } 253 LOG.info("Thrift server shutdown complete"); 254 } 255 256 @Override 257 public void stop() { 258 stopped = true; 259 serverTransport_.interrupt(); 260 } 261 262 private class ClientConnnection implements Runnable { 263 264 private TTransport client; 265 266 /** 267 * Default constructor. 268 * 269 * @param client Transport to process 270 */ 271 private ClientConnnection(TTransport client) { 272 this.client = client; 273 } 274 275 /** 276 * Loops on processing a client forever 277 */ 278 @Override 279 public void run() { 280 TProcessor processor = null; 281 TTransport inputTransport = null; 282 TTransport outputTransport = null; 283 TProtocol inputProtocol = null; 284 TProtocol outputProtocol = null; 285 try { 286 processor = processorFactory_.getProcessor(client); 287 inputTransport = inputTransportFactory_.getTransport(client); 288 outputTransport = outputTransportFactory_.getTransport(client); 289 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); 290 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); 291 // we check stopped_ first to make sure we're not supposed to be shutting 292 // down. this is necessary for graceful shutdown. 293 while (true) { 294 if (stopped) { 295 break; 296 } 297 processor.process(inputProtocol, outputProtocol); 298 } 299 } catch (TTransportException ttx) { 300 // Assume the client died and continue silently 301 } catch (TException tx) { 302 LOG.error("Thrift error occurred during processing of message.", tx); 303 } catch (Exception x) { 304 LOG.error("Error occurred during processing of message.", x); 305 } 306 307 if (inputTransport != null) { 308 inputTransport.close(); 309 } 310 311 if (outputTransport != null) { 312 outputTransport.close(); 313 } 314 } 315 } 316}