/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.executor;

import java.io.IOException;
import java.io.Writer;
import java.lang.management.ThreadInfo;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This is a generic executor service. This component abstracts a threadpool, a queue to which
 * {@link EventType}s can be submitted, and a <code>Runnable</code> that handles the object that is
 * added to the queue.
 * <p>
 * In order to create a new service, create an instance of this class and then do:
 * <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig} wraps the
 * configuration needed by this service. When done call {@link #shutdown()}.
 * <p>
 * In order to use the service created above, call {@link #submit(EventHandler)}.
 */
@InterfaceAudience.Private
public class ExecutorService {
  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);

  // Holds all the executors created so far, addressable by their names.
  private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();

  // Name of the server hosting this executor service.
  private final String servername;

  // Single daemon thread used only to re-submit handlers after a delay (see delayedSubmit).
  private final ListeningScheduledExecutorService delayedSubmitTimer =
    MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build()));

  /**
   * Default constructor.
   * @param servername Name of the hosting server.
   */
  public ExecutorService(final String servername) {
    this.servername = servername;
  }

  /**
   * Start an executor service with a given name. If there was a service already started with the
   * same name, this throws a RuntimeException.
   * @param config Configuration to use for the executor.
   */
  public void startExecutorService(final ExecutorConfig config) {
    final String name = config.getName();
    // compute() makes the "check for an existing executor, then create" step atomic per key.
    Executor hbes = this.executorMap.compute(name, (key, value) -> {
      if (value != null) {
        throw new RuntimeException(
          "An executor service with the name " + key + " is already running!");
      }
      return new Executor(config);
    });

    LOG.debug("Starting executor service name={}, corePoolSize={}, maxPoolSize={}", name,
      hbes.threadPoolExecutor.getCorePoolSize(), hbes.threadPoolExecutor.getMaximumPoolSize());
  }

  /**
   * Returns true if an executor is currently registered under the given name.
   */
  boolean isExecutorServiceRunning(String name) {
    return this.executorMap.containsKey(name);
  }

  /**
   * Stops the delayed-submit timer and every registered executor via <code>shutdownNow()</code>,
   * then forgets all executors.
   */
  public void shutdown() {
    this.delayedSubmitTimer.shutdownNow();
    for (Entry<String, Executor> entry : this.executorMap.entrySet()) {
      Executor executor = entry.getValue();
      // ThreadPoolExecutor#shutdownNow returns the tasks that were queued but never started.
      List<Runnable> neverStarted = executor.threadPoolExecutor.shutdownNow();
      if (!neverStarted.isEmpty()) {
        LOG.info("{} had {} on shutdown", executor, neverStarted);
      }
    }
    this.executorMap.clear();
  }

  /**
   * Looks up the executor registered for the given type on this server.
   * @return the executor, or null if none is registered under that name
   */
  Executor getExecutor(final ExecutorType type) {
    return getExecutor(type.getExecutorName(this.servername));
  }

  /**
   * Looks up an executor by its full name.
   * @return the executor, or null if none is registered under that name
   */
  Executor getExecutor(String name) {
    return this.executorMap.get(name);
  }

  /**
   * Returns the thread pool backing the executor of the given type. The executor must already be
   * registered; no null check is made on the lookup, so a missing executor results in a
   * NullPointerException.
   */
  public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
    return getExecutor(type).getThreadPoolExecutor();
  }

  /**
   * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
   * paths should use this method to get the executor, should not start executor by using
   * {@link ExecutorService#startExecutorService(ExecutorConfig)}
   */
  public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) {
    return executorMap.computeIfAbsent(config.getName(), (executorName) -> new Executor(config))
      .getThreadPoolExecutor();
  }

  /**
   * Hands the event handler to the executor registered for its event type. If no such executor is
   * registered the handler is dropped and an error is logged.
   */
  public void submit(final EventHandler eh) {
    Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
    if (executor == null) {
      // This happens only when events are submitted after shutdown() was
      // called, so dropping them should be "ok" since it means we're
      // shutting down.
      LOG.error("Cannot submit [{}] because the executor is missing."
        + " Is this process shutting down?", eh);
    } else {
      executor.submit(eh);
    }
  }

  /**
   * Submit the handler after the given delay. Used for retrying.
   * @param eh    handler to submit once the delay has elapsed
   * @param delay how long to wait before submitting
   * @param unit  unit of <code>delay</code>
   */
  public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) {
    ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit);
    // The listener only exists to surface a failure of the scheduled submit in the log; get() is
    // called from the completion listener, so the future is already done at that point.
    future.addListener(() -> {
      try {
        future.get();
      } catch (Exception e) {
        LOG.error("Failed to submit the event handler {} to executor", eh, e);
      }
    }, MoreExecutors.directExecutor());
  }

  /**
   * Collects the current status of every registered executor, keyed by executor name.
   */
  public Map<String, ExecutorStatus> getAllExecutorStatuses() {
    Map<String, ExecutorStatus> ret = Maps.newHashMap();
    for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
      ret.put(e.getKey(), e.getValue().getStatus());
    }
    return ret;
  }

  /**
   * Configuration wrapper for {@link Executor}. This is an inner (non-static) class because
   * {@link #getName()} reads the enclosing service's server name.
   */
  public class ExecutorConfig {
    // Refer to ThreadPoolExecutor javadoc for details of these configuration.
    // Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
    // implementation.
    public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
    private int corePoolSize = -1;
    private boolean allowCoreThreadTimeout = false;
    private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
    private ExecutorType executorType;

    public ExecutorConfig setExecutorType(ExecutorType type) {
      this.executorType = type;
      return this;
    }

    // Fails fast with a NullPointerException if setExecutorType was never called.
    private ExecutorType getExecutorType() {
      return Preconditions.checkNotNull(executorType, "ExecutorType not set.");
    }

    public int getCorePoolSize() {
      return corePoolSize;
    }

    public ExecutorConfig setCorePoolSize(int corePoolSize) {
      this.corePoolSize = corePoolSize;
      return this;
    }

    public boolean allowCoreThreadTimeout() {
      return allowCoreThreadTimeout;
    }

    /**
     * Allows timing out of core threads. Good to set this for non-critical thread pools for release
     * of unused resources. Refer to {@link ThreadPoolExecutor#allowCoreThreadTimeOut} for
     * additional details.
     */
    public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
      this.allowCoreThreadTimeout = allowCoreThreadTimeout;
      return this;
    }

    /**
     * Returns the executor name inferred from the type and the servername on which this is running.
     */
    public String getName() {
      return getExecutorType().getExecutorName(servername);
    }

    public long getKeepAliveTimeMillis() {
      return keepAliveTimeMillis;
    }

    public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
      this.keepAliveTimeMillis = keepAliveTimeMillis;
      return this;
    }
  }

  /**
   * Executor instance: a named {@link TrackingThreadPoolExecutor} together with its work queue.
   */
  static class Executor {
    // the thread pool executor that services the requests
    final TrackingThreadPoolExecutor threadPoolExecutor;
    // work queue to use - unbounded queue
    final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
    private final String name;
    // Source of the per-instance ids that appear in toString().
    private static final AtomicLong seqids = new AtomicLong(0);
    private final long id;

    protected Executor(ExecutorConfig config) {
      this.id = seqids.incrementAndGet();
      this.name = config.getName();
      // create the thread pool executor
      this.threadPoolExecutor = new TrackingThreadPoolExecutor(
        // setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
        config.getCorePoolSize(), config.getCorePoolSize(), config.getKeepAliveTimeMillis(),
        TimeUnit.MILLISECONDS, q);
      this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
      // name the threads for this threadpool
      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
      tfb.setNameFormat(this.name + "-%d");
      tfb.setDaemon(true);
      tfb.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
      this.threadPoolExecutor.setThreadFactory(tfb.build());
    }

    /**
     * Submit the event to the queue for handling. The handler is passed straight to the
     * underlying thread pool's <code>execute</code>.
     */
    void submit(final EventHandler event) {
      this.threadPoolExecutor.execute(event);
    }

    TrackingThreadPoolExecutor getThreadPoolExecutor() {
      return threadPoolExecutor;
    }

    @Override
    public String toString() {
      return getClass().getSimpleName() + "-" + id + "-" + name;
    }

    /**
     * Builds a status object from the handlers currently in the work queue and the handlers
     * currently tracked as running. Entries that are not {@link EventHandler}s are logged and
     * skipped.
     */
    public ExecutorStatus getStatus() {
      List<EventHandler> queuedEvents = Lists.newArrayList();
      for (Runnable r : q) {
        if (!(r instanceof EventHandler)) {
          LOG.warn("Non-EventHandler {} queued in {}", r, name);
          continue;
        }
        queuedEvents.add((EventHandler) r);
      }

      List<RunningEventStatus> running = Lists.newArrayList();
      for (Map.Entry<Thread, Runnable> e : threadPoolExecutor.getRunningTasks().entrySet()) {
        Runnable r = e.getValue();
        if (!(r instanceof EventHandler)) {
          LOG.warn("Non-EventHandler {} running in {}", r, name);
          continue;
        }
        running.add(new RunningEventStatus(e.getKey(), (EventHandler) r));
      }

      return new ExecutorStatus(this, queuedEvents, running);
    }
  }

  /**
   * A subclass of ThreadPoolExecutor that keeps track of the Runnables that are executing at any
   * given point in time.
   */
  static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
    // Worker thread -> task it is executing; maintained by beforeExecute/afterExecute.
    private final ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();

    public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
      super.afterExecute(r, t);
      // Removal is keyed on the current thread, the same key beforeExecute registered.
      running.remove(Thread.currentThread());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
      Runnable oldPut = running.put(t, r);
      // A worker runs one task at a time, so there must be no leftover entry for this thread.
      assert oldPut == null : "inconsistency for thread " + t;
      super.beforeExecute(t, r);
    }

    /**
     * @return a map of the threads currently running tasks inside this executor. Each key is an
     *         active thread, and the value is the task that is currently running. Note that this is
     *         not a stable snapshot of the map.
     */
    public ConcurrentMap<Thread, Runnable> getRunningTasks() {
      return running;
    }
  }

  /**
   * A snapshot of the status of a particular executor. This includes the contents of the executor's
   * pending queue, as well as the threads and events currently being processed. The fields are
   * final; the lists are the ones handed to the constructor and are returned without copying.
   */
  public static class ExecutorStatus {
    final Executor executor;
    final List<EventHandler> queuedEvents;
    final List<RunningEventStatus> running;

    ExecutorStatus(Executor executor, List<EventHandler> queuedEvents,
      List<RunningEventStatus> running) {
      this.executor = executor;
      this.queuedEvents = queuedEvents;
      this.running = running;
    }

    public List<EventHandler> getQueuedEvents() {
      return queuedEvents;
    }

    public List<RunningEventStatus> getRunning() {
      return running;
    }

    /**
     * Dump a textual representation of the executor's status to the given writer.
     * @param out    the stream to write to
     * @param indent a string prefix for each line, used for indentation
     * @throws IOException if writing to or flushing <code>out</code> fails
     */
    public void dumpTo(Writer out, String indent) throws IOException {
      out.write(indent + "Status for executor: " + executor + "\n");
      out.write(indent + "=======================================\n");
      out.write(indent + queuedEvents.size() + " events queued, " + running.size() + " running\n");
      if (!queuedEvents.isEmpty()) {
        out.write(indent + "Queued:\n");
        for (EventHandler e : queuedEvents) {
          out.write(indent + " " + e + "\n");
        }
        out.write("\n");
      }
      if (!running.isEmpty()) {
        out.write(indent + "Running:\n");
        for (RunningEventStatus stat : running) {
          ThreadInfo info = stat.threadInfo;
          if (info == null) {
            // Guard against a missing ThreadInfo so one vanished worker cannot abort the whole
            // dump with a NullPointerException.
            // NOTE(review): presumably null means the thread exited before its info was
            // captured (as with ThreadMXBean#getThreadInfo) - confirm against ThreadMonitoring.
            out.write(indent + " Running on thread '<unavailable>': " + stat.event + "\n");
            out.write(indent + " " + "(thread info not available)\n");
          } else {
            out.write(
              indent + " Running on thread '" + info.getThreadName() + "': " + stat.event + "\n");
            out.write(ThreadMonitoring.formatThreadInfo(info, indent + " "));
          }
          out.write("\n");
        }
      }
      out.flush();
    }
  }

  /**
   * The status of a particular event that is in the middle of being handled by an executor.
   */
  public static class RunningEventStatus {
    // Captured once at construction time.
    // NOTE(review): may be null if ThreadMonitoring could not obtain info for the thread - confirm.
    final ThreadInfo threadInfo;
    final EventHandler event;

    public RunningEventStatus(Thread t, EventHandler event) {
      this.threadInfo = ThreadMonitoring.getThreadInfo(t);
      this.event = event;
    }
  }
}