1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.PrintStream;
22 import java.io.PrintWriter;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.util.ReflectionUtils;
36 import org.apache.hadoop.util.StringUtils;
37
38
39
40
41 @InterfaceAudience.Private
42 public class Threads {
43 protected static final Log LOG = LogFactory.getLog(Threads.class);
44 private static final AtomicInteger poolNumber = new AtomicInteger(1);
45
46 private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
47 new UncaughtExceptionHandler() {
48 @Override
49 public void uncaughtException(Thread t, Throwable e) {
50 LOG.warn("Thread:" + t + " exited with Exception:"
51 + StringUtils.stringifyException(e));
52 }
53 };
54
55
56
57
58
59
60 public static Thread setDaemonThreadRunning(final Thread t) {
61 return setDaemonThreadRunning(t, t.getName());
62 }
63
64
65
66
67
68
69
70 public static Thread setDaemonThreadRunning(final Thread t,
71 final String name) {
72 return setDaemonThreadRunning(t, name, null);
73 }
74
75
76
77
78
79
80
81
82
83 public static Thread setDaemonThreadRunning(final Thread t,
84 final String name, final UncaughtExceptionHandler handler) {
85 t.setName(name);
86 if (handler != null) {
87 t.setUncaughtExceptionHandler(handler);
88 }
89 t.setDaemon(true);
90 t.start();
91 return t;
92 }
93
94
95
96
97
98 public static void shutdown(final Thread t) {
99 shutdown(t, 0);
100 }
101
102
103
104
105
106
107 public static void shutdown(final Thread t, final long joinwait) {
108 if (t == null) return;
109 while (t.isAlive()) {
110 try {
111 t.join(joinwait);
112 } catch (InterruptedException e) {
113 LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
114 }
115 }
116 }
117
118
119
120
121
122
123
124 public static void threadDumpingIsAlive(final Thread t)
125 throws InterruptedException {
126 if (t == null) {
127 return;
128 }
129
130 while (t.isAlive()) {
131 t.join(60 * 1000);
132 if (t.isAlive()) {
133 printThreadInfo(System.out,
134 "Automatic Stack Trace every 60 seconds waiting on " +
135 t.getName());
136 }
137 }
138 }
139
140
141
142
143
144 public static void sleep(long millis) {
145 try {
146 Thread.sleep(millis);
147 } catch (InterruptedException e) {
148 e.printStackTrace();
149 Thread.currentThread().interrupt();
150 }
151 }
152
153
154
155
156
157
158 public static void sleepWithoutInterrupt(final long msToWait) {
159 long timeMillis = System.currentTimeMillis();
160 long endTime = timeMillis + msToWait;
161 boolean interrupted = false;
162 while (timeMillis < endTime) {
163 try {
164 Thread.sleep(endTime - timeMillis);
165 } catch (InterruptedException ex) {
166 interrupted = true;
167 }
168 timeMillis = System.currentTimeMillis();
169 }
170
171 if (interrupted) {
172 Thread.currentThread().interrupt();
173 }
174 }
175
176
177
178
179
180
181
182
183
184
185
186
187 public static ThreadPoolExecutor getBoundedCachedThreadPool(
188 int maxCachedThread, long timeout, TimeUnit unit,
189 ThreadFactory threadFactory) {
190 ThreadPoolExecutor boundedCachedThreadPool =
191 new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
192 unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
193
194 boundedCachedThreadPool.allowCoreThreadTimeOut(true);
195 return boundedCachedThreadPool;
196 }
197
198
199
200
201
202
203
204
205 public static ThreadFactory getNamedThreadFactory(final String prefix) {
206 SecurityManager s = System.getSecurityManager();
207 final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
208 .getThreadGroup();
209
210 return new ThreadFactory() {
211 final AtomicInteger threadNumber = new AtomicInteger(1);
212 private final int poolNumber = Threads.poolNumber.getAndIncrement();
213 final ThreadGroup group = threadGroup;
214
215 @Override
216 public Thread newThread(Runnable r) {
217 final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
218 return new Thread(group, r, name);
219 }
220 };
221 }
222
223
224
225
226
227 public static ThreadFactory newDaemonThreadFactory(final String prefix) {
228 return newDaemonThreadFactory(prefix, null);
229 }
230
231
232
233
234
235
236
237
238 public static ThreadFactory newDaemonThreadFactory(final String prefix,
239 final UncaughtExceptionHandler handler) {
240 final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
241 return new ThreadFactory() {
242 @Override
243 public Thread newThread(Runnable r) {
244 Thread t = namedFactory.newThread(r);
245 if (handler != null) {
246 t.setUncaughtExceptionHandler(handler);
247 } else {
248 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
249 }
250 if (!t.isDaemon()) {
251 t.setDaemon(true);
252 }
253 if (t.getPriority() != Thread.NORM_PRIORITY) {
254 t.setPriority(Thread.NORM_PRIORITY);
255 }
256 return t;
257 }
258
259 };
260 }
261
262
263
264
265 public static void setLoggingUncaughtExceptionHandler(Thread t) {
266 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
267 }
268
269 private static Method printThreadInfoMethod = null;
270 private static boolean printThreadInfoMethodWithPrintStream = true;
271
272
273
274
275
276
277
278 public static void printThreadInfo(PrintStream stream, String title) {
279
280 if (printThreadInfoMethod == null) {
281 try {
282
283 printThreadInfoMethod = ReflectionUtils.class.getMethod("printThreadInfo",
284 PrintStream.class, String.class);
285 } catch (NoSuchMethodException e) {
286
287 printThreadInfoMethodWithPrintStream = false;
288 try {
289 printThreadInfoMethod = ReflectionUtils.class.getMethod("printThreadInfo",
290 PrintWriter.class, String.class);
291 } catch (NoSuchMethodException e1) {
292 throw new RuntimeException("Cannot find method. Check hadoop jars linked", e1);
293 }
294 }
295 printThreadInfoMethod.setAccessible(true);
296 }
297
298 try {
299 if (printThreadInfoMethodWithPrintStream) {
300 printThreadInfoMethod.invoke(null, stream, title);
301 } else {
302 printThreadInfoMethod.invoke(null, new PrintWriter(stream), title);
303 }
304 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
305 throw new RuntimeException(e.getCause());
306 }
307 }
308 }