001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.executor; 019 020import java.io.IOException; 021import java.io.Writer; 022import java.lang.management.ThreadInfo; 023import java.util.List; 024import java.util.Map; 025import java.util.Map.Entry; 026import java.util.concurrent.BlockingQueue; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.Executors; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 040import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 041import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture; 043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService; 044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; 045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 046 047/** 048 * This is a generic executor service. This component abstracts a threadpool, a queue to which 049 * {@link EventType}s can be submitted, and a <code>Runnable</code> that handles the object that is 050 * added to the queue. 051 * <p> 052 * In order to create a new service, create an instance of this class and then do: 053 * <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig} wraps the 054 * configuration needed by this service. When done call {@link #shutdown()}. 055 * <p> 056 * In order to use the service created above, call {@link #submit(EventHandler)}. 057 */ 058@InterfaceAudience.Private 059public class ExecutorService { 060 private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class); 061 062 // hold the all the executors created in a map addressable by their names 063 private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>(); 064 065 // Name of the server hosting this executor service. 066 private final String servername; 067 068 private final ListeningScheduledExecutorService delayedSubmitTimer = 069 MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 070 .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build())); 071 072 /** 073 * Default constructor. 074 * @param servername Name of the hosting server. 075 */ 076 public ExecutorService(final String servername) { 077 this.servername = servername; 078 } 079 080 /** 081 * Start an executor service with a given name. If there was a service already started with the 082 * same name, this throws a RuntimeException. 083 * @param config Configuration to use for the executor. 084 */ 085 public void startExecutorService(final ExecutorConfig config) { 086 final String name = config.getName(); 087 Executor hbes = this.executorMap.compute(name, (key, value) -> { 088 if (value != null) { 089 throw new RuntimeException( 090 "An executor service with the name " + key + " is already running!"); 091 } 092 return new Executor(config); 093 }); 094 095 LOG.debug("Starting executor service name={}, corePoolSize={}, maxPoolSize={}", name, 096 hbes.threadPoolExecutor.getCorePoolSize(), hbes.threadPoolExecutor.getMaximumPoolSize()); 097 } 098 099 boolean isExecutorServiceRunning(String name) { 100 return this.executorMap.containsKey(name); 101 } 102 103 public void shutdown() { 104 this.delayedSubmitTimer.shutdownNow(); 105 for (Entry<String, Executor> entry : this.executorMap.entrySet()) { 106 List<Runnable> wasRunning = entry.getValue().threadPoolExecutor.shutdownNow(); 107 if (!wasRunning.isEmpty()) { 108 LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown"); 109 } 110 } 111 this.executorMap.clear(); 112 } 113 114 Executor getExecutor(final ExecutorType type) { 115 return getExecutor(type.getExecutorName(this.servername)); 116 } 117 118 Executor getExecutor(String name) { 119 return this.executorMap.get(name); 120 } 121 122 public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) { 123 return getExecutor(type).getThreadPoolExecutor(); 124 } 125 126 /** 127 * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all 128 * paths should use this method to get the executor, should not start executor by using 129 * {@link ExecutorService#startExecutorService(ExecutorConfig)} 130 */ 131 public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) { 132 return executorMap.computeIfAbsent(config.getName(), (executorName) -> new Executor(config)) 133 .getThreadPoolExecutor(); 134 } 135 136 public void submit(final EventHandler eh) { 137 Executor executor = getExecutor(eh.getEventType().getExecutorServiceType()); 138 if (executor == null) { 139 // This happens only when events are submitted after shutdown() was 140 // called, so dropping them should be "ok" since it means we're 141 // shutting down. 142 LOG.error("Cannot submit [" + eh + "] because the executor is missing." 143 + " Is this process shutting down?"); 144 } else { 145 executor.submit(eh); 146 } 147 } 148 149 // Submit the handler after the given delay. Used for retrying. 150 public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) { 151 ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit); 152 future.addListener(() -> { 153 try { 154 future.get(); 155 } catch (Exception e) { 156 LOG.error("Failed to submit the event handler {} to executor", eh, e); 157 } 158 }, MoreExecutors.directExecutor()); 159 } 160 161 public Map<String, ExecutorStatus> getAllExecutorStatuses() { 162 Map<String, ExecutorStatus> ret = Maps.newHashMap(); 163 for (Map.Entry<String, Executor> e : executorMap.entrySet()) { 164 ret.put(e.getKey(), e.getValue().getStatus()); 165 } 166 return ret; 167 } 168 169 /** 170 * Configuration wrapper for {@link Executor}. 171 */ 172 public class ExecutorConfig { 173 // Refer to ThreadPoolExecutor javadoc for details of these configuration. 174 // Argument validation and bound checks delegated to the underlying ThreadPoolExecutor 175 // implementation. 176 public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000; 177 private int corePoolSize = -1; 178 private boolean allowCoreThreadTimeout = false; 179 private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT; 180 private ExecutorType executorType; 181 182 public ExecutorConfig setExecutorType(ExecutorType type) { 183 this.executorType = type; 184 return this; 185 } 186 187 private ExecutorType getExecutorType() { 188 return Preconditions.checkNotNull(executorType, "ExecutorType not set."); 189 } 190 191 public int getCorePoolSize() { 192 return corePoolSize; 193 } 194 195 public ExecutorConfig setCorePoolSize(int corePoolSize) { 196 this.corePoolSize = corePoolSize; 197 return this; 198 } 199 200 public boolean allowCoreThreadTimeout() { 201 return allowCoreThreadTimeout; 202 } 203 204 /** 205 * Allows timing out of core threads. Good to set this for non-critical thread pools for release 206 * of unused resources. Refer to {@link ThreadPoolExecutor#allowCoreThreadTimeOut} for 207 * additional details. 208 */ 209 public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) { 210 this.allowCoreThreadTimeout = allowCoreThreadTimeout; 211 return this; 212 } 213 214 /** 215 * Returns the executor name inferred from the type and the servername on which this is running. 216 */ 217 public String getName() { 218 return getExecutorType().getExecutorName(servername); 219 } 220 221 public long getKeepAliveTimeMillis() { 222 return keepAliveTimeMillis; 223 } 224 225 public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) { 226 this.keepAliveTimeMillis = keepAliveTimeMillis; 227 return this; 228 } 229 } 230 231 /** 232 * Executor instance. 233 */ 234 static class Executor { 235 // the thread pool executor that services the requests 236 final TrackingThreadPoolExecutor threadPoolExecutor; 237 // work queue to use - unbounded queue 238 final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>(); 239 private final String name; 240 private static final AtomicLong seqids = new AtomicLong(0); 241 private final long id; 242 243 protected Executor(ExecutorConfig config) { 244 this.id = seqids.incrementAndGet(); 245 this.name = config.getName(); 246 // create the thread pool executor 247 this.threadPoolExecutor = new TrackingThreadPoolExecutor( 248 // setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue. 249 config.getCorePoolSize(), config.getCorePoolSize(), config.getKeepAliveTimeMillis(), 250 TimeUnit.MILLISECONDS, q); 251 this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout()); 252 // name the threads for this threadpool 253 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 254 tfb.setNameFormat(this.name + "-%d"); 255 tfb.setDaemon(true); 256 this.threadPoolExecutor.setThreadFactory(tfb.build()); 257 } 258 259 /** 260 * Submit the event to the queue for handling. 261 */ 262 void submit(final EventHandler event) { 263 // If there is a listener for this type, make sure we call the before 264 // and after process methods. 265 this.threadPoolExecutor.execute(event); 266 } 267 268 TrackingThreadPoolExecutor getThreadPoolExecutor() { 269 return threadPoolExecutor; 270 } 271 272 @Override 273 public String toString() { 274 return getClass().getSimpleName() + "-" + id + "-" + name; 275 } 276 277 public ExecutorStatus getStatus() { 278 List<EventHandler> queuedEvents = Lists.newArrayList(); 279 for (Runnable r : q) { 280 if (!(r instanceof EventHandler)) { 281 LOG.warn("Non-EventHandler " + r + " queued in " + name); 282 continue; 283 } 284 queuedEvents.add((EventHandler) r); 285 } 286 287 List<RunningEventStatus> running = Lists.newArrayList(); 288 for (Map.Entry<Thread, Runnable> e : threadPoolExecutor.getRunningTasks().entrySet()) { 289 Runnable r = e.getValue(); 290 if (!(r instanceof EventHandler)) { 291 LOG.warn("Non-EventHandler " + r + " running in " + name); 292 continue; 293 } 294 running.add(new RunningEventStatus(e.getKey(), (EventHandler) r)); 295 } 296 297 return new ExecutorStatus(this, queuedEvents, running); 298 } 299 } 300 301 /** 302 * A subclass of ThreadPoolExecutor that keeps track of the Runnables that are executing at any 303 * given point in time. 304 */ 305 static class TrackingThreadPoolExecutor extends ThreadPoolExecutor { 306 private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap(); 307 308 public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 309 TimeUnit unit, BlockingQueue<Runnable> workQueue) { 310 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 311 } 312 313 @Override 314 protected void afterExecute(Runnable r, Throwable t) { 315 super.afterExecute(r, t); 316 running.remove(Thread.currentThread()); 317 } 318 319 @Override 320 protected void beforeExecute(Thread t, Runnable r) { 321 Runnable oldPut = running.put(t, r); 322 assert oldPut == null : "inconsistency for thread " + t; 323 super.beforeExecute(t, r); 324 } 325 326 /** 327 * @return a map of the threads currently running tasks inside this executor. Each key is an 328 * active thread, and the value is the task that is currently running. Note that this is 329 * not a stable snapshot of the map. 330 */ 331 public ConcurrentMap<Thread, Runnable> getRunningTasks() { 332 return running; 333 } 334 } 335 336 /** 337 * A snapshot of the status of a particular executor. This includes the contents of the executor's 338 * pending queue, as well as the threads and events currently being processed. This is a 339 * consistent snapshot that is immutable once constructed. 340 */ 341 public static class ExecutorStatus { 342 final Executor executor; 343 final List<EventHandler> queuedEvents; 344 final List<RunningEventStatus> running; 345 346 ExecutorStatus(Executor executor, List<EventHandler> queuedEvents, 347 List<RunningEventStatus> running) { 348 this.executor = executor; 349 this.queuedEvents = queuedEvents; 350 this.running = running; 351 } 352 353 public List<EventHandler> getQueuedEvents() { 354 return queuedEvents; 355 } 356 357 public List<RunningEventStatus> getRunning() { 358 return running; 359 } 360 361 /** 362 * Dump a textual representation of the executor's status to the given writer. 363 * @param out the stream to write to 364 * @param indent a string prefix for each line, used for indentation 365 */ 366 public void dumpTo(Writer out, String indent) throws IOException { 367 out.write(indent + "Status for executor: " + executor + "\n"); 368 out.write(indent + "=======================================\n"); 369 out.write(indent + queuedEvents.size() + " events queued, " + running.size() + " running\n"); 370 if (!queuedEvents.isEmpty()) { 371 out.write(indent + "Queued:\n"); 372 for (EventHandler e : queuedEvents) { 373 out.write(indent + " " + e + "\n"); 374 } 375 out.write("\n"); 376 } 377 if (!running.isEmpty()) { 378 out.write(indent + "Running:\n"); 379 for (RunningEventStatus stat : running) { 380 out.write(indent + " Running on thread '" + stat.threadInfo.getThreadName() + "': " 381 + stat.event + "\n"); 382 out.write(ThreadMonitoring.formatThreadInfo(stat.threadInfo, indent + " ")); 383 out.write("\n"); 384 } 385 } 386 out.flush(); 387 } 388 } 389 390 /** 391 * The status of a particular event that is in the middle of being handled by an executor. 392 */ 393 public static class RunningEventStatus { 394 final ThreadInfo threadInfo; 395 final EventHandler event; 396 397 public RunningEventStatus(Thread t, EventHandler event) { 398 this.threadInfo = ThreadMonitoring.getThreadInfo(t); 399 this.event = event; 400 } 401 } 402}