1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
33 import org.apache.hadoop.hbase.util.Threads;
34 import org.apache.thrift.TException;
35 import org.apache.thrift.TProcessor;
36 import org.apache.thrift.protocol.TProtocol;
37 import org.apache.thrift.server.TServer;
38 import org.apache.thrift.server.TThreadPoolServer;
39 import org.apache.thrift.transport.TServerTransport;
40 import org.apache.thrift.transport.TSocket;
41 import org.apache.thrift.transport.TTransport;
42 import org.apache.thrift.transport.TTransportException;
43
44 import com.google.common.util.concurrent.ThreadFactoryBuilder;
45
46
47
48
49 @InterfaceAudience.Private
50 public class TBoundedThreadPoolServer extends TServer {
51
52 private static final String QUEUE_FULL_MSG =
53 "Queue is full, closing connection";
54
55
56
57
58
59 public static final String MIN_WORKER_THREADS_CONF_KEY =
60 "hbase.thrift.minWorkerThreads";
61
62
63
64
65
66
67 public static final int DEFAULT_MIN_WORKER_THREADS = 16;
68
69
70
71
72
73
74 public static final String MAX_WORKER_THREADS_CONF_KEY =
75 "hbase.thrift.maxWorkerThreads";
76
77 public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
78
79
80
81
82
83
84
85 public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
86 "hbase.thrift.maxQueuedRequests";
87
88 public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
89
90
91
92
93
94 public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
95 "hbase.thrift.threadKeepAliveTimeSec";
96
97 private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
98
99
100
101
102
103 public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
104
105 private static final Log LOG = LogFactory.getLog(
106 TBoundedThreadPoolServer.class.getName());
107
108 private final CallQueue callQueue;
109
110 public static class Args extends TThreadPoolServer.Args {
111 int maxQueuedRequests;
112 int threadKeepAliveTimeSec;
113
114 public Args(TServerTransport transport, Configuration conf) {
115 super(transport);
116 minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
117 DEFAULT_MIN_WORKER_THREADS);
118 maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
119 DEFAULT_MAX_WORKER_THREADS);
120 maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
121 DEFAULT_MAX_QUEUED_REQUESTS);
122 threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
123 DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
124 }
125
126 @Override
127 public String toString() {
128 return "min worker threads=" + minWorkerThreads
129 + ", max worker threads=" + maxWorkerThreads
130 + ", max queued requests=" + maxQueuedRequests;
131 }
132 }
133
134
135 private ExecutorService executorService;
136
137
138 private volatile boolean stopped;
139
140 private Args serverOptions;
141
142 public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
143 super(options);
144
145 if (options.maxQueuedRequests > 0) {
146 this.callQueue = new CallQueue(
147 new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
148 } else {
149 this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
150 }
151
152 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
153 tfb.setDaemon(true);
154 tfb.setNameFormat("thrift-worker-%d");
155 executorService =
156 new ThreadPoolExecutor(options.minWorkerThreads,
157 options.maxWorkerThreads, options.threadKeepAliveTimeSec,
158 TimeUnit.SECONDS, this.callQueue, tfb.build());
159 serverOptions = options;
160 }
161
162 public void serve() {
163 try {
164 serverTransport_.listen();
165 } catch (TTransportException ttx) {
166 LOG.error("Error occurred during listening.", ttx);
167 return;
168 }
169
170 Runtime.getRuntime().addShutdownHook(
171 new Thread(getClass().getSimpleName() + "-shutdown-hook") {
172 @Override
173 public void run() {
174 TBoundedThreadPoolServer.this.stop();
175 }
176 });
177
178 stopped = false;
179 while (!stopped && !Thread.interrupted()) {
180 TTransport client = null;
181 try {
182 client = serverTransport_.accept();
183 } catch (TTransportException ttx) {
184 if (!stopped) {
185 LOG.warn("Transport error when accepting message", ttx);
186 continue;
187 } else {
188
189 break;
190 }
191 }
192
193 ClientConnnection command = new ClientConnnection(client);
194 try {
195 executorService.execute(command);
196 } catch (RejectedExecutionException rex) {
197 if (client.getClass() == TSocket.class) {
198
199 LOG.warn(QUEUE_FULL_MSG + " from " +
200 ((TSocket) client).getSocket().getRemoteSocketAddress());
201 } else {
202 LOG.warn(QUEUE_FULL_MSG, rex);
203 }
204 client.close();
205 }
206 }
207
208 shutdownServer();
209 }
210
211
212
213
214
215
216
217 private void shutdownServer() {
218 executorService.shutdown();
219
220 long msLeftToWait =
221 serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
222 long timeMillis = System.currentTimeMillis();
223
224 LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
225 " pending requests");
226 boolean interrupted = false;
227 while (msLeftToWait >= 0) {
228 try {
229 executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
230 break;
231 } catch (InterruptedException ix) {
232 long timePassed = System.currentTimeMillis() - timeMillis;
233 msLeftToWait -= timePassed;
234 timeMillis += timePassed;
235 interrupted = true;
236 }
237 }
238
239 LOG.info("Interrupting all worker threads and waiting for "
240 + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
241
242
243 executorService.shutdownNow();
244 Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
245
246
247 if (interrupted) {
248 Thread.currentThread().interrupt();
249 }
250 LOG.info("Thrift server shutdown complete");
251 }
252
253 @Override
254 public void stop() {
255 stopped = true;
256 serverTransport_.interrupt();
257 }
258
259 private class ClientConnnection implements Runnable {
260
261 private TTransport client;
262
263
264
265
266
267
268 private ClientConnnection(TTransport client) {
269 this.client = client;
270 }
271
272
273
274
275 public void run() {
276 TProcessor processor = null;
277 TTransport inputTransport = null;
278 TTransport outputTransport = null;
279 TProtocol inputProtocol = null;
280 TProtocol outputProtocol = null;
281 try {
282 processor = processorFactory_.getProcessor(client);
283 inputTransport = inputTransportFactory_.getTransport(client);
284 outputTransport = outputTransportFactory_.getTransport(client);
285 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
286 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
287
288
289 while (!stopped && processor.process(inputProtocol, outputProtocol)) {}
290 } catch (TTransportException ttx) {
291
292 } catch (TException tx) {
293 LOG.error("Thrift error occurred during processing of message.", tx);
294 } catch (Exception x) {
295 LOG.error("Error occurred during processing of message.", x);
296 }
297
298 if (inputTransport != null) {
299 inputTransport.close();
300 }
301
302 if (outputTransport != null) {
303 outputTransport.close();
304 }
305 }
306 }
307 }