001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.io.OutputStreamWriter; 022import java.io.PrintStream; 023import java.io.PrintWriter; 024import java.lang.Thread.UncaughtExceptionHandler; 025import java.lang.reflect.InvocationTargetException; 026import java.lang.reflect.Method; 027import java.nio.charset.StandardCharsets; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import org.apache.hadoop.util.ReflectionUtils; 033import org.apache.hadoop.util.StringUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 039 040/** 041 * Thread Utility 042 */ 043@InterfaceAudience.Private 044public class Threads { 045 private static final Logger LOG = LoggerFactory.getLogger(Threads.class); 046 047 public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = 048 (t, e) -> LOG.warn("Thread:{} exited with Exception:{}", t, StringUtils.stringifyException(e)); 049 050 /** 051 * Utility method that sets name, daemon status and starts passed thread. 052 * @param t thread to run 053 * @return Returns the passed Thread <code>t</code>. 054 */ 055 public static <T extends Thread> T setDaemonThreadRunning(T t) { 056 return setDaemonThreadRunning(t, t.getName()); 057 } 058 059 /** 060 * Utility method that sets name, daemon status and starts passed thread. 061 * @param t thread to frob 062 * @param name new name 063 * @return Returns the passed Thread <code>t</code>. 064 */ 065 public static <T extends Thread> T setDaemonThreadRunning(T t, String name) { 066 return setDaemonThreadRunning(t, name, null); 067 } 068 069 /** 070 * Utility method that sets name, daemon status and starts passed thread. 071 * @param t thread to frob 072 * @param name new name 073 * @param handler A handler to set on the thread. Pass null if want to use default handler. 074 * @return Returns the passed Thread <code>t</code>. 075 */ 076 public static <T extends Thread> T setDaemonThreadRunning(T t, String name, 077 UncaughtExceptionHandler handler) { 078 t.setName(name); 079 if (handler != null) { 080 t.setUncaughtExceptionHandler(handler); 081 } 082 t.setDaemon(true); 083 t.start(); 084 return t; 085 } 086 087 /** 088 * Shutdown passed thread using isAlive and join. 089 * @param t Thread to shutdown 090 */ 091 public static void shutdown(final Thread t) { 092 shutdown(t, 0); 093 } 094 095 /** 096 * Shutdown passed thread using isAlive and join. 097 * @param joinwait Pass 0 if we're to wait forever. 098 * @param t Thread to shutdown 099 */ 100 public static void shutdown(final Thread t, final long joinwait) { 101 if (t == null) return; 102 while (t.isAlive()) { 103 try { 104 t.join(joinwait); 105 } catch (InterruptedException e) { 106 LOG.warn(t.getName() + "; joinwait=" + joinwait, e); 107 } 108 } 109 } 110 111 112 /** 113 * @param t Waits on the passed thread to die dumping a threaddump every 114 * minute while its up. 115 * @throws InterruptedException 116 */ 117 public static void threadDumpingIsAlive(final Thread t) 118 throws InterruptedException { 119 if (t == null) { 120 return; 121 } 122 123 while (t.isAlive()) { 124 t.join(60 * 1000); 125 if (t.isAlive()) { 126 printThreadInfo(System.out, 127 "Automatic Stack Trace every 60 seconds waiting on " + 128 t.getName()); 129 } 130 } 131 } 132 133 /** 134 * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns 135 * @param millis How long to sleep for in milliseconds. 136 */ 137 public static void sleep(long millis) { 138 try { 139 Thread.sleep(millis); 140 } catch (InterruptedException e) { 141 LOG.warn("sleep interrupted", e); 142 Thread.currentThread().interrupt(); 143 } 144 } 145 146 /** 147 * Sleeps for the given amount of time even if interrupted. Preserves 148 * the interrupt status. 149 * @param msToWait the amount of time to sleep in milliseconds 150 */ 151 public static void sleepWithoutInterrupt(final long msToWait) { 152 long timeMillis = System.currentTimeMillis(); 153 long endTime = timeMillis + msToWait; 154 boolean interrupted = false; 155 while (timeMillis < endTime) { 156 try { 157 Thread.sleep(endTime - timeMillis); 158 } catch (InterruptedException ex) { 159 interrupted = true; 160 } 161 timeMillis = System.currentTimeMillis(); 162 } 163 164 if (interrupted) { 165 Thread.currentThread().interrupt(); 166 } 167 } 168 169 /** 170 * Create a new CachedThreadPool with a bounded number as the maximum 171 * thread size in the pool. 172 * 173 * @param maxCachedThread the maximum thread could be created in the pool 174 * @param timeout the maximum time to wait 175 * @param unit the time unit of the timeout argument 176 * @param threadFactory the factory to use when creating new threads 177 * @return threadPoolExecutor the cachedThreadPool with a bounded number 178 * as the maximum thread size in the pool. 179 */ 180 public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout, 181 TimeUnit unit, ThreadFactory threadFactory) { 182 ThreadPoolExecutor boundedCachedThreadPool = 183 new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout, unit, 184 new LinkedBlockingQueue<>(), threadFactory); 185 // allow the core pool threads timeout and terminate 186 boundedCachedThreadPool.allowCoreThreadTimeOut(true); 187 return boundedCachedThreadPool; 188 } 189 190 /** Sets an UncaughtExceptionHandler for the thread which logs the 191 * Exception stack if the thread dies. 192 */ 193 public static void setLoggingUncaughtExceptionHandler(Thread t) { 194 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER); 195 } 196 197 private interface PrintThreadInfoHelper { 198 199 void printThreadInfo(PrintStream stream, String title); 200 201 } 202 203 private static class PrintThreadInfoLazyHolder { 204 205 public static final PrintThreadInfoHelper HELPER = initHelper(); 206 207 private static PrintThreadInfoHelper initHelper() { 208 Method method = null; 209 try { 210 // Hadoop 2.7+ declares printThreadInfo(PrintStream, String) 211 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintStream.class, 212 String.class); 213 method.setAccessible(true); 214 final Method hadoop27Method = method; 215 return new PrintThreadInfoHelper() { 216 217 @Override 218 public void printThreadInfo(PrintStream stream, String title) { 219 try { 220 hadoop27Method.invoke(null, stream, title); 221 } catch (IllegalAccessException | IllegalArgumentException e) { 222 throw new RuntimeException(e); 223 } catch (InvocationTargetException e) { 224 throw new RuntimeException(e.getCause()); 225 } 226 } 227 }; 228 } catch (NoSuchMethodException e) { 229 LOG.info( 230 "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e); 231 } 232 try { 233 // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String) 234 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintWriter.class, 235 String.class); 236 method.setAccessible(true); 237 final Method hadoop26Method = method; 238 return new PrintThreadInfoHelper() { 239 240 @Override 241 public void printThreadInfo(PrintStream stream, String title) { 242 try { 243 hadoop26Method.invoke(null, new PrintWriter( 244 new OutputStreamWriter(stream, StandardCharsets.UTF_8)), title); 245 } catch (IllegalAccessException | IllegalArgumentException e) { 246 throw new RuntimeException(e); 247 } catch (InvocationTargetException e) { 248 throw new RuntimeException(e.getCause()); 249 } 250 } 251 }; 252 } catch (NoSuchMethodException e) { 253 LOG.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e); 254 } 255 return null; 256 } 257 } 258 259 /** 260 * Print all of the thread's information and stack traces. Wrapper around Hadoop's method. 261 * 262 * @param stream the stream to 263 * @param title a string title for the stack trace 264 */ 265 public static void printThreadInfo(PrintStream stream, String title) { 266 Preconditions.checkNotNull(PrintThreadInfoLazyHolder.HELPER, 267 "Cannot find method. Check hadoop jars linked").printThreadInfo(stream, title); 268 } 269}