/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.PrintStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread Utility: static helpers for starting daemon threads, waiting for threads to finish,
 * sleeping, building a bounded thread pool and dumping thread state.
 */
@InterfaceAudience.Private
public class Threads {
  private static final Logger LOG = LoggerFactory.getLogger(Threads.class);

  /**
   * Handler that logs, at WARN level, the thread that died together with the stringified
   * exception that killed it.
   */
  public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
    (t, e) -> LOG.warn("Thread:{} exited with Exception:{}", t, StringUtils.stringifyException(e));

  /**
   * Utility method that marks the passed thread as a daemon and starts it, keeping its current
   * name.
   * @param t thread to run
   * @return Returns the passed Thread <code>t</code>.
   */
  public static <T extends Thread> T setDaemonThreadRunning(T t) {
    return setDaemonThreadRunning(t, t.getName());
  }

  /**
   * Utility method that sets name, daemon status and starts passed thread. No uncaught exception
   * handler is installed.
   * @param t    thread to rename and start
   * @param name new name
   * @return Returns the passed Thread <code>t</code>.
   */
  public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
    return setDaemonThreadRunning(t, name, null);
  }

  /**
   * Utility method that sets name, daemon status and starts passed thread.
   * @param t       thread to rename and start
   * @param name    new name
   * @param handler A handler to set on the thread. If null, no handler is set and whatever
   *                handler the thread already has is left untouched.
   * @return Returns the passed Thread <code>t</code>.
   */
  public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
    UncaughtExceptionHandler handler) {
    t.setName(name);
    if (handler != null) {
      t.setUncaughtExceptionHandler(handler);
    }
    // Daemon status must be set before start().
    t.setDaemon(true);
    t.start();
    return t;
  }

  /**
   * Waits for the passed thread to die using isAlive and join, with no per-join timeout.
   * @param t Thread to wait on; a null thread is ignored
   */
  public static void shutdown(final Thread t) {
    shutdown(t, 0);
  }

  /**
   * Waits for the passed thread to die using isAlive and join. The method only returns once the
   * thread is no longer alive: <code>joinwait</code> bounds each individual join call, not the
   * total wait. If the calling thread is interrupted while waiting, the interruption is logged,
   * the wait continues, and the caller's interrupt status is restored before returning.
   * @param t        Thread to wait on; a null thread is ignored
   * @param joinwait Timeout in milliseconds passed to each join call. Pass 0 if each join is to
   *                 wait forever.
   */
  public static void shutdown(final Thread t, final long joinwait) {
    if (t == null) {
      return;
    }
    boolean interrupted = false;
    try {
      while (t.isAlive()) {
        try {
          t.join(joinwait);
        } catch (InterruptedException e) {
          // Remember the interrupt instead of re-asserting it here: with the flag set, every
          // subsequent join() would throw immediately and this loop would spin.
          interrupted = true;
          LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
        }
      }
    } finally {
      if (interrupted) {
        // Hand the interrupt back to the caller now that the wait is over.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Waits on the passed thread to die, printing a thread dump to STDOUT every 60 seconds for as
   * long as the thread is still alive.
   * @param t Thread to wait on; a null thread is ignored
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public static void threadDumpingIsAlive(final Thread t) throws InterruptedException {
    if (t == null) {
      return;
    }

    while (t.isAlive()) {
      t.join(60 * 1000);
      // Only dump when the join timed out, not when the thread has just finished.
      if (t.isAlive()) {
        printThreadInfo(System.out,
          "Automatic Stack Trace every 60 seconds waiting on " + t.getName());
      }
    }
  }

  /**
   * Sleeps for the given time. If interrupted, logs the interruption at WARN level, restores the
   * interrupt status and returns early.
   * @param millis How long to sleep for in milliseconds.
   */
  public static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("sleep interrupted", e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Sleeps for the given amount of time even if interrupted. Preserves the interrupt status.
   * @param msToWait the amount of time to sleep in milliseconds
   */
  public static void sleepWithoutInterrupt(final long msToWait) {
    long timeMillis = EnvironmentEdgeManager.currentTime();
    long endTime = timeMillis + msToWait;
    boolean interrupted = false;
    while (timeMillis < endTime) {
      try {
        // Sleep only for what is left, so an interrupt does not extend the total wait.
        Thread.sleep(endTime - timeMillis);
      } catch (InterruptedException ex) {
        interrupted = true;
      }
      timeMillis = EnvironmentEdgeManager.currentTime();
    }

    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Create a new CachedThreadPool with a bounded number as the maximum thread size in the pool.
   * Core and maximum pool size are both <code>maxCachedThread</code>, work is queued on an
   * unbounded {@link LinkedBlockingQueue}, and core threads are allowed to time out so an idle
   * pool can shrink to zero threads.
   * @param maxCachedThread the maximum thread could be created in the pool
   * @param timeout         how long an idle thread waits for work before terminating; must be
   *                        greater than zero because core thread time-out is enabled
   * @param unit            the time unit of the timeout argument
   * @param threadFactory   the factory to use when creating new threads
   * @return threadPoolExecutor the cachedThreadPool with a bounded number as the maximum thread
   *         size in the pool.
   */
  public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout,
    TimeUnit unit, ThreadFactory threadFactory) {
    ThreadPoolExecutor boundedCachedThreadPool = new ThreadPoolExecutor(maxCachedThread,
      maxCachedThread, timeout, unit, new LinkedBlockingQueue<>(), threadFactory);
    // allow the core pool threads timeout and terminate
    boundedCachedThreadPool.allowCoreThreadTimeOut(true);
    return boundedCachedThreadPool;
  }

  /**
   * Sets an UncaughtExceptionHandler for the thread which logs the Exception stack if the thread
   * dies.
   * @param t thread to install {@link #LOGGING_EXCEPTION_HANDLER} on
   */
  public static void setLoggingUncaughtExceptionHandler(Thread t) {
    t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
  }

  /**
   * Print all of the thread's information and stack traces. Delegates to
   * {@code ReflectionUtils.printThreadInfo}.
   * @param stream the stream to print to
   * @param title  a string title for the stack trace
   */
  public static void printThreadInfo(PrintStream stream, String title) {
    ReflectionUtils.printThreadInfo(stream, title);
  }

  /**
   * Checks whether any non-daemon thread other than the calling thread is running. The name and
   * stack trace of every such thread is logged at INFO level.
   * @return true if there are non daemon threads running, otherwise false
   */
  public static boolean isNonDaemonThreadRunning() {
    AtomicInteger nonDaemonThreadCount = new AtomicInteger();
    Set<Thread> threads = Thread.getAllStackTraces().keySet();
    // The lambda below runs on this thread, so its id can be looked up once.
    final long currentThreadId = Thread.currentThread().getId();
    threads.forEach(t -> {
      // Exclude current thread
      if (t.getId() != currentThreadId && !t.isDaemon()) {
        nonDaemonThreadCount.getAndIncrement();
        LOG.info("Non daemon thread {} is still alive", t.getName());
        LOG.info(printStackTrace(t));
      }
    });
    return nonDaemonThreadCount.get() > 0;
  }

  /**
   * Builds a printable stack trace of the passed thread.
   * @param t thread whose current stack is captured
   * @return one line per stack frame, each preceded by a newline; empty if the thread has no
   *         frames
   */
  public static String printStackTrace(Thread t) {
    StringBuilder sb = new StringBuilder();
    for (StackTraceElement frame : t.getStackTrace()) {
      sb.append("\n").append(" ").append(frame.toString());
    }
    return sb.toString();
  }
}