View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.PrintStream;
22  import java.io.PrintWriter;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.reflect.InvocationTargetException;
25  import java.lang.reflect.Method;
26  import java.util.concurrent.LinkedBlockingQueue;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.util.ReflectionUtils;
36  import org.apache.hadoop.util.StringUtils;
37  
38  import com.google.common.base.Preconditions;
39  
40  /**
41   * Thread Utility
42   */
43  @InterfaceAudience.Private
44  public class Threads {
45    private static final Log LOG = LogFactory.getLog(Threads.class);
46    private static final AtomicInteger poolNumber = new AtomicInteger(1);
47  
48    private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
49      new UncaughtExceptionHandler() {
50      @Override
51      public void uncaughtException(Thread t, Throwable e) {
52        LOG.warn("Thread:" + t + " exited with Exception:"
53            + StringUtils.stringifyException(e));
54      }
55    };
56  
57    /**
58     * Utility method that sets name, daemon status and starts passed thread.
59     * @param t thread to run
60     * @return Returns the passed Thread <code>t</code>.
61     */
62    public static Thread setDaemonThreadRunning(final Thread t) {
63      return setDaemonThreadRunning(t, t.getName());
64    }
65  
66    /**
67     * Utility method that sets name, daemon status and starts passed thread.
68     * @param t thread to frob
69     * @param name new name
70     * @return Returns the passed Thread <code>t</code>.
71     */
72    public static Thread setDaemonThreadRunning(final Thread t,
73      final String name) {
74      return setDaemonThreadRunning(t, name, null);
75    }
76  
77    /**
78     * Utility method that sets name, daemon status and starts passed thread.
79     * @param t thread to frob
80     * @param name new name
81     * @param handler A handler to set on the thread.  Pass null if want to
82     * use default handler.
83     * @return Returns the passed Thread <code>t</code>.
84     */
85    public static Thread setDaemonThreadRunning(final Thread t,
86      final String name, final UncaughtExceptionHandler handler) {
87      t.setName(name);
88      if (handler != null) {
89        t.setUncaughtExceptionHandler(handler);
90      }
91      t.setDaemon(true);
92      t.start();
93      return t;
94    }
95  
96    /**
97     * Shutdown passed thread using isAlive and join.
98     * @param t Thread to shutdown
99     */
100   public static void shutdown(final Thread t) {
101     shutdown(t, 0);
102   }
103 
104   /**
105    * Shutdown passed thread using isAlive and join.
106    * @param joinwait Pass 0 if we're to wait forever.
107    * @param t Thread to shutdown
108    */
109   public static void shutdown(final Thread t, final long joinwait) {
110     if (t == null) return;
111     while (t.isAlive()) {
112       try {
113         t.join(joinwait);
114       } catch (InterruptedException e) {
115         LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
116       }
117     }
118   }
119 
120 
121   /**
122    * @param t Waits on the passed thread to die dumping a threaddump every
123    * minute while its up.
124    * @throws InterruptedException
125    */
126   public static void threadDumpingIsAlive(final Thread t)
127   throws InterruptedException {
128     if (t == null) {
129       return;
130     }
131 
132     while (t.isAlive()) {
133       t.join(60 * 1000);
134       if (t.isAlive()) {
135         printThreadInfo(System.out,
136             "Automatic Stack Trace every 60 seconds waiting on " +
137             t.getName());
138       }
139     }
140   }
141 
142   /**
143    * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns
144    * @param millis How long to sleep for in milliseconds.
145    */
146   public static void sleep(long millis) {
147     try {
148       Thread.sleep(millis);
149     } catch (InterruptedException e) {
150       e.printStackTrace();
151       Thread.currentThread().interrupt();
152     }
153   }
154 
155   /**
156    * Sleeps for the given amount of time even if interrupted. Preserves
157    * the interrupt status.
158    * @param msToWait the amount of time to sleep in milliseconds
159    */
160   public static void sleepWithoutInterrupt(final long msToWait) {
161     long timeMillis = System.currentTimeMillis();
162     long endTime = timeMillis + msToWait;
163     boolean interrupted = false;
164     while (timeMillis < endTime) {
165       try {
166         Thread.sleep(endTime - timeMillis);
167       } catch (InterruptedException ex) {
168         interrupted = true;
169       }
170       timeMillis = System.currentTimeMillis();
171     }
172 
173     if (interrupted) {
174       Thread.currentThread().interrupt();
175     }
176   }
177 
178   /**
179    * Create a new CachedThreadPool with a bounded number as the maximum
180    * thread size in the pool.
181    *
182    * @param maxCachedThread the maximum thread could be created in the pool
183    * @param timeout the maximum time to wait
184    * @param unit the time unit of the timeout argument
185    * @param threadFactory the factory to use when creating new threads
186    * @return threadPoolExecutor the cachedThreadPool with a bounded number
187    * as the maximum thread size in the pool.
188    */
189   public static ThreadPoolExecutor getBoundedCachedThreadPool(
190       int maxCachedThread, long timeout, TimeUnit unit,
191       ThreadFactory threadFactory) {
192     ThreadPoolExecutor boundedCachedThreadPool =
193       new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
194         unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
195     // allow the core pool threads timeout and terminate
196     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
197     return boundedCachedThreadPool;
198   }
199 
200 
201   /**
202    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
203    * with a common prefix.
204    * @param prefix The prefix of every created Thread's name
205    * @return a {@link java.util.concurrent.ThreadFactory} that names threads
206    */
207   public static ThreadFactory getNamedThreadFactory(final String prefix) {
208     SecurityManager s = System.getSecurityManager();
209     final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
210         .getThreadGroup();
211 
212     return new ThreadFactory() {
213       final AtomicInteger threadNumber = new AtomicInteger(1);
214       private final int poolNumber = Threads.poolNumber.getAndIncrement();
215       final ThreadGroup group = threadGroup;
216 
217       @Override
218       public Thread newThread(Runnable r) {
219         final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
220         return new Thread(group, r, name);
221       }
222     };
223   }
224 
225   /**
226    * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
227    * without setting the exception handler.
228    */
229   public static ThreadFactory newDaemonThreadFactory(final String prefix) {
230     return newDaemonThreadFactory(prefix, null);
231   }
232 
233   /**
234    * Get a named {@link ThreadFactory} that just builds daemon threads.
235    * @param prefix name prefix for all threads created from the factory
236    * @param handler unhandles exception handler to set for all threads
237    * @return a thread factory that creates named, daemon threads with
238    *         the supplied exception handler and normal priority
239    */
240   public static ThreadFactory newDaemonThreadFactory(final String prefix,
241       final UncaughtExceptionHandler handler) {
242     final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
243     return new ThreadFactory() {
244       @Override
245       public Thread newThread(Runnable r) {
246         Thread t = namedFactory.newThread(r);
247         if (handler != null) {
248           t.setUncaughtExceptionHandler(handler);
249         } else {
250           t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
251         }
252         if (!t.isDaemon()) {
253           t.setDaemon(true);
254         }
255         if (t.getPriority() != Thread.NORM_PRIORITY) {
256           t.setPriority(Thread.NORM_PRIORITY);
257         }
258         return t;
259       }
260 
261     };
262   }
263 
264   /** Sets an UncaughtExceptionHandler for the thread which logs the
265    * Exception stack if the thread dies.
266    */
267   public static void setLoggingUncaughtExceptionHandler(Thread t) {
268     t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
269   }
270 
271   private static interface PrintThreadInfoHelper {
272 
273     void printThreadInfo(PrintStream stream, String title);
274 
275   }
276 
277   private static class PrintThreadInfoLazyHolder {
278 
279     public static final PrintThreadInfoHelper HELPER = initHelper();
280 
281     private static PrintThreadInfoHelper initHelper() {
282       Method method = null;
283       try {
284         // Hadoop 2.7+ declares printThreadInfo(PrintStream, String)
285         method = ReflectionUtils.class.getMethod("printThreadInfo", PrintStream.class,
286           String.class);
287         method.setAccessible(true);
288         final Method hadoop27Method = method;
289         return new PrintThreadInfoHelper() {
290 
291           @Override
292           public void printThreadInfo(PrintStream stream, String title) {
293             try {
294               hadoop27Method.invoke(null, stream, title);
295             } catch (IllegalAccessException | IllegalArgumentException e) {
296               throw new RuntimeException(e);
297             } catch (InvocationTargetException e) {
298               throw new RuntimeException(e.getCause());
299             }
300           }
301         };
302       } catch (NoSuchMethodException e) {
303         LOG.info(
304           "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e);
305       }
306       try {
307         // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String)
308         method = ReflectionUtils.class.getMethod("printThreadInfo", PrintWriter.class,
309           String.class);
310         method.setAccessible(true);
311         final Method hadoop26Method = method;
312         return new PrintThreadInfoHelper() {
313 
314           @Override
315           public void printThreadInfo(PrintStream stream, String title) {
316             try {
317               hadoop26Method.invoke(null, new PrintWriter(stream), title);
318             } catch (IllegalAccessException | IllegalArgumentException e) {
319               throw new RuntimeException(e);
320             } catch (InvocationTargetException e) {
321               throw new RuntimeException(e.getCause());
322             }
323           }
324         };
325       } catch (NoSuchMethodException e) {
326         LOG.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e);
327       }
328       return null;
329     }
330   }
331 
332   /**
333    * Print all of the thread's information and stack traces. Wrapper around Hadoop's method.
334    *
335    * @param stream the stream to
336    * @param title a string title for the stack trace
337    */
338   public static void printThreadInfo(PrintStream stream, String title) {
339     Preconditions.checkNotNull(PrintThreadInfoLazyHolder.HELPER,
340       "Cannot find method. Check hadoop jars linked").printThreadInfo(stream, title);
341   }
342 }