001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.OutputStreamWriter;
022import java.io.PrintStream;
023import java.io.PrintWriter;
024import java.lang.Thread.UncaughtExceptionHandler;
025import java.lang.reflect.InvocationTargetException;
026import java.lang.reflect.Method;
027import java.nio.charset.StandardCharsets;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import org.apache.hadoop.util.ReflectionUtils;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041
042/**
043 * Thread Utility
044 */
045@InterfaceAudience.Private
046public class Threads {
047  private static final Logger LOG = LoggerFactory.getLogger(Threads.class);
048  private static final AtomicInteger poolNumber = new AtomicInteger(1);
049
050  public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
051    new UncaughtExceptionHandler() {
052    @Override
053    public void uncaughtException(Thread t, Throwable e) {
054      LOG.warn("Thread:" + t + " exited with Exception:"
055          + StringUtils.stringifyException(e));
056    }
057  };
058
059  /**
060   * Utility method that sets name, daemon status and starts passed thread.
061   * @param t thread to run
062   * @return Returns the passed Thread <code>t</code>.
063   */
064  public static <T extends Thread> T setDaemonThreadRunning(T t) {
065    return setDaemonThreadRunning(t, t.getName());
066  }
067
068  /**
069   * Utility method that sets name, daemon status and starts passed thread.
070   * @param t thread to frob
071   * @param name new name
072   * @return Returns the passed Thread <code>t</code>.
073   */
074  public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
075    return setDaemonThreadRunning(t, name, null);
076  }
077
078  /**
079   * Utility method that sets name, daemon status and starts passed thread.
080   * @param t thread to frob
081   * @param name new name
082   * @param handler A handler to set on the thread. Pass null if want to use default handler.
083   * @return Returns the passed Thread <code>t</code>.
084   */
085  public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
086      UncaughtExceptionHandler handler) {
087    t.setName(name);
088    if (handler != null) {
089      t.setUncaughtExceptionHandler(handler);
090    }
091    t.setDaemon(true);
092    t.start();
093    return t;
094  }
095
096  /**
097   * Shutdown passed thread using isAlive and join.
098   * @param t Thread to shutdown
099   */
100  public static void shutdown(final Thread t) {
101    shutdown(t, 0);
102  }
103
104  /**
105   * Shutdown passed thread using isAlive and join.
106   * @param joinwait Pass 0 if we're to wait forever.
107   * @param t Thread to shutdown
108   */
109  public static void shutdown(final Thread t, final long joinwait) {
110    if (t == null) return;
111    while (t.isAlive()) {
112      try {
113        t.join(joinwait);
114      } catch (InterruptedException e) {
115        LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
116      }
117    }
118  }
119
120
121  /**
122   * @param t Waits on the passed thread to die dumping a threaddump every
123   * minute while its up.
124   * @throws InterruptedException
125   */
126  public static void threadDumpingIsAlive(final Thread t)
127  throws InterruptedException {
128    if (t == null) {
129      return;
130    }
131
132    while (t.isAlive()) {
133      t.join(60 * 1000);
134      if (t.isAlive()) {
135        printThreadInfo(System.out,
136            "Automatic Stack Trace every 60 seconds waiting on " +
137            t.getName());
138      }
139    }
140  }
141
142  /**
143   * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns
144   * @param millis How long to sleep for in milliseconds.
145   */
146  public static void sleep(long millis) {
147    try {
148      Thread.sleep(millis);
149    } catch (InterruptedException e) {
150      LOG.warn("sleep interrupted", e);
151      Thread.currentThread().interrupt();
152    }
153  }
154
155  /**
156   * Sleeps for the given amount of time even if interrupted. Preserves
157   * the interrupt status.
158   * @param msToWait the amount of time to sleep in milliseconds
159   */
160  public static void sleepWithoutInterrupt(final long msToWait) {
161    long timeMillis = System.currentTimeMillis();
162    long endTime = timeMillis + msToWait;
163    boolean interrupted = false;
164    while (timeMillis < endTime) {
165      try {
166        Thread.sleep(endTime - timeMillis);
167      } catch (InterruptedException ex) {
168        interrupted = true;
169      }
170      timeMillis = System.currentTimeMillis();
171    }
172
173    if (interrupted) {
174      Thread.currentThread().interrupt();
175    }
176  }
177
178  /**
179   * Create a new CachedThreadPool with a bounded number as the maximum
180   * thread size in the pool.
181   *
182   * @param maxCachedThread the maximum thread could be created in the pool
183   * @param timeout the maximum time to wait
184   * @param unit the time unit of the timeout argument
185   * @param threadFactory the factory to use when creating new threads
186   * @return threadPoolExecutor the cachedThreadPool with a bounded number
187   * as the maximum thread size in the pool.
188   */
189  public static ThreadPoolExecutor getBoundedCachedThreadPool(
190      int maxCachedThread, long timeout, TimeUnit unit,
191      ThreadFactory threadFactory) {
192    ThreadPoolExecutor boundedCachedThreadPool =
193      new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
194        unit, new LinkedBlockingQueue<>(), threadFactory);
195    // allow the core pool threads timeout and terminate
196    boundedCachedThreadPool.allowCoreThreadTimeOut(true);
197    return boundedCachedThreadPool;
198  }
199
200
201  /**
202   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
203   * with a common prefix.
204   * @param prefix The prefix of every created Thread's name
205   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
206   */
207  public static ThreadFactory getNamedThreadFactory(final String prefix) {
208    SecurityManager s = System.getSecurityManager();
209    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
210        .getThreadGroup();
211
212    return new ThreadFactory() {
213      final AtomicInteger threadNumber = new AtomicInteger(1);
214      private final int poolNumber = Threads.poolNumber.getAndIncrement();
215      final ThreadGroup group = threadGroup;
216
217      @Override
218      public Thread newThread(Runnable r) {
219        final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
220        return new Thread(group, r, name);
221      }
222    };
223  }
224
225  /**
226   * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
227   * without setting the exception handler.
228   */
229  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
230    return newDaemonThreadFactory(prefix, null);
231  }
232
233  /**
234   * Get a named {@link ThreadFactory} that just builds daemon threads.
235   * @param prefix name prefix for all threads created from the factory
236   * @param handler unhandles exception handler to set for all threads
237   * @return a thread factory that creates named, daemon threads with
238   *         the supplied exception handler and normal priority
239   */
240  public static ThreadFactory newDaemonThreadFactory(final String prefix,
241      final UncaughtExceptionHandler handler) {
242    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
243    return new ThreadFactory() {
244      @Override
245      public Thread newThread(Runnable r) {
246        Thread t = namedFactory.newThread(r);
247        if (handler != null) {
248          t.setUncaughtExceptionHandler(handler);
249        } else {
250          t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
251        }
252        if (!t.isDaemon()) {
253          t.setDaemon(true);
254        }
255        if (t.getPriority() != Thread.NORM_PRIORITY) {
256          t.setPriority(Thread.NORM_PRIORITY);
257        }
258        return t;
259      }
260
261    };
262  }
263
264  /** Sets an UncaughtExceptionHandler for the thread which logs the
265   * Exception stack if the thread dies.
266   */
267  public static void setLoggingUncaughtExceptionHandler(Thread t) {
268    t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
269  }
270
271  private static interface PrintThreadInfoHelper {
272
273    void printThreadInfo(PrintStream stream, String title);
274
275  }
276
277  private static class PrintThreadInfoLazyHolder {
278
279    public static final PrintThreadInfoHelper HELPER = initHelper();
280
281    private static PrintThreadInfoHelper initHelper() {
282      Method method = null;
283      try {
284        // Hadoop 2.7+ declares printThreadInfo(PrintStream, String)
285        method = ReflectionUtils.class.getMethod("printThreadInfo", PrintStream.class,
286          String.class);
287        method.setAccessible(true);
288        final Method hadoop27Method = method;
289        return new PrintThreadInfoHelper() {
290
291          @Override
292          public void printThreadInfo(PrintStream stream, String title) {
293            try {
294              hadoop27Method.invoke(null, stream, title);
295            } catch (IllegalAccessException | IllegalArgumentException e) {
296              throw new RuntimeException(e);
297            } catch (InvocationTargetException e) {
298              throw new RuntimeException(e.getCause());
299            }
300          }
301        };
302      } catch (NoSuchMethodException e) {
303        LOG.info(
304          "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e);
305      }
306      try {
307        // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String)
308        method = ReflectionUtils.class.getMethod("printThreadInfo", PrintWriter.class,
309          String.class);
310        method.setAccessible(true);
311        final Method hadoop26Method = method;
312        return new PrintThreadInfoHelper() {
313
314          @Override
315          public void printThreadInfo(PrintStream stream, String title) {
316            try {
317              hadoop26Method.invoke(null, new PrintWriter(
318                  new OutputStreamWriter(stream, StandardCharsets.UTF_8)), title);
319            } catch (IllegalAccessException | IllegalArgumentException e) {
320              throw new RuntimeException(e);
321            } catch (InvocationTargetException e) {
322              throw new RuntimeException(e.getCause());
323            }
324          }
325        };
326      } catch (NoSuchMethodException e) {
327        LOG.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e);
328      }
329      return null;
330    }
331  }
332
333  /**
334   * Print all of the thread's information and stack traces. Wrapper around Hadoop's method.
335   *
336   * @param stream the stream to
337   * @param title a string title for the stack trace
338   */
339  public static void printThreadInfo(PrintStream stream, String title) {
340    Preconditions.checkNotNull(PrintThreadInfoLazyHolder.HELPER,
341      "Cannot find method. Check hadoop jars linked").printThreadInfo(stream, title);
342  }
343}