001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.PrintStream; 021import java.lang.Thread.UncaughtExceptionHandler; 022import java.util.Set; 023import java.util.concurrent.LinkedBlockingQueue; 024import java.util.concurrent.ThreadFactory; 025import java.util.concurrent.ThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.util.StringUtils; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Thread Utility 035 */ 036@InterfaceAudience.Private 037public class Threads { 038 private static final Logger LOG = LoggerFactory.getLogger(Threads.class); 039 040 public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = 041 (t, e) -> LOG.warn("Thread:{} exited with Exception:{}", t, StringUtils.stringifyException(e)); 042 043 /** 044 * Utility method that sets name, daemon status and starts passed thread. 045 * @param t thread to run 046 * @return Returns the passed Thread <code>t</code>. 047 */ 048 public static <T extends Thread> T setDaemonThreadRunning(T t) { 049 return setDaemonThreadRunning(t, t.getName()); 050 } 051 052 /** 053 * Utility method that sets name, daemon status and starts passed thread. 054 * @param t thread to frob 055 * @param name new name 056 * @return Returns the passed Thread <code>t</code>. 057 */ 058 public static <T extends Thread> T setDaemonThreadRunning(T t, String name) { 059 return setDaemonThreadRunning(t, name, null); 060 } 061 062 /** 063 * Utility method that sets name, daemon status and starts passed thread. 064 * @param t thread to frob 065 * @param name new name 066 * @param handler A handler to set on the thread. Pass null if want to use default handler. 067 * @return Returns the passed Thread <code>t</code>. 068 */ 069 public static <T extends Thread> T setDaemonThreadRunning(T t, String name, 070 UncaughtExceptionHandler handler) { 071 t.setName(name); 072 if (handler != null) { 073 t.setUncaughtExceptionHandler(handler); 074 } 075 t.setDaemon(true); 076 t.start(); 077 return t; 078 } 079 080 /** 081 * Shutdown passed thread using isAlive and join. 082 * @param t Thread to shutdown 083 */ 084 public static void shutdown(final Thread t) { 085 shutdown(t, 0); 086 } 087 088 /** 089 * Shutdown passed thread using isAlive and join. 090 * @param joinwait Pass 0 if we're to wait forever. 091 * @param t Thread to shutdown 092 */ 093 public static void shutdown(final Thread t, final long joinwait) { 094 if (t == null) return; 095 while (t.isAlive()) { 096 try { 097 t.join(joinwait); 098 } catch (InterruptedException e) { 099 LOG.warn(t.getName() + "; joinwait=" + joinwait, e); 100 } 101 } 102 } 103 104 /** Waits on the passed thread to die dumping a threaddump every minute while its up. */ 105 public static void threadDumpingIsAlive(final Thread t) throws InterruptedException { 106 if (t == null) { 107 return; 108 } 109 110 while (t.isAlive()) { 111 t.join(60 * 1000); 112 if (t.isAlive()) { 113 printThreadInfo(System.out, 114 "Automatic Stack Trace every 60 seconds waiting on " + t.getName()); 115 } 116 } 117 } 118 119 /** 120 * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns 121 * @param millis How long to sleep for in milliseconds. 122 */ 123 public static void sleep(long millis) { 124 try { 125 Thread.sleep(millis); 126 } catch (InterruptedException e) { 127 LOG.warn("sleep interrupted", e); 128 Thread.currentThread().interrupt(); 129 } 130 } 131 132 /** 133 * Sleeps for the given amount of time even if interrupted. Preserves the interrupt status. 134 * @param msToWait the amount of time to sleep in milliseconds 135 */ 136 public static void sleepWithoutInterrupt(final long msToWait) { 137 long timeMillis = EnvironmentEdgeManager.currentTime(); 138 long endTime = timeMillis + msToWait; 139 boolean interrupted = false; 140 while (timeMillis < endTime) { 141 try { 142 Thread.sleep(endTime - timeMillis); 143 } catch (InterruptedException ex) { 144 interrupted = true; 145 } 146 timeMillis = EnvironmentEdgeManager.currentTime(); 147 } 148 149 if (interrupted) { 150 Thread.currentThread().interrupt(); 151 } 152 } 153 154 /** 155 * Create a new CachedThreadPool with a bounded number as the maximum thread size in the pool. 156 * @param maxCachedThread the maximum thread could be created in the pool 157 * @param timeout the maximum time to wait 158 * @param unit the time unit of the timeout argument 159 * @param threadFactory the factory to use when creating new threads 160 * @return threadPoolExecutor the cachedThreadPool with a bounded number as the maximum thread 161 * size in the pool. 162 */ 163 public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout, 164 TimeUnit unit, ThreadFactory threadFactory) { 165 ThreadPoolExecutor boundedCachedThreadPool = new ThreadPoolExecutor(maxCachedThread, 166 maxCachedThread, timeout, unit, new LinkedBlockingQueue<>(), threadFactory); 167 // allow the core pool threads timeout and terminate 168 boundedCachedThreadPool.allowCoreThreadTimeOut(true); 169 return boundedCachedThreadPool; 170 } 171 172 /** 173 * Sets an UncaughtExceptionHandler for the thread which logs the Exception stack if the thread 174 * dies. 175 */ 176 public static void setLoggingUncaughtExceptionHandler(Thread t) { 177 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER); 178 } 179 180 /** 181 * Print all of the thread's information and stack traces. Wrapper around Hadoop's method. 182 * @param stream the stream to 183 * @param title a string title for the stack trace 184 */ 185 public static void printThreadInfo(PrintStream stream, String title) { 186 ReflectionUtils.printThreadInfo(stream, title); 187 } 188 189 /** 190 * Checks whether any non-daemon thread is running. 191 * @return true if there are non daemon threads running, otherwise false 192 */ 193 public static boolean isNonDaemonThreadRunning() { 194 AtomicInteger nonDaemonThreadCount = new AtomicInteger(); 195 Set<Thread> threads = Thread.getAllStackTraces().keySet(); 196 threads.forEach(t -> { 197 // Exclude current thread 198 if (t.getId() != Thread.currentThread().getId() && !t.isDaemon()) { 199 nonDaemonThreadCount.getAndIncrement(); 200 LOG.info("Non daemon thread {} is still alive", t.getName()); 201 LOG.info(printStackTrace(t)); 202 } 203 }); 204 return nonDaemonThreadCount.get() > 0; 205 } 206 207 /* 208 * Print stack trace of the passed thread 209 */ 210 public static String printStackTrace(Thread t) { 211 StringBuilder sb = new StringBuilder(); 212 for (StackTraceElement frame : t.getStackTrace()) { 213 sb.append("\n").append(" ").append(frame.toString()); 214 } 215 return sb.toString(); 216 } 217}