1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.executor;
20
21 import java.io.IOException;
22 import java.io.Writer;
23 import java.lang.management.ThreadInfo;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
39 import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
40
41 import com.google.common.collect.Lists;
42 import com.google.common.collect.Maps;
43 import com.google.common.util.concurrent.ThreadFactoryBuilder;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @InterfaceAudience.Private
61 public class ExecutorService {
62 private static final Log LOG = LogFactory.getLog(ExecutorService.class);
63
64
65 private final ConcurrentHashMap<String, Executor> executorMap =
66 new ConcurrentHashMap<String, Executor>();
67
68
69 private ConcurrentHashMap<EventType, EventHandlerListener> eventHandlerListeners =
70 new ConcurrentHashMap<EventType, EventHandlerListener>();
71
72
73 private final String servername;
74
75
76
77
78
79 public ExecutorService(final String servername) {
80 super();
81 this.servername = servername;
82 }
83
84
85
86
87
88
89 void startExecutorService(String name, int maxThreads) {
90 if (this.executorMap.get(name) != null) {
91 throw new RuntimeException("An executor service with the name " + name +
92 " is already running!");
93 }
94 Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
95 if (this.executorMap.putIfAbsent(name, hbes) != null) {
96 throw new RuntimeException("An executor service with the name " + name +
97 " is already running (2)!");
98 }
99 LOG.debug("Starting executor service name=" + name +
100 ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
101 ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
102 }
103
104 boolean isExecutorServiceRunning(String name) {
105 return this.executorMap.containsKey(name);
106 }
107
108 public void shutdown() {
109 for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
110 List<Runnable> wasRunning =
111 entry.getValue().threadPoolExecutor.shutdownNow();
112 if (!wasRunning.isEmpty()) {
113 LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
114 }
115 }
116 this.executorMap.clear();
117 }
118
119 Executor getExecutor(final ExecutorType type) {
120 return getExecutor(type.getExecutorName(this.servername));
121 }
122
123 Executor getExecutor(String name) {
124 Executor executor = this.executorMap.get(name);
125 return executor;
126 }
127
128
129 public void startExecutorService(final ExecutorType type, final int maxThreads) {
130 String name = type.getExecutorName(this.servername);
131 if (isExecutorServiceRunning(name)) {
132 LOG.debug("Executor service " + toString() + " already running on " +
133 this.servername);
134 return;
135 }
136 startExecutorService(name, maxThreads);
137 }
138
139 public void submit(final EventHandler eh) {
140 Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
141 if (executor == null) {
142
143
144
145 LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
146 " Is this process shutting down?");
147 } else {
148 executor.submit(eh);
149 }
150 }
151
152
153
154
155
156
157
158
159 public void registerListener(final EventType type,
160 final EventHandlerListener listener) {
161 this.eventHandlerListeners.put(type, listener);
162 }
163
164
165
166
167
168
169
170 public EventHandlerListener unregisterListener(final EventType type) {
171 return this.eventHandlerListeners.remove(type);
172 }
173
174 public Map<String, ExecutorStatus> getAllExecutorStatuses() {
175 Map<String, ExecutorStatus> ret = Maps.newHashMap();
176 for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
177 ret.put(e.getKey(), e.getValue().getStatus());
178 }
179 return ret;
180 }
181
182
183
184
185 static class Executor {
186
187 static final long keepAliveTimeInMillis = 1000;
188
189 final TrackingThreadPoolExecutor threadPoolExecutor;
190
191 final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
192 private final String name;
193 private final Map<EventType, EventHandlerListener> eventHandlerListeners;
194 private static final AtomicLong seqids = new AtomicLong(0);
195 private final long id;
196
197 protected Executor(String name, int maxThreads,
198 final Map<EventType, EventHandlerListener> eventHandlerListeners) {
199 this.id = seqids.incrementAndGet();
200 this.name = name;
201 this.eventHandlerListeners = eventHandlerListeners;
202
203 this.threadPoolExecutor = new TrackingThreadPoolExecutor(
204 maxThreads, maxThreads,
205 keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
206
207 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
208 tfb.setNameFormat(this.name + "-%d");
209 this.threadPoolExecutor.setThreadFactory(tfb.build());
210 }
211
212
213
214
215
216 void submit(final EventHandler event) {
217
218
219 EventHandlerListener listener =
220 this.eventHandlerListeners.get(event.getEventType());
221 if (listener != null) {
222 event.setListener(listener);
223 }
224 this.threadPoolExecutor.execute(event);
225 }
226
227 public String toString() {
228 return getClass().getSimpleName() + "-" + id + "-" + name;
229 }
230
231 public ExecutorStatus getStatus() {
232 List<EventHandler> queuedEvents = Lists.newArrayList();
233 for (Runnable r : q) {
234 if (!(r instanceof EventHandler)) {
235 LOG.warn("Non-EventHandler " + r + " queued in " + name);
236 continue;
237 }
238 queuedEvents.add((EventHandler)r);
239 }
240
241 List<RunningEventStatus> running = Lists.newArrayList();
242 for (Map.Entry<Thread, Runnable> e :
243 threadPoolExecutor.getRunningTasks().entrySet()) {
244 Runnable r = e.getValue();
245 if (!(r instanceof EventHandler)) {
246 LOG.warn("Non-EventHandler " + r + " running in " + name);
247 continue;
248 }
249 running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
250 }
251
252 return new ExecutorStatus(this, queuedEvents, running);
253 }
254 }
255
256
257
258
259
260 static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
261 private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
262
263 public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
264 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
265 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
266 }
267
268 @Override
269 protected void afterExecute(Runnable r, Throwable t) {
270 super.afterExecute(r, t);
271 running.remove(Thread.currentThread());
272 }
273
274 @Override
275 protected void beforeExecute(Thread t, Runnable r) {
276 Runnable oldPut = running.put(t, r);
277 assert oldPut == null : "inconsistency for thread " + t;
278 super.beforeExecute(t, r);
279 }
280
281
282
283
284
285
286
287 public ConcurrentMap<Thread, Runnable> getRunningTasks() {
288 return running;
289 }
290 }
291
292
293
294
295
296
297
298
299 public static class ExecutorStatus {
300 final Executor executor;
301 final List<EventHandler> queuedEvents;
302 final List<RunningEventStatus> running;
303
304 ExecutorStatus(Executor executor,
305 List<EventHandler> queuedEvents,
306 List<RunningEventStatus> running) {
307 this.executor = executor;
308 this.queuedEvents = queuedEvents;
309 this.running = running;
310 }
311
312
313
314
315
316
317
318
319 public void dumpTo(Writer out, String indent) throws IOException {
320 out.write(indent + "Status for executor: " + executor + "\n");
321 out.write(indent + "=======================================\n");
322 out.write(indent + queuedEvents.size() + " events queued, " +
323 running.size() + " running\n");
324 if (!queuedEvents.isEmpty()) {
325 out.write(indent + "Queued:\n");
326 for (EventHandler e : queuedEvents) {
327 out.write(indent + " " + e + "\n");
328 }
329 out.write("\n");
330 }
331 if (!running.isEmpty()) {
332 out.write(indent + "Running:\n");
333 for (RunningEventStatus stat : running) {
334 out.write(indent + " Running on thread '" +
335 stat.threadInfo.getThreadName() +
336 "': " + stat.event + "\n");
337 out.write(ThreadMonitoring.formatThreadInfo(
338 stat.threadInfo, indent + " "));
339 out.write("\n");
340 }
341 }
342 out.flush();
343 }
344 }
345
346
347
348
349
350 public static class RunningEventStatus {
351 final ThreadInfo threadInfo;
352 final EventHandler event;
353
354 public RunningEventStatus(Thread t, EventHandler event) {
355 this.threadInfo = ThreadMonitoring.getThreadInfo(t);
356 this.event = event;
357 }
358 }
359 }