001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.executor; 020 021import java.io.IOException; 022import java.io.Writer; 023import java.lang.management.ThreadInfo; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.Executors; 031import java.util.concurrent.LinkedBlockingQueue; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 041import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture; 043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService; 044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors; 045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 046 047/** 048 * This is a generic executor service. This component abstracts a 049 * threadpool, a queue to which {@link EventType}s can be submitted, 050 * and a <code>Runnable</code> that handles the object that is added to the queue. 051 * 052 * <p>In order to create a new service, create an instance of this class and 053 * then do: <code>instance.startExecutorService("myService");</code>. When done 054 * call {@link #shutdown()}. 055 * 056 * <p>In order to use the service created above, call 057 * {@link #submit(EventHandler)}. 058 */ 059@InterfaceAudience.Private 060public class ExecutorService { 061 private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class); 062 063 // hold the all the executors created in a map addressable by their names 064 private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>(); 065 066 // Name of the server hosting this executor service. 067 private final String servername; 068 069 private final ListeningScheduledExecutorService delayedSubmitTimer = 070 MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 071 .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build())); 072 073 /** 074 * Default constructor. 075 * @param servername Name of the hosting server. 076 */ 077 public ExecutorService(final String servername) { 078 this.servername = servername; 079 } 080 081 /** 082 * Start an executor service with a given name. If there was a service already 083 * started with the same name, this throws a RuntimeException. 084 * @param name Name of the service to start. 085 */ 086 public void startExecutorService(String name, int maxThreads) { 087 if (this.executorMap.get(name) != null) { 088 throw new RuntimeException("An executor service with the name " + name + 089 " is already running!"); 090 } 091 Executor hbes = new Executor(name, maxThreads); 092 if (this.executorMap.putIfAbsent(name, hbes) != null) { 093 throw new RuntimeException("An executor service with the name " + name + 094 " is already running (2)!"); 095 } 096 LOG.debug("Starting executor service name=" + name + 097 ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() + 098 ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize()); 099 } 100 101 boolean isExecutorServiceRunning(String name) { 102 return this.executorMap.containsKey(name); 103 } 104 105 public void shutdown() { 106 this.delayedSubmitTimer.shutdownNow(); 107 for(Entry<String, Executor> entry: this.executorMap.entrySet()) { 108 List<Runnable> wasRunning = 109 entry.getValue().threadPoolExecutor.shutdownNow(); 110 if (!wasRunning.isEmpty()) { 111 LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown"); 112 } 113 } 114 this.executorMap.clear(); 115 } 116 117 Executor getExecutor(final ExecutorType type) { 118 return getExecutor(type.getExecutorName(this.servername)); 119 } 120 121 Executor getExecutor(String name) { 122 Executor executor = this.executorMap.get(name); 123 return executor; 124 } 125 126 public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) { 127 return getExecutor(type).getThreadPoolExecutor(); 128 } 129 130 public void startExecutorService(final ExecutorType type, final int maxThreads) { 131 String name = type.getExecutorName(this.servername); 132 if (isExecutorServiceRunning(name)) { 133 LOG.debug("Executor service " + toString() + " already running on " + this.servername); 134 return; 135 } 136 startExecutorService(name, maxThreads); 137 } 138 139 /** 140 * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all 141 * paths should use this method to get the executor, should not start executor by using 142 * {@link ExecutorService#startExecutorService(ExecutorType, int)} 143 */ 144 public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) { 145 String name = type.getExecutorName(this.servername); 146 return executorMap 147 .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads)) 148 .getThreadPoolExecutor(); 149 } 150 151 public void submit(final EventHandler eh) { 152 Executor executor = getExecutor(eh.getEventType().getExecutorServiceType()); 153 if (executor == null) { 154 // This happens only when events are submitted after shutdown() was 155 // called, so dropping them should be "ok" since it means we're 156 // shutting down. 157 LOG.error("Cannot submit [" + eh + "] because the executor is missing." + 158 " Is this process shutting down?"); 159 } else { 160 executor.submit(eh); 161 } 162 } 163 164 // Submit the handler after the given delay. Used for retrying. 165 public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) { 166 ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit); 167 future.addListener(() -> { 168 try { 169 future.get(); 170 } catch (Exception e) { 171 LOG.error("Failed to submit the event handler {} to executor", eh, e); 172 } 173 }, MoreExecutors.directExecutor()); 174 } 175 176 public Map<String, ExecutorStatus> getAllExecutorStatuses() { 177 Map<String, ExecutorStatus> ret = Maps.newHashMap(); 178 for (Map.Entry<String, Executor> e : executorMap.entrySet()) { 179 ret.put(e.getKey(), e.getValue().getStatus()); 180 } 181 return ret; 182 } 183 184 /** 185 * Executor instance. 186 */ 187 static class Executor { 188 // how long to retain excess threads 189 static final long keepAliveTimeInMillis = 1000; 190 // the thread pool executor that services the requests 191 final TrackingThreadPoolExecutor threadPoolExecutor; 192 // work queue to use - unbounded queue 193 final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>(); 194 private final String name; 195 private static final AtomicLong seqids = new AtomicLong(0); 196 private final long id; 197 198 protected Executor(String name, int maxThreads) { 199 this.id = seqids.incrementAndGet(); 200 this.name = name; 201 // create the thread pool executor 202 this.threadPoolExecutor = new TrackingThreadPoolExecutor( 203 maxThreads, maxThreads, 204 keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q); 205 // name the threads for this threadpool 206 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); 207 tfb.setNameFormat(this.name + "-%d"); 208 tfb.setDaemon(true); 209 this.threadPoolExecutor.setThreadFactory(tfb.build()); 210 } 211 212 /** 213 * Submit the event to the queue for handling. 214 * @param event 215 */ 216 void submit(final EventHandler event) { 217 // If there is a listener for this type, make sure we call the before 218 // and after process methods. 219 this.threadPoolExecutor.execute(event); 220 } 221 222 TrackingThreadPoolExecutor getThreadPoolExecutor() { 223 return threadPoolExecutor; 224 } 225 226 @Override 227 public String toString() { 228 return getClass().getSimpleName() + "-" + id + "-" + name; 229 } 230 231 public ExecutorStatus getStatus() { 232 List<EventHandler> queuedEvents = Lists.newArrayList(); 233 for (Runnable r : q) { 234 if (!(r instanceof EventHandler)) { 235 LOG.warn("Non-EventHandler " + r + " queued in " + name); 236 continue; 237 } 238 queuedEvents.add((EventHandler)r); 239 } 240 241 List<RunningEventStatus> running = Lists.newArrayList(); 242 for (Map.Entry<Thread, Runnable> e : 243 threadPoolExecutor.getRunningTasks().entrySet()) { 244 Runnable r = e.getValue(); 245 if (!(r instanceof EventHandler)) { 246 LOG.warn("Non-EventHandler " + r + " running in " + name); 247 continue; 248 } 249 running.add(new RunningEventStatus(e.getKey(), (EventHandler)r)); 250 } 251 252 return new ExecutorStatus(this, queuedEvents, running); 253 } 254 } 255 256 /** 257 * A subclass of ThreadPoolExecutor that keeps track of the Runnables that 258 * are executing at any given point in time. 259 */ 260 static class TrackingThreadPoolExecutor extends ThreadPoolExecutor { 261 private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap(); 262 263 public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 264 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 265 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 266 } 267 268 @Override 269 protected void afterExecute(Runnable r, Throwable t) { 270 super.afterExecute(r, t); 271 running.remove(Thread.currentThread()); 272 } 273 274 @Override 275 protected void beforeExecute(Thread t, Runnable r) { 276 Runnable oldPut = running.put(t, r); 277 assert oldPut == null : "inconsistency for thread " + t; 278 super.beforeExecute(t, r); 279 } 280 281 /** 282 * @return a map of the threads currently running tasks 283 * inside this executor. Each key is an active thread, 284 * and the value is the task that is currently running. 285 * Note that this is not a stable snapshot of the map. 286 */ 287 public ConcurrentMap<Thread, Runnable> getRunningTasks() { 288 return running; 289 } 290 } 291 292 /** 293 * A snapshot of the status of a particular executor. This includes 294 * the contents of the executor's pending queue, as well as the 295 * threads and events currently being processed. 296 * 297 * This is a consistent snapshot that is immutable once constructed. 298 */ 299 public static class ExecutorStatus { 300 final Executor executor; 301 final List<EventHandler> queuedEvents; 302 final List<RunningEventStatus> running; 303 304 ExecutorStatus(Executor executor, 305 List<EventHandler> queuedEvents, 306 List<RunningEventStatus> running) { 307 this.executor = executor; 308 this.queuedEvents = queuedEvents; 309 this.running = running; 310 } 311 312 /** 313 * Dump a textual representation of the executor's status 314 * to the given writer. 315 * 316 * @param out the stream to write to 317 * @param indent a string prefix for each line, used for indentation 318 */ 319 public void dumpTo(Writer out, String indent) throws IOException { 320 out.write(indent + "Status for executor: " + executor + "\n"); 321 out.write(indent + "=======================================\n"); 322 out.write(indent + queuedEvents.size() + " events queued, " + 323 running.size() + " running\n"); 324 if (!queuedEvents.isEmpty()) { 325 out.write(indent + "Queued:\n"); 326 for (EventHandler e : queuedEvents) { 327 out.write(indent + " " + e + "\n"); 328 } 329 out.write("\n"); 330 } 331 if (!running.isEmpty()) { 332 out.write(indent + "Running:\n"); 333 for (RunningEventStatus stat : running) { 334 out.write(indent + " Running on thread '" + 335 stat.threadInfo.getThreadName() + 336 "': " + stat.event + "\n"); 337 out.write(ThreadMonitoring.formatThreadInfo( 338 stat.threadInfo, indent + " ")); 339 out.write("\n"); 340 } 341 } 342 out.flush(); 343 } 344 } 345 346 /** 347 * The status of a particular event that is in the middle of being 348 * handled by an executor. 349 */ 350 public static class RunningEventStatus { 351 final ThreadInfo threadInfo; 352 final EventHandler event; 353 354 public RunningEventStatus(Thread t, EventHandler event) { 355 this.threadInfo = ThreadMonitoring.getThreadInfo(t); 356 this.event = event; 357 } 358 } 359}