View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.executor;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.lang.management.ThreadInfo;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
39  import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
40  
41  import com.google.common.collect.Lists;
42  import com.google.common.collect.Maps;
43  import com.google.common.util.concurrent.ThreadFactoryBuilder;
44  
45  /**
 * This is a generic executor service. This component abstracts a
 * threadpool, a queue to which {@link EventHandler}s can be submitted,
 * and a <code>Runnable</code> that handles the object that is added to the queue.
49   *
 * <p>In order to create a new service, create an instance of this class and
 * then do: <code>instance.startExecutorService(type, maxThreads);</code>.  When done
 * call {@link #shutdown()}.
53   *
54   * <p>In order to use the service created above, call
55   * {@link #submit(EventHandler)}. Register pre- and post- processing listeners
56   * by registering your implementation of {@link EventHandler.EventHandlerListener}
57   * with {@link #registerListener(EventType, EventHandler.EventHandlerListener)}.  Be sure
58   * to deregister your listener when done via {@link #unregisterListener(EventType)}.
59   */
60  @InterfaceAudience.Private
61  public class ExecutorService {
62    private static final Log LOG = LogFactory.getLog(ExecutorService.class);
63  
64    // hold the all the executors created in a map addressable by their names
65    private final ConcurrentHashMap<String, Executor> executorMap =
66      new ConcurrentHashMap<String, Executor>();
67  
68    // listeners that are called before and after an event is processed
69    private ConcurrentHashMap<EventType, EventHandlerListener> eventHandlerListeners =
70      new ConcurrentHashMap<EventType, EventHandlerListener>();
71  
72    // Name of the server hosting this executor service.
73    private final String servername;
74  
75    /**
76     * Default constructor.
77     * @param servername Name of the hosting server.
78     */
79    public ExecutorService(final String servername) {
80      super();
81      this.servername = servername;
82    }
83  
84    /**
85     * Start an executor service with a given name. If there was a service already
86     * started with the same name, this throws a RuntimeException.
87     * @param name Name of the service to start.
88     */
89    void startExecutorService(String name, int maxThreads) {
90      if (this.executorMap.get(name) != null) {
91        throw new RuntimeException("An executor service with the name " + name +
92          " is already running!");
93      }
94      Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
95      if (this.executorMap.putIfAbsent(name, hbes) != null) {
96        throw new RuntimeException("An executor service with the name " + name +
97        " is already running (2)!");
98      }
99      LOG.debug("Starting executor service name=" + name +
100       ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
101       ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
102   }
103 
104   boolean isExecutorServiceRunning(String name) {
105     return this.executorMap.containsKey(name);
106   }
107 
108   public void shutdown() {
109     for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
110       List<Runnable> wasRunning =
111         entry.getValue().threadPoolExecutor.shutdownNow();
112       if (!wasRunning.isEmpty()) {
113         LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
114       }
115     }
116     this.executorMap.clear();
117   }
118 
119   Executor getExecutor(final ExecutorType type) {
120     return getExecutor(type.getExecutorName(this.servername));
121   }
122 
123   Executor getExecutor(String name) {
124     Executor executor = this.executorMap.get(name);
125     return executor;
126   }
127 
128 
129   public void startExecutorService(final ExecutorType type, final int maxThreads) {
130     String name = type.getExecutorName(this.servername);
131     if (isExecutorServiceRunning(name)) {
132       LOG.debug("Executor service " + toString() + " already running on " +
133         this.servername);
134       return;
135     }
136     startExecutorService(name, maxThreads);
137   }
138 
139   public void submit(final EventHandler eh) {
140     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
141     if (executor == null) {
142       // This happens only when events are submitted after shutdown() was
143       // called, so dropping them should be "ok" since it means we're
144       // shutting down.
145       LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
146         " Is this process shutting down?");
147     } else {
148       executor.submit(eh);
149     }
150   }
151 
152   /**
153    * Subscribe to updates before and after processing instances of
154    * {@link EventType}.  Currently only one listener per
155    * event type.
156    * @param type Type of event we're registering listener for
157    * @param listener The listener to run.
158    */
159   public void registerListener(final EventType type,
160       final EventHandlerListener listener) {
161     this.eventHandlerListeners.put(type, listener);
162   }
163 
164   /**
165    * Stop receiving updates before and after processing instances of
166    * {@link EventType}
167    * @param type Type of event we're registering listener for
168    * @return The listener we removed or null if we did not remove it.
169    */
170   public EventHandlerListener unregisterListener(final EventType type) {
171     return this.eventHandlerListeners.remove(type);
172   }
173 
174   public Map<String, ExecutorStatus> getAllExecutorStatuses() {
175     Map<String, ExecutorStatus> ret = Maps.newHashMap();
176     for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
177       ret.put(e.getKey(), e.getValue().getStatus());
178     }
179     return ret;
180   }
181   
182   /**
183    * Executor instance.
184    */
185   static class Executor {
186     // how long to retain excess threads
187     static final long keepAliveTimeInMillis = 1000;
188     // the thread pool executor that services the requests
189     final TrackingThreadPoolExecutor threadPoolExecutor;
190     // work queue to use - unbounded queue
191     final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
192     private final String name;
193     private final Map<EventType, EventHandlerListener> eventHandlerListeners;
194     private static final AtomicLong seqids = new AtomicLong(0);
195     private final long id;
196 
197     protected Executor(String name, int maxThreads,
198         final Map<EventType, EventHandlerListener> eventHandlerListeners) {
199       this.id = seqids.incrementAndGet();
200       this.name = name;
201       this.eventHandlerListeners = eventHandlerListeners;
202       // create the thread pool executor
203       this.threadPoolExecutor = new TrackingThreadPoolExecutor(
204           maxThreads, maxThreads,
205           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
206       // name the threads for this threadpool
207       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
208       tfb.setNameFormat(this.name + "-%d");
209       this.threadPoolExecutor.setThreadFactory(tfb.build());
210     }
211 
212     /**
213      * Submit the event to the queue for handling.
214      * @param event
215      */
216     void submit(final EventHandler event) {
217       // If there is a listener for this type, make sure we call the before
218       // and after process methods.
219       EventHandlerListener listener =
220         this.eventHandlerListeners.get(event.getEventType());
221       if (listener != null) {
222         event.setListener(listener);
223       }
224       this.threadPoolExecutor.execute(event);
225     }
226     
227     public String toString() {
228       return getClass().getSimpleName() + "-" + id + "-" + name;
229     }
230 
231     public ExecutorStatus getStatus() {
232       List<EventHandler> queuedEvents = Lists.newArrayList();
233       for (Runnable r : q) {
234         if (!(r instanceof EventHandler)) {
235           LOG.warn("Non-EventHandler " + r + " queued in " + name);
236           continue;
237         }
238         queuedEvents.add((EventHandler)r);
239       }
240       
241       List<RunningEventStatus> running = Lists.newArrayList();
242       for (Map.Entry<Thread, Runnable> e :
243           threadPoolExecutor.getRunningTasks().entrySet()) {
244         Runnable r = e.getValue();
245         if (!(r instanceof EventHandler)) {
246           LOG.warn("Non-EventHandler " + r + " running in " + name);
247           continue;
248         }
249         running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
250       }
251       
252       return new ExecutorStatus(this, queuedEvents, running);
253     }
254   }
255  
256   /**
257    * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
258    * are executing at any given point in time.
259    */
260   static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
261     private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap(); 
262       
263     public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
264         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
265       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
266     }
267 
268     @Override
269     protected void afterExecute(Runnable r, Throwable t) {
270       super.afterExecute(r, t);
271       running.remove(Thread.currentThread());
272     }
273 
274     @Override
275     protected void beforeExecute(Thread t, Runnable r) {
276       Runnable oldPut = running.put(t, r);
277       assert oldPut == null : "inconsistency for thread " + t;
278       super.beforeExecute(t, r);
279     }
280    
281     /**
282      * @return a map of the threads currently running tasks
283      * inside this executor. Each key is an active thread,
284      * and the value is the task that is currently running.
285      * Note that this is not a stable snapshot of the map.
286      */
287     public ConcurrentMap<Thread, Runnable> getRunningTasks() {
288       return running;
289     }
290   }
291 
292   /**
293    * A snapshot of the status of a particular executor. This includes
294    * the contents of the executor's pending queue, as well as the
295    * threads and events currently being processed.
296    *
297    * This is a consistent snapshot that is immutable once constructed.
298    */
299   public static class ExecutorStatus {
300     final Executor executor;
301     final List<EventHandler> queuedEvents;
302     final List<RunningEventStatus> running;
303 
304     ExecutorStatus(Executor executor,
305         List<EventHandler> queuedEvents,
306         List<RunningEventStatus> running) {
307       this.executor = executor;
308       this.queuedEvents = queuedEvents;
309       this.running = running;
310     }
311    
312     /**
313      * Dump a textual representation of the executor's status
314      * to the given writer.
315      *
316      * @param out the stream to write to
317      * @param indent a string prefix for each line, used for indentation
318      */
319     public void dumpTo(Writer out, String indent) throws IOException {
320       out.write(indent + "Status for executor: " + executor + "\n");
321       out.write(indent + "=======================================\n");
322       out.write(indent + queuedEvents.size() + " events queued, " +
323           running.size() + " running\n");
324       if (!queuedEvents.isEmpty()) {
325         out.write(indent + "Queued:\n");
326         for (EventHandler e : queuedEvents) {
327           out.write(indent + "  " + e + "\n");
328         }
329         out.write("\n");
330       }
331       if (!running.isEmpty()) {
332         out.write(indent + "Running:\n");
333         for (RunningEventStatus stat : running) {
334           out.write(indent + "  Running on thread '" +
335               stat.threadInfo.getThreadName() +
336               "': " + stat.event + "\n");
337           out.write(ThreadMonitoring.formatThreadInfo(
338               stat.threadInfo, indent + "  "));
339           out.write("\n");
340         }
341       }
342       out.flush();
343     }
344   }
345 
346   /**
347    * The status of a particular event that is in the middle of being
348    * handled by an executor.
349    */
350   public static class RunningEventStatus {
351     final ThreadInfo threadInfo;
352     final EventHandler event;
353 
354     public RunningEventStatus(Thread t, EventHandler event) {
355       this.threadInfo = ThreadMonitoring.getThreadInfo(t);
356       this.event = event;
357     }
358   }
359 }