001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.executor; 020 021import java.io.IOException; 022import java.io.Writer; 023import java.lang.management.ThreadInfo; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.Executors; 031import java.util.concurrent.LinkedBlockingQueue; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035 036import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 042import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 043import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture; 045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService; 046import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; 047import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 048 049/** 050 * This is a generic executor service. This component abstracts a 051 * threadpool, a queue to which {@link EventType}s can be submitted, 052 * and a <code>Runnable</code> that handles the object that is added to the queue. 053 * 054 * <p>In order to create a new service, create an instance of this class and 055 * then do: <code>instance.startExecutorService("myService");</code>. When done 056 * call {@link #shutdown()}. 057 * 058 * <p>In order to use the service created above, call 059 * {@link #submit(EventHandler)}. 060 */ 061@InterfaceAudience.Private 062public class ExecutorService { 063 private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class); 064 065 // hold the all the executors created in a map addressable by their names 066 private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>(); 067 068 // Name of the server hosting this executor service. 069 private final String servername; 070 071 private final ListeningScheduledExecutorService delayedSubmitTimer = 072 MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 073 .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build())); 074 075 /** 076 * Default constructor. 077 * @param servername Name of the hosting server. 078 */ 079 public ExecutorService(final String servername) { 080 this.servername = servername; 081 } 082 083 /** 084 * Start an executor service with a given name. If there was a service already 085 * started with the same name, this throws a RuntimeException. 086 * @param name Name of the service to start. 087 */ 088 @VisibleForTesting 089 public void startExecutorService(String name, int maxThreads) { 090 if (this.executorMap.get(name) != null) { 091 throw new RuntimeException("An executor service with the name " + name + 092 " is already running!"); 093 } 094 Executor hbes = new Executor(name, maxThreads); 095 if (this.executorMap.putIfAbsent(name, hbes) != null) { 096 throw new RuntimeException("An executor service with the name " + name + 097 " is already running (2)!"); 098 } 099 LOG.debug("Starting executor service name=" + name + 100 ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() + 101 ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize()); 102 } 103 104 boolean isExecutorServiceRunning(String name) { 105 return this.executorMap.containsKey(name); 106 } 107 108 public void shutdown() { 109 this.delayedSubmitTimer.shutdownNow(); 110 for(Entry<String, Executor> entry: this.executorMap.entrySet()) { 111 List<Runnable> wasRunning = 112 entry.getValue().threadPoolExecutor.shutdownNow(); 113 if (!wasRunning.isEmpty()) { 114 LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown"); 115 } 116 } 117 this.executorMap.clear(); 118 } 119 120 Executor getExecutor(final ExecutorType type) { 121 return getExecutor(type.getExecutorName(this.servername)); 122 } 123 124 Executor getExecutor(String name) { 125 Executor executor = this.executorMap.get(name); 126 return executor; 127 } 128 129 @VisibleForTesting 130 public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) { 131 return getExecutor(type).getThreadPoolExecutor(); 132 } 133 134 public void startExecutorService(final ExecutorType type, final int maxThreads) { 135 String name = type.getExecutorName(this.servername); 136 if (isExecutorServiceRunning(name)) { 137 LOG.debug("Executor service " + toString() + " already running on " + this.servername); 138 return; 139 } 140 startExecutorService(name, maxThreads); 141 } 142 143 /** 144 * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all 145 * paths should use this method to get the executor, should not start executor by using 146 * {@link ExecutorService#startExecutorService(ExecutorType, int)} 147 */ 148 public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) { 149 String name = type.getExecutorName(this.servername); 150 return executorMap 151 .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads)) 152 .getThreadPoolExecutor(); 153 } 154 155 public void submit(final EventHandler eh) { 156 Executor executor = getExecutor(eh.getEventType().getExecutorServiceType()); 157 if (executor == null) { 158 // This happens only when events are submitted after shutdown() was 159 // called, so dropping them should be "ok" since it means we're 160 // shutting down. 161 LOG.error("Cannot submit [" + eh + "] because the executor is missing." + 162 " Is this process shutting down?"); 163 } else { 164 executor.submit(eh); 165 } 166 } 167 168 // Submit the handler after the given delay. Used for retrying. 169 public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) { 170 ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit); 171 future.addListener(() -> { 172 try { 173 future.get(); 174 } catch (Exception e) { 175 LOG.error("Failed to submit the event handler {} to executor", eh, e); 176 } 177 }, MoreExecutors.directExecutor()); 178 } 179 180 public Map<String, ExecutorStatus> getAllExecutorStatuses() { 181 Map<String, ExecutorStatus> ret = Maps.newHashMap(); 182 for (Map.Entry<String, Executor> e : executorMap.entrySet()) { 183 ret.put(e.getKey(), e.getValue().getStatus()); 184 } 185 return ret; 186 } 187 188 /** 189 * Executor instance. 190 */ 191 static class Executor { 192 // how long to retain excess threads 193 static final long keepAliveTimeInMillis = 1000; 194 // the thread pool executor that services the requests 195 final TrackingThreadPoolExecutor threadPoolExecutor; 196 // work queue to use - unbounded queue 197 final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>(); 198 private final String name; 199 private static final AtomicLong seqids = new AtomicLong(0); 200 private final long id; 201 202 protected Executor(String name, int maxThreads) { 203 this.id = seqids.incrementAndGet(); 204 this.name = name; 205 // create the thread pool executor 206 this.threadPoolExecutor = new TrackingThreadPoolExecutor( 207 maxThreads, maxThreads, 208 keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q); 209 // name the threads for this threadpool 210 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 211 tfb.setNameFormat(this.name + "-%d"); 212 tfb.setDaemon(true); 213 this.threadPoolExecutor.setThreadFactory(tfb.build()); 214 } 215 216 /** 217 * Submit the event to the queue for handling. 218 * @param event 219 */ 220 void submit(final EventHandler event) { 221 // If there is a listener for this type, make sure we call the before 222 // and after process methods. 223 this.threadPoolExecutor.execute(event); 224 } 225 226 TrackingThreadPoolExecutor getThreadPoolExecutor() { 227 return threadPoolExecutor; 228 } 229 230 @Override 231 public String toString() { 232 return getClass().getSimpleName() + "-" + id + "-" + name; 233 } 234 235 public ExecutorStatus getStatus() { 236 List<EventHandler> queuedEvents = Lists.newArrayList(); 237 for (Runnable r : q) { 238 if (!(r instanceof EventHandler)) { 239 LOG.warn("Non-EventHandler " + r + " queued in " + name); 240 continue; 241 } 242 queuedEvents.add((EventHandler)r); 243 } 244 245 List<RunningEventStatus> running = Lists.newArrayList(); 246 for (Map.Entry<Thread, Runnable> e : 247 threadPoolExecutor.getRunningTasks().entrySet()) { 248 Runnable r = e.getValue(); 249 if (!(r instanceof EventHandler)) { 250 LOG.warn("Non-EventHandler " + r + " running in " + name); 251 continue; 252 } 253 running.add(new RunningEventStatus(e.getKey(), (EventHandler)r)); 254 } 255 256 return new ExecutorStatus(this, queuedEvents, running); 257 } 258 } 259 260 /** 261 * A subclass of ThreadPoolExecutor that keeps track of the Runnables that 262 * are executing at any given point in time. 263 */ 264 static class TrackingThreadPoolExecutor extends ThreadPoolExecutor { 265 private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap(); 266 267 public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 268 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 269 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 270 } 271 272 @Override 273 protected void afterExecute(Runnable r, Throwable t) { 274 super.afterExecute(r, t); 275 running.remove(Thread.currentThread()); 276 } 277 278 @Override 279 protected void beforeExecute(Thread t, Runnable r) { 280 Runnable oldPut = running.put(t, r); 281 assert oldPut == null : "inconsistency for thread " + t; 282 super.beforeExecute(t, r); 283 } 284 285 /** 286 * @return a map of the threads currently running tasks 287 * inside this executor. Each key is an active thread, 288 * and the value is the task that is currently running. 289 * Note that this is not a stable snapshot of the map. 290 */ 291 public ConcurrentMap<Thread, Runnable> getRunningTasks() { 292 return running; 293 } 294 } 295 296 /** 297 * A snapshot of the status of a particular executor. This includes 298 * the contents of the executor's pending queue, as well as the 299 * threads and events currently being processed. 300 * 301 * This is a consistent snapshot that is immutable once constructed. 302 */ 303 public static class ExecutorStatus { 304 final Executor executor; 305 final List<EventHandler> queuedEvents; 306 final List<RunningEventStatus> running; 307 308 ExecutorStatus(Executor executor, 309 List<EventHandler> queuedEvents, 310 List<RunningEventStatus> running) { 311 this.executor = executor; 312 this.queuedEvents = queuedEvents; 313 this.running = running; 314 } 315 316 /** 317 * Dump a textual representation of the executor's status 318 * to the given writer. 319 * 320 * @param out the stream to write to 321 * @param indent a string prefix for each line, used for indentation 322 */ 323 public void dumpTo(Writer out, String indent) throws IOException { 324 out.write(indent + "Status for executor: " + executor + "\n"); 325 out.write(indent + "=======================================\n"); 326 out.write(indent + queuedEvents.size() + " events queued, " + 327 running.size() + " running\n"); 328 if (!queuedEvents.isEmpty()) { 329 out.write(indent + "Queued:\n"); 330 for (EventHandler e : queuedEvents) { 331 out.write(indent + " " + e + "\n"); 332 } 333 out.write("\n"); 334 } 335 if (!running.isEmpty()) { 336 out.write(indent + "Running:\n"); 337 for (RunningEventStatus stat : running) { 338 out.write(indent + " Running on thread '" + 339 stat.threadInfo.getThreadName() + 340 "': " + stat.event + "\n"); 341 out.write(ThreadMonitoring.formatThreadInfo( 342 stat.threadInfo, indent + " ")); 343 out.write("\n"); 344 } 345 } 346 out.flush(); 347 } 348 } 349 350 /** 351 * The status of a particular event that is in the middle of being 352 * handled by an executor. 353 */ 354 public static class RunningEventStatus { 355 final ThreadInfo threadInfo; 356 final EventHandler event; 357 358 public RunningEventStatus(Thread t, EventHandler event) { 359 this.threadInfo = ThreadMonitoring.getThreadInfo(t); 360 this.event = event; 361 } 362 } 363}