View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.executor;
21  
22  import java.io.IOException;
23  import java.io.PrintWriter;
24  import java.io.Writer;
25  import java.lang.management.ThreadInfo;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.concurrent.BlockingQueue;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentMap;
32  import java.util.concurrent.LinkedBlockingQueue;
33  import java.util.concurrent.RejectedExecutionHandler;
34  import java.util.concurrent.ThreadFactory;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
42  import org.apache.hadoop.hbase.executor.EventHandler.EventType;
43  import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
44  
45  import com.google.common.collect.Lists;
46  import com.google.common.collect.Maps;
47  import com.google.common.util.concurrent.ThreadFactoryBuilder;
48  
49  /**
50   * This is a generic executor service. This component abstracts a
51   * threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
52   * and a <code>Runnable</code> that handles the object that is added to the queue.
53   *
54   * <p>In order to create a new service, create an instance of this class and
55   * then do: <code>instance.startExecutorService("myService");</code>.  When done
56   * call {@link #shutdown()}.
57   *
58   * <p>In order to use the service created above, call
59   * {@link #submit(EventHandler)}. Register pre- and post- processing listeners
60   * by registering your implementation of {@link EventHandler.EventHandlerListener}
61   * with {@link #registerListener(EventHandler.EventType, EventHandler.EventHandlerListener)}.  Be sure
62   * to deregister your listener when done via {@link #unregisterListener(EventHandler.EventType)}.
63   */
64  public class ExecutorService {
65    private static final Log LOG = LogFactory.getLog(ExecutorService.class);
66  
67    // hold the all the executors created in a map addressable by their names
68    private final ConcurrentHashMap<String, Executor> executorMap =
69      new ConcurrentHashMap<String, Executor>();
70  
71    // listeners that are called before and after an event is processed
72    private ConcurrentHashMap<EventHandler.EventType, EventHandlerListener> eventHandlerListeners =
73      new ConcurrentHashMap<EventHandler.EventType, EventHandlerListener>();
74  
75    // Name of the server hosting this executor service.
76    private final String servername;
77  
78    /**
79     * The following is a list of all executor types, both those that run in the
80     * master and those that run in the regionserver.
81     */
82    public enum ExecutorType {
83  
84      // Master executor services
85      MASTER_CLOSE_REGION        (1),
86      MASTER_OPEN_REGION         (2),
87      MASTER_SERVER_OPERATIONS   (3),
88      MASTER_TABLE_OPERATIONS    (4),
89      MASTER_RS_SHUTDOWN         (5),
90      MASTER_META_SERVER_OPERATIONS (6),
91  
92      // RegionServer executor services
93      RS_OPEN_REGION             (20),
94      RS_OPEN_ROOT               (21),
95      RS_OPEN_META               (22),
96      RS_CLOSE_REGION            (23),
97      RS_CLOSE_ROOT              (24),
98      RS_CLOSE_META              (25);
99  
100     ExecutorType(int value) {}
101 
102     /**
103      * @param serverName
104      * @return Conflation of the executor type and the passed servername.
105      */
106     String getExecutorName(String serverName) {
107       return this.toString() + "-" + serverName.replace("%", "%%");
108     }
109   }
110 
111   /**
112    * Returns the executor service type (the thread pool instance) for the
113    * passed event handler type.
114    * @param type EventHandler type.
115    */
116   public ExecutorType getExecutorServiceType(final EventHandler.EventType type) {
117     switch(type) {
118       // Master executor services
119 
120       case RS_ZK_REGION_CLOSED:
121       case RS_ZK_REGION_FAILED_OPEN:
122         return ExecutorType.MASTER_CLOSE_REGION;
123 
124       case RS_ZK_REGION_OPENED:
125         return ExecutorType.MASTER_OPEN_REGION;
126 
127       case RS_ZK_REGION_SPLIT:
128       case M_SERVER_SHUTDOWN:
129         return ExecutorType.MASTER_SERVER_OPERATIONS;
130 
131       case M_META_SERVER_SHUTDOWN:
132         return ExecutorType.MASTER_META_SERVER_OPERATIONS;
133 
134       case C_M_DELETE_TABLE:
135       case C_M_DISABLE_TABLE:
136       case C_M_ENABLE_TABLE:
137       case C_M_MODIFY_TABLE:
138       case C_M_CREATE_TABLE:
139       case C_M_SNAPSHOT_TABLE:
140       case C_M_RESTORE_SNAPSHOT:
141         return ExecutorType.MASTER_TABLE_OPERATIONS;
142 
143       // RegionServer executor services
144 
145       case M_RS_OPEN_REGION:
146         return ExecutorType.RS_OPEN_REGION;
147 
148       case M_RS_OPEN_ROOT:
149         return ExecutorType.RS_OPEN_ROOT;
150 
151       case M_RS_OPEN_META:
152         return ExecutorType.RS_OPEN_META;
153 
154       case M_RS_CLOSE_REGION:
155         return ExecutorType.RS_CLOSE_REGION;
156 
157       case M_RS_CLOSE_ROOT:
158         return ExecutorType.RS_CLOSE_ROOT;
159 
160       case M_RS_CLOSE_META:
161         return ExecutorType.RS_CLOSE_META;
162 
163       default:
164         throw new RuntimeException("Unhandled event type " + type);
165     }
166   }
167 
168   /**
169    * Default constructor.
170    * @param servername Name of the hosting server.
171    */
172   public ExecutorService(final String servername) {
173     super();
174     this.servername = servername;
175   }
176 
177   /**
178    * Start an executor service with a given name. If there was a service already
179    * started with the same name, this throws a RuntimeException.
180    * @param name Name of the service to start.
181    */
182   void startExecutorService(String name, int maxThreads) {
183     if (this.executorMap.get(name) != null) {
184       throw new RuntimeException("An executor service with the name " + name +
185         " is already running!");
186     }
187     Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
188     if (this.executorMap.putIfAbsent(name, hbes) != null) {
189       throw new RuntimeException("An executor service with the name " + name +
190       " is already running (2)!");
191     }
192     LOG.debug("Starting executor service name=" + name +
193       ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
194       ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
195   }
196 
197   boolean isExecutorServiceRunning(String name) {
198     return this.executorMap.containsKey(name);
199   }
200 
201   public void shutdown() {
202     for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
203       List<Runnable> wasRunning =
204         entry.getValue().threadPoolExecutor.shutdownNow();
205       if (!wasRunning.isEmpty()) {
206         LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
207       }
208     }
209     this.executorMap.clear();
210   }
211 
212   Executor getExecutor(final ExecutorType type) {
213     return getExecutor(type.getExecutorName(this.servername));
214   }
215 
216   Executor getExecutor(String name) {
217     Executor executor = this.executorMap.get(name);
218     return executor;
219   }
220 
221 
222   public void startExecutorService(final ExecutorType type, final int maxThreads) {
223     String name = type.getExecutorName(this.servername);
224     if (isExecutorServiceRunning(name)) {
225       LOG.debug("Executor service " + toString() + " already running on " +
226         this.servername);
227       return;
228     }
229     startExecutorService(name, maxThreads);
230   }
231 
232   public void submit(final EventHandler eh) {
233     Executor executor = getExecutor(getExecutorServiceType(eh.getEventType()));
234     if (executor == null) {
235       // This happens only when events are submitted after shutdown() was
236       // called, so dropping them should be "ok" since it means we're
237       // shutting down.
238       LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
239         " Is this process shutting down?");
240     } else {
241       executor.submit(eh);
242     }
243   }
244 
245   /**
246    * Subscribe to updates before and after processing instances of
247    * {@link EventHandler.EventType}.  Currently only one listener per
248    * event type.
249    * @param type Type of event we're registering listener for
250    * @param listener The listener to run.
251    */
252   public void registerListener(final EventHandler.EventType type,
253       final EventHandlerListener listener) {
254     this.eventHandlerListeners.put(type, listener);
255   }
256 
257   /**
258    * Stop receiving updates before and after processing instances of
259    * {@link EventHandler.EventType}
260    * @param type Type of event we're registering listener for
261    * @return The listener we removed or null if we did not remove it.
262    */
263   public EventHandlerListener unregisterListener(final EventHandler.EventType type) {
264     return this.eventHandlerListeners.remove(type);
265   }
266 
267   public Map<String, ExecutorStatus> getAllExecutorStatuses() {
268     Map<String, ExecutorStatus> ret = Maps.newHashMap();
269     for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
270       ret.put(e.getKey(), e.getValue().getStatus());
271     }
272     return ret;
273   }
274   
275   /**
276    * Executor instance.
277    */
278   static class Executor {
279     // how long to retain excess threads
280     final long keepAliveTimeInMillis = 1000;
281     // the thread pool executor that services the requests
282     final TrackingThreadPoolExecutor threadPoolExecutor;
283     // work queue to use - unbounded queue
284     final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
285     private final String name;
286     private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
287     private static final AtomicLong seqids = new AtomicLong(0);
288     private final long id;
289 
290     protected Executor(String name, int maxThreads,
291         final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
292       this.id = seqids.incrementAndGet();
293       this.name = name;
294       this.eventHandlerListeners = eventHandlerListeners;
295       // create the thread pool executor
296       this.threadPoolExecutor = new TrackingThreadPoolExecutor(
297           maxThreads, maxThreads,
298           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
299       // name the threads for this threadpool
300       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
301       tfb.setNameFormat(this.name + "-%d");
302       this.threadPoolExecutor.setThreadFactory(tfb.build());
303     }
304 
305     /**
306      * Submit the event to the queue for handling.
307      * @param event
308      */
309     void submit(final EventHandler event) {
310       // If there is a listener for this type, make sure we call the before
311       // and after process methods.
312       EventHandlerListener listener =
313         this.eventHandlerListeners.get(event.getEventType());
314       if (listener != null) {
315         event.setListener(listener);
316       }
317       this.threadPoolExecutor.execute(event);
318     }
319     
320     public String toString() {
321       return getClass().getSimpleName() + "-" + id + "-" + name;
322     }
323 
324     public ExecutorStatus getStatus() {
325       List<EventHandler> queuedEvents = Lists.newArrayList();
326       for (Runnable r : q) {
327         if (!(r instanceof EventHandler)) {
328           LOG.warn("Non-EventHandler " + r + " queued in " + name);
329           continue;
330         }
331         queuedEvents.add((EventHandler)r);
332       }
333       
334       List<RunningEventStatus> running = Lists.newArrayList();
335       for (Map.Entry<Thread, Runnable> e :
336           threadPoolExecutor.getRunningTasks().entrySet()) {
337         Runnable r = e.getValue();
338         if (!(r instanceof EventHandler)) {
339           LOG.warn("Non-EventHandler " + r + " running in " + name);
340           continue;
341         }
342         running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
343       }
344       
345       return new ExecutorStatus(this, queuedEvents, running);
346     }
347   }
348  
349   /**
350    * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
351    * are executing at any given point in time.
352    */
353   static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
354     private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap(); 
355       
356     public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
357         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
358       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
359     }
360 
361     @Override
362     protected void afterExecute(Runnable r, Throwable t) {
363       super.afterExecute(r, t);
364       running.remove(Thread.currentThread());
365     }
366 
367     @Override
368     protected void beforeExecute(Thread t, Runnable r) {
369       Runnable oldPut = running.put(t, r);
370       assert oldPut == null : "inconsistency for thread " + t;
371       super.beforeExecute(t, r);
372     }
373    
374     /**
375      * @return a map of the threads currently running tasks
376      * inside this executor. Each key is an active thread,
377      * and the value is the task that is currently running.
378      * Note that this is not a stable snapshot of the map.
379      */
380     public ConcurrentMap<Thread, Runnable> getRunningTasks() {
381       return running;
382     }
383   }
384 
385   /**
386    * A snapshot of the status of a particular executor. This includes
387    * the contents of the executor's pending queue, as well as the
388    * threads and events currently being processed.
389    *
390    * This is a consistent snapshot that is immutable once constructed.
391    */
392   public static class ExecutorStatus {
393     final Executor executor;
394     final List<EventHandler> queuedEvents;
395     final List<RunningEventStatus> running;
396 
397     ExecutorStatus(Executor executor,
398         List<EventHandler> queuedEvents,
399         List<RunningEventStatus> running) {
400       this.executor = executor;
401       this.queuedEvents = queuedEvents;
402       this.running = running;
403     }
404    
405     /**
406      * Dump a textual representation of the executor's status
407      * to the given writer.
408      *
409      * @param out the stream to write to
410      * @param indent a string prefix for each line, used for indentation
411      */
412     public void dumpTo(Writer out, String indent) throws IOException {
413       out.write(indent + "Status for executor: " + executor + "\n");
414       out.write(indent + "=======================================\n");
415       out.write(indent + queuedEvents.size() + " events queued, " +
416           running.size() + " running\n");
417       if (!queuedEvents.isEmpty()) {
418         out.write(indent + "Queued:\n");
419         for (EventHandler e : queuedEvents) {
420           out.write(indent + "  " + e + "\n");
421         }
422         out.write("\n");
423       }
424       if (!running.isEmpty()) {
425         out.write(indent + "Running:\n");
426         for (RunningEventStatus stat : running) {
427           out.write(indent + "  Running on thread '" +
428               stat.threadInfo.getThreadName() +
429               "': " + stat.event + "\n");
430           out.write(ThreadMonitoring.formatThreadInfo(
431               stat.threadInfo, indent + "  "));
432           out.write("\n");
433         }
434       }
435       out.flush();
436     }
437   }
438 
439   /**
440    * The status of a particular event that is in the middle of being
441    * handled by an executor.
442    */
443   public static class RunningEventStatus {
444     final ThreadInfo threadInfo;
445     final EventHandler event;
446 
447     public RunningEventStatus(Thread t, EventHandler event) {
448       this.threadInfo = ThreadMonitoring.getThreadInfo(t);
449       this.event = event;
450     }
451   }
452 }