/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.executor;

import java.io.IOException;
import java.io.Writer;
import java.lang.management.ThreadInfo;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This is a generic executor service. This component abstracts a
 * threadpool, a queue to which {@link EventType}s can be submitted,
 * and a <code>Runnable</code> that handles the object that is added to the queue.
 *
 * <p>In order to create a new service, create an instance of this class and
 * then do: <code>instance.startExecutorService(type, maxThreads);</code>. When done
 * call {@link #shutdown()}.
 *
 * <p>In order to use the service created above, call
 * {@link #submit(EventHandler)}.
 */
@InterfaceAudience.Private
public class ExecutorService {
  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);

  // holds all the executors created, in a map addressable by their names
  private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>();

  // Name of the server hosting this executor service.
  private final String servername;

  /**
   * Default constructor.
   * @param servername Name of the hosting server.
   */
  public ExecutorService(final String servername) {
    this.servername = servername;
  }

  /**
   * Start an executor service with a given name. If there was a service already
   * started with the same name, this throws a RuntimeException.
   * @param name Name of the service to start.
   * @param maxThreads Used as both the core and the maximum size of the backing thread pool.
   * @throws RuntimeException if an executor is already registered under {@code name}
   */
  @VisibleForTesting
  public void startExecutorService(String name, int maxThreads) {
    // Cheap pre-check so that the common "already started" case does not build a pool.
    if (this.executorMap.get(name) != null) {
      throw new RuntimeException("An executor service with the name " + name +
        " is already running!");
    }
    Executor hbes = new Executor(name, maxThreads);
    // putIfAbsent closes the window between the check above and the registration.
    if (this.executorMap.putIfAbsent(name, hbes) != null) {
      throw new RuntimeException("An executor service with the name " + name +
        " is already running (2)!");
    }
    LOG.debug("Starting executor service name={}, corePoolSize={}, maxPoolSize={}", name,
      hbes.threadPoolExecutor.getCorePoolSize(), hbes.threadPoolExecutor.getMaximumPoolSize());
  }

  /**
   * @param name Name of the executor to look up.
   * @return true if an executor is currently registered under {@code name}
   */
  boolean isExecutorServiceRunning(String name) {
    return this.executorMap.containsKey(name);
  }

  /**
   * Calls {@link ThreadPoolExecutor#shutdownNow()} on every registered executor and then
   * clears the registry.
   */
  public void shutdown() {
    for (Entry<String, Executor> entry : this.executorMap.entrySet()) {
      Executor executor = entry.getValue();
      // shutdownNow() hands back the tasks that were queued but never started.
      List<Runnable> neverStarted = executor.threadPoolExecutor.shutdownNow();
      if (!neverStarted.isEmpty()) {
        LOG.info("{} had {} on shutdown", executor, neverStarted);
      }
    }
    this.executorMap.clear();
  }

  /**
   * @param type Executor type; its name is resolved against this service's server name.
   * @return the executor registered for {@code type}, or null if there is none
   */
  Executor getExecutor(final ExecutorType type) {
    return getExecutor(type.getExecutorName(this.servername));
  }

  /**
   * @param name Name the executor was registered under.
   * @return the executor registered under {@code name}, or null if there is none
   */
  Executor getExecutor(String name) {
    return this.executorMap.get(name);
  }

  /**
   * @param type Executor type to look up.
   * @return the thread pool backing the executor for {@code type}
   * @throws NullPointerException if no executor is registered for {@code type}
   */
  @VisibleForTesting
  public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
    return getExecutor(type).getThreadPoolExecutor();
  }

  /**
   * Start the executor for the given type unless one is already registered, in which case
   * this is a no-op apart from a debug log line.
   * @param type Executor type to start.
   * @param maxThreads Used as both the core and the maximum size of the backing thread pool.
   */
  public void startExecutorService(final ExecutorType type, final int maxThreads) {
    String name = type.getExecutorName(this.servername);
    if (isExecutorServiceRunning(name)) {
      // Log the executor name; this class has no toString() override worth printing.
      LOG.debug("Executor service {} already running on {}", name, this.servername);
      return;
    }
    startExecutorService(name, maxThreads);
  }

  /**
   * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
   * paths should use this method to get the executor, should not start executor by using
   * {@link ExecutorService#startExecutorService(ExecutorType, int)}
   * @param type Executor type to fetch or create.
   * @param maxThreads Pool size to use if the executor has to be created by this call.
   * @return the thread pool of the existing or newly created executor
   */
  public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
    String name = type.getExecutorName(this.servername);
    return executorMap
      .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
      .getThreadPoolExecutor();
  }

  /**
   * Hand the event to the executor that matches its event type. If no such executor is
   * registered the event is dropped and an error is logged.
   * @param eh Event handler to run.
   */
  public void submit(final EventHandler eh) {
    Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
    if (executor == null) {
      // This happens only when events are submitted after shutdown() was
      // called, so dropping them should be "ok" since it means we're
      // shutting down.
      LOG.error("Cannot submit [{}] because the executor is missing."
        + " Is this process shutting down?", eh);
    } else {
      executor.submit(eh);
    }
  }

  /**
   * @return a new map from executor name to the status of that executor, built by calling
   *         {@link Executor#getStatus()} on each registered executor
   */
  public Map<String, ExecutorStatus> getAllExecutorStatuses() {
    Map<String, ExecutorStatus> ret = Maps.newHashMap();
    for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
      ret.put(e.getKey(), e.getValue().getStatus());
    }
    return ret;
  }

  /**
   * Executor instance.
   */
  static class Executor {
    // Keep-alive handed to the thread pool. The pool is built with core size == max size,
    // so per ThreadPoolExecutor semantics there are no excess threads to retire unless
    // core-thread timeout is enabled on the pool.
    static final long keepAliveTimeInMillis = 1000;
    // the thread pool executor that services the requests
    final TrackingThreadPoolExecutor threadPoolExecutor;
    // work queue to use - unbounded queue
    final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
    private final String name;
    private static final AtomicLong seqids = new AtomicLong(0);
    private final long id;

    /**
     * @param name Executor name; also the prefix of the pool's thread names.
     * @param maxThreads Used as both the core and the maximum pool size.
     */
    protected Executor(String name, int maxThreads) {
      this.id = seqids.incrementAndGet();
      this.name = name;
      // create the thread pool executor
      this.threadPoolExecutor = new TrackingThreadPoolExecutor(
        maxThreads, maxThreads,
        keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
      // name the threads for this threadpool
      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
      tfb.setNameFormat(this.name + "-%d");
      tfb.setDaemon(true);
      this.threadPoolExecutor.setThreadFactory(tfb.build());
    }

    /**
     * Submit the event to the queue for handling.
     * @param event Event handler to execute on this executor's thread pool.
     */
    void submit(final EventHandler event) {
      this.threadPoolExecutor.execute(event);
    }

    TrackingThreadPoolExecutor getThreadPoolExecutor() {
      return threadPoolExecutor;
    }

    @Override
    public String toString() {
      return getClass().getSimpleName() + "-" + id + "-" + name;
    }

    /**
     * Collect the event handlers sitting in the work queue and those currently tracked as
     * running. Entries that are not {@link EventHandler}s are logged and left out.
     * @return a status object for this executor
     */
    public ExecutorStatus getStatus() {
      List<EventHandler> queuedEvents = Lists.newArrayList();
      for (Runnable r : q) {
        if (!(r instanceof EventHandler)) {
          LOG.warn("Non-EventHandler {} queued in {}", r, name);
          continue;
        }
        queuedEvents.add((EventHandler) r);
      }

      List<RunningEventStatus> running = Lists.newArrayList();
      for (Map.Entry<Thread, Runnable> e : threadPoolExecutor.getRunningTasks().entrySet()) {
        Runnable r = e.getValue();
        if (!(r instanceof EventHandler)) {
          LOG.warn("Non-EventHandler {} running in {}", r, name);
          continue;
        }
        running.add(new RunningEventStatus(e.getKey(), (EventHandler) r));
      }

      return new ExecutorStatus(this, queuedEvents, running);
    }
  }

  /**
   * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
   * are executing at any given point in time.
   */
  static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
    private final ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();

    public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
      super.afterExecute(r, t);
      // afterExecute is invoked on the worker thread that ran the task.
      running.remove(Thread.currentThread());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
      Runnable oldPut = running.put(t, r);
      assert oldPut == null : "inconsistency for thread " + t;
      super.beforeExecute(t, r);
    }

    /**
     * @return a map of the threads currently running tasks
     * inside this executor. Each key is an active thread,
     * and the value is the task that is currently running.
     * Note that this is not a stable snapshot of the map.
     */
    public ConcurrentMap<Thread, Runnable> getRunningTasks() {
      return running;
    }
  }

  /**
   * A snapshot of the status of a particular executor. This includes
   * the contents of the executor's pending queue, as well as the
   * threads and events currently being processed.
   *
   * The fields are final and are not modified by this class after construction.
   */
  public static class ExecutorStatus {
    final Executor executor;
    final List<EventHandler> queuedEvents;
    final List<RunningEventStatus> running;

    ExecutorStatus(Executor executor,
        List<EventHandler> queuedEvents,
        List<RunningEventStatus> running) {
      this.executor = executor;
      this.queuedEvents = queuedEvents;
      this.running = running;
    }

    /**
     * Dump a textual representation of the executor's status
     * to the given writer.
     *
     * @param out the stream to write to
     * @param indent a string prefix for each line, used for indentation
     * @throws IOException if writing to or flushing {@code out} fails
     */
    public void dumpTo(Writer out, String indent) throws IOException {
      out.write(indent + "Status for executor: " + executor + "\n");
      out.write(indent + "=======================================\n");
      out.write(indent + queuedEvents.size() + " events queued, " +
        running.size() + " running\n");
      if (!queuedEvents.isEmpty()) {
        out.write(indent + "Queued:\n");
        for (EventHandler e : queuedEvents) {
          out.write(indent + " " + e + "\n");
        }
        out.write("\n");
      }
      if (!running.isEmpty()) {
        out.write(indent + "Running:\n");
        for (RunningEventStatus stat : running) {
          ThreadInfo info = stat.threadInfo;
          if (info == null) {
            // NOTE(review): ThreadMonitoring.getThreadInfo presumably delegates to
            // ThreadMXBean, which returns null for a thread that is no longer alive.
            // Report the event without thread details rather than fail the whole dump.
            out.write(indent + " Running on inactive thread: " + stat.event + "\n");
            out.write("\n");
            continue;
          }
          out.write(indent + " Running on thread '" +
            info.getThreadName() +
            "': " + stat.event + "\n");
          out.write(ThreadMonitoring.formatThreadInfo(
            info, indent + " "));
          out.write("\n");
        }
      }
      out.flush();
    }
  }

  /**
   * The status of a particular event that is in the middle of being
   * handled by an executor.
   */
  public static class RunningEventStatus {
    // As returned by ThreadMonitoring.getThreadInfo at construction time.
    // NOTE(review): may be null if the thread had already died - confirm against
    // ThreadMonitoring.
    final ThreadInfo threadInfo;
    final EventHandler event;

    /**
     * @param t Thread the event is running on; its thread info is captured immediately.
     * @param event Event being handled.
     */
    public RunningEventStatus(Thread t, EventHandler event) {
      this.threadInfo = ThreadMonitoring.getThreadInfo(t);
      this.event = event;
    }
  }
}