001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.io.OutputStreamWriter; 022import java.io.PrintStream; 023import java.io.PrintWriter; 024import java.lang.Thread.UncaughtExceptionHandler; 025import java.lang.reflect.InvocationTargetException; 026import java.lang.reflect.Method; 027import java.nio.charset.StandardCharsets; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicInteger; 033 034import org.apache.hadoop.util.ReflectionUtils; 035import org.apache.hadoop.util.StringUtils; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 041 042/** 043 * Thread Utility 044 */ 045@InterfaceAudience.Private 046public class Threads { 047 private static final Logger LOG = LoggerFactory.getLogger(Threads.class); 048 private static final AtomicInteger poolNumber = new AtomicInteger(1); 049 050 public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = 051 new UncaughtExceptionHandler() { 052 @Override 053 public void uncaughtException(Thread t, Throwable e) { 054 LOG.warn("Thread:" + t + " exited with Exception:" 055 + StringUtils.stringifyException(e)); 056 } 057 }; 058 059 /** 060 * Utility method that sets name, daemon status and starts passed thread. 061 * @param t thread to run 062 * @return Returns the passed Thread <code>t</code>. 063 */ 064 public static <T extends Thread> T setDaemonThreadRunning(T t) { 065 return setDaemonThreadRunning(t, t.getName()); 066 } 067 068 /** 069 * Utility method that sets name, daemon status and starts passed thread. 070 * @param t thread to frob 071 * @param name new name 072 * @return Returns the passed Thread <code>t</code>. 073 */ 074 public static <T extends Thread> T setDaemonThreadRunning(T t, String name) { 075 return setDaemonThreadRunning(t, name, null); 076 } 077 078 /** 079 * Utility method that sets name, daemon status and starts passed thread. 080 * @param t thread to frob 081 * @param name new name 082 * @param handler A handler to set on the thread. Pass null if want to use default handler. 083 * @return Returns the passed Thread <code>t</code>. 084 */ 085 public static <T extends Thread> T setDaemonThreadRunning(T t, String name, 086 UncaughtExceptionHandler handler) { 087 t.setName(name); 088 if (handler != null) { 089 t.setUncaughtExceptionHandler(handler); 090 } 091 t.setDaemon(true); 092 t.start(); 093 return t; 094 } 095 096 /** 097 * Shutdown passed thread using isAlive and join. 098 * @param t Thread to shutdown 099 */ 100 public static void shutdown(final Thread t) { 101 shutdown(t, 0); 102 } 103 104 /** 105 * Shutdown passed thread using isAlive and join. 106 * @param joinwait Pass 0 if we're to wait forever. 107 * @param t Thread to shutdown 108 */ 109 public static void shutdown(final Thread t, final long joinwait) { 110 if (t == null) return; 111 while (t.isAlive()) { 112 try { 113 t.join(joinwait); 114 } catch (InterruptedException e) { 115 LOG.warn(t.getName() + "; joinwait=" + joinwait, e); 116 } 117 } 118 } 119 120 121 /** 122 * @param t Waits on the passed thread to die dumping a threaddump every 123 * minute while its up. 124 * @throws InterruptedException 125 */ 126 public static void threadDumpingIsAlive(final Thread t) 127 throws InterruptedException { 128 if (t == null) { 129 return; 130 } 131 132 while (t.isAlive()) { 133 t.join(60 * 1000); 134 if (t.isAlive()) { 135 printThreadInfo(System.out, 136 "Automatic Stack Trace every 60 seconds waiting on " + 137 t.getName()); 138 } 139 } 140 } 141 142 /** 143 * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns 144 * @param millis How long to sleep for in milliseconds. 145 */ 146 public static void sleep(long millis) { 147 try { 148 Thread.sleep(millis); 149 } catch (InterruptedException e) { 150 LOG.warn("sleep interrupted", e); 151 Thread.currentThread().interrupt(); 152 } 153 } 154 155 /** 156 * Sleeps for the given amount of time even if interrupted. Preserves 157 * the interrupt status. 158 * @param msToWait the amount of time to sleep in milliseconds 159 */ 160 public static void sleepWithoutInterrupt(final long msToWait) { 161 long timeMillis = System.currentTimeMillis(); 162 long endTime = timeMillis + msToWait; 163 boolean interrupted = false; 164 while (timeMillis < endTime) { 165 try { 166 Thread.sleep(endTime - timeMillis); 167 } catch (InterruptedException ex) { 168 interrupted = true; 169 } 170 timeMillis = System.currentTimeMillis(); 171 } 172 173 if (interrupted) { 174 Thread.currentThread().interrupt(); 175 } 176 } 177 178 /** 179 * Create a new CachedThreadPool with a bounded number as the maximum 180 * thread size in the pool. 181 * 182 * @param maxCachedThread the maximum thread could be created in the pool 183 * @param timeout the maximum time to wait 184 * @param unit the time unit of the timeout argument 185 * @param threadFactory the factory to use when creating new threads 186 * @return threadPoolExecutor the cachedThreadPool with a bounded number 187 * as the maximum thread size in the pool. 188 */ 189 public static ThreadPoolExecutor getBoundedCachedThreadPool( 190 int maxCachedThread, long timeout, TimeUnit unit, 191 ThreadFactory threadFactory) { 192 ThreadPoolExecutor boundedCachedThreadPool = 193 new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout, 194 unit, new LinkedBlockingQueue<>(), threadFactory); 195 // allow the core pool threads timeout and terminate 196 boundedCachedThreadPool.allowCoreThreadTimeOut(true); 197 return boundedCachedThreadPool; 198 } 199 200 201 /** 202 * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, 203 * with a common prefix. 204 * @param prefix The prefix of every created Thread's name 205 * @return a {@link java.util.concurrent.ThreadFactory} that names threads 206 */ 207 public static ThreadFactory getNamedThreadFactory(final String prefix) { 208 SecurityManager s = System.getSecurityManager(); 209 final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() 210 .getThreadGroup(); 211 212 return new ThreadFactory() { 213 final AtomicInteger threadNumber = new AtomicInteger(1); 214 private final int poolNumber = Threads.poolNumber.getAndIncrement(); 215 final ThreadGroup group = threadGroup; 216 217 @Override 218 public Thread newThread(Runnable r) { 219 final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement(); 220 return new Thread(group, r, name); 221 } 222 }; 223 } 224 225 /** 226 * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)}, 227 * without setting the exception handler. 228 */ 229 public static ThreadFactory newDaemonThreadFactory(final String prefix) { 230 return newDaemonThreadFactory(prefix, null); 231 } 232 233 /** 234 * Get a named {@link ThreadFactory} that just builds daemon threads. 235 * @param prefix name prefix for all threads created from the factory 236 * @param handler unhandles exception handler to set for all threads 237 * @return a thread factory that creates named, daemon threads with 238 * the supplied exception handler and normal priority 239 */ 240 public static ThreadFactory newDaemonThreadFactory(final String prefix, 241 final UncaughtExceptionHandler handler) { 242 final ThreadFactory namedFactory = getNamedThreadFactory(prefix); 243 return new ThreadFactory() { 244 @Override 245 public Thread newThread(Runnable r) { 246 Thread t = namedFactory.newThread(r); 247 if (handler != null) { 248 t.setUncaughtExceptionHandler(handler); 249 } else { 250 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER); 251 } 252 if (!t.isDaemon()) { 253 t.setDaemon(true); 254 } 255 if (t.getPriority() != Thread.NORM_PRIORITY) { 256 t.setPriority(Thread.NORM_PRIORITY); 257 } 258 return t; 259 } 260 261 }; 262 } 263 264 /** Sets an UncaughtExceptionHandler for the thread which logs the 265 * Exception stack if the thread dies. 266 */ 267 public static void setLoggingUncaughtExceptionHandler(Thread t) { 268 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER); 269 } 270 271 private static interface PrintThreadInfoHelper { 272 273 void printThreadInfo(PrintStream stream, String title); 274 275 } 276 277 private static class PrintThreadInfoLazyHolder { 278 279 public static final PrintThreadInfoHelper HELPER = initHelper(); 280 281 private static PrintThreadInfoHelper initHelper() { 282 Method method = null; 283 try { 284 // Hadoop 2.7+ declares printThreadInfo(PrintStream, String) 285 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintStream.class, 286 String.class); 287 method.setAccessible(true); 288 final Method hadoop27Method = method; 289 return new PrintThreadInfoHelper() { 290 291 @Override 292 public void printThreadInfo(PrintStream stream, String title) { 293 try { 294 hadoop27Method.invoke(null, stream, title); 295 } catch (IllegalAccessException | IllegalArgumentException e) { 296 throw new RuntimeException(e); 297 } catch (InvocationTargetException e) { 298 throw new RuntimeException(e.getCause()); 299 } 300 } 301 }; 302 } catch (NoSuchMethodException e) { 303 LOG.info( 304 "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e); 305 } 306 try { 307 // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String) 308 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintWriter.class, 309 String.class); 310 method.setAccessible(true); 311 final Method hadoop26Method = method; 312 return new PrintThreadInfoHelper() { 313 314 @Override 315 public void printThreadInfo(PrintStream stream, String title) { 316 try { 317 hadoop26Method.invoke(null, new PrintWriter( 318 new OutputStreamWriter(stream, StandardCharsets.UTF_8)), title); 319 } catch (IllegalAccessException | IllegalArgumentException e) { 320 throw new RuntimeException(e); 321 } catch (InvocationTargetException e) { 322 throw new RuntimeException(e.getCause()); 323 } 324 } 325 }; 326 } catch (NoSuchMethodException e) { 327 LOG.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e); 328 } 329 return null; 330 } 331 } 332 333 /** 334 * Print all of the thread's information and stack traces. Wrapper around Hadoop's method. 335 * 336 * @param stream the stream to 337 * @param title a string title for the stack trace 338 */ 339 public static void printThreadInfo(PrintStream stream, String title) { 340 Preconditions.checkNotNull(PrintThreadInfoLazyHolder.HELPER, 341 "Cannot find method. Check hadoop jars linked").printThreadInfo(stream, title); 342 } 343}