View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.executor;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.lang.management.ThreadInfo;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
39  
40  import com.google.common.annotations.VisibleForTesting;
41  import com.google.common.collect.Lists;
42  import com.google.common.collect.Maps;
43  import com.google.common.util.concurrent.ThreadFactoryBuilder;
44  
45  /**
46   * This is a generic executor service. This component abstracts a
47   * threadpool, a queue to which {@link EventType}s can be submitted,
48   * and a <code>Runnable</code> that handles the object that is added to the queue.
49   *
50   * <p>In order to create a new service, create an instance of this class and
51   * then do: <code>instance.startExecutorService("myService", maxThreads);</code>.  When done
52   * call {@link #shutdown()}.
53   *
54   * <p>In order to use the service created above, call
55   * {@link #submit(EventHandler)}.
56   */
57  @InterfaceAudience.Private
58  public class ExecutorService {
59    private static final Log LOG = LogFactory.getLog(ExecutorService.class);
60  
61    // holds all the executors created, in a map addressable by their names
62    private final ConcurrentHashMap<String, Executor> executorMap =
63      new ConcurrentHashMap<String, Executor>();
64  
65    // Name of the server hosting this executor service.
66    private final String servername;
67  
68    /**
69     * Default constructor.
70     * @param servername Name of the hosting server.
71     */
72    public ExecutorService(final String servername) {
73      super();
74      this.servername = servername;
75    }
76  
77    /**
78     * Start an executor service with a given name. If there was a service already
79     * started with the same name, this throws a RuntimeException.
80     * @param name Name of the service to start.
81     */
82    @VisibleForTesting
83    public void startExecutorService(String name, int maxThreads) {
84      if (this.executorMap.get(name) != null) {
85        throw new RuntimeException("An executor service with the name " + name +
86          " is already running!");
87      }
88      Executor hbes = new Executor(name, maxThreads);
89      if (this.executorMap.putIfAbsent(name, hbes) != null) {
90        throw new RuntimeException("An executor service with the name " + name +
91        " is already running (2)!");
92      }
93      LOG.debug("Starting executor service name=" + name +
94        ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
95        ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
96    }
97  
98    boolean isExecutorServiceRunning(String name) {
99      return this.executorMap.containsKey(name);
100   }
101 
102   public void shutdown() {
103     for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
104       List<Runnable> wasRunning =
105         entry.getValue().threadPoolExecutor.shutdownNow();
106       if (!wasRunning.isEmpty()) {
107         LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
108       }
109     }
110     this.executorMap.clear();
111   }
112 
113   Executor getExecutor(final ExecutorType type) {
114     return getExecutor(type.getExecutorName(this.servername));
115   }
116 
117   Executor getExecutor(String name) {
118     Executor executor = this.executorMap.get(name);
119     return executor;
120   }
121 
122   @VisibleForTesting
123   public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
124     return getExecutor(type).getThreadPoolExecutor();
125   }
126
127   public void startExecutorService(final ExecutorType type, final int maxThreads) {
128     String name = type.getExecutorName(this.servername);
129     if (isExecutorServiceRunning(name)) {
130       LOG.debug("Executor service " + toString() + " already running on " +
131           this.servername);
132       return;
133     }
134     startExecutorService(name, maxThreads);
135   }
136
137   public void submit(final EventHandler eh) {
138     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
139     if (executor == null) {
140       // This happens only when events are submitted after shutdown() was
141       // called, so dropping them should be "ok" since it means we're
142       // shutting down.
143       LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
144         " Is this process shutting down?");
145     } else {
146       executor.submit(eh);
147     }
148   }
149
150   public Map<String, ExecutorStatus> getAllExecutorStatuses() {
151     Map<String, ExecutorStatus> ret = Maps.newHashMap();
152     for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
153       ret.put(e.getKey(), e.getValue().getStatus());
154     }
155     return ret;
156   }
157
158   /**
159    * Executor instance.
160    */
161   static class Executor {
162     // how long to retain excess threads
163     static final long keepAliveTimeInMillis = 1000;
164     // the thread pool executor that services the requests
165     final TrackingThreadPoolExecutor threadPoolExecutor;
166     // work queue to use - unbounded queue
167     final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
168     private final String name;
169     private static final AtomicLong seqids = new AtomicLong(0);
170     private final long id;
171
172     protected Executor(String name, int maxThreads) {
173       this.id = seqids.incrementAndGet();
174       this.name = name;
175       // create the thread pool executor
176       this.threadPoolExecutor = new TrackingThreadPoolExecutor(
177           maxThreads, maxThreads,
178           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
179       // name the threads for this threadpool
180       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
181       tfb.setNameFormat(this.name + "-%d");
182       this.threadPoolExecutor.setThreadFactory(tfb.build());
183     }
184
185     /**
186      * Submit the event to the queue for handling.
187      * @param event
188      */
189     void submit(final EventHandler event) {
190       // If there is a listener for this type, make sure we call the before
191       // and after process methods.
192       this.threadPoolExecutor.execute(event);
193     }
194 
195     TrackingThreadPoolExecutor getThreadPoolExecutor() {
196       return threadPoolExecutor;
197     }
198
199     @Override
200     public String toString() {
201       return getClass().getSimpleName() + "-" + id + "-" + name;
202     }
203
204     public ExecutorStatus getStatus() {
205       List<EventHandler> queuedEvents = Lists.newArrayList();
206       for (Runnable r : q) {
207         if (!(r instanceof EventHandler)) {
208           LOG.warn("Non-EventHandler " + r + " queued in " + name);
209           continue;
210         }
211         queuedEvents.add((EventHandler)r);
212       }
213
214       List<RunningEventStatus> running = Lists.newArrayList();
215       for (Map.Entry<Thread, Runnable> e :
216           threadPoolExecutor.getRunningTasks().entrySet()) {
217         Runnable r = e.getValue();
218         if (!(r instanceof EventHandler)) {
219           LOG.warn("Non-EventHandler " + r + " running in " + name);
220           continue;
221         }
222         running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
223       }
224
225       return new ExecutorStatus(this, queuedEvents, running);
226     }
227   }
228
229   /**
230    * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
231    * are executing at any given point in time.
232    */
233   static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
234     private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
235
236     public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
237         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
238       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
239     }
240
241     @Override
242     protected void afterExecute(Runnable r, Throwable t) {
243       super.afterExecute(r, t);
244       running.remove(Thread.currentThread());
245     }
246
247     @Override
248     protected void beforeExecute(Thread t, Runnable r) {
249       Runnable oldPut = running.put(t, r);
250       assert oldPut == null : "inconsistency for thread " + t;
251       super.beforeExecute(t, r);
252     }
253
254     /**
255      * @return a map of the threads currently running tasks
256      * inside this executor. Each key is an active thread,
257      * and the value is the task that is currently running.
258      * Note that this is not a stable snapshot of the map.
259      */
260     public ConcurrentMap<Thread, Runnable> getRunningTasks() {
261       return running;
262     }
263   }
264
265   /**
266    * A snapshot of the status of a particular executor. This includes
267    * the contents of the executor's pending queue, as well as the
268    * threads and events currently being processed.
269    *
270    * This is a consistent snapshot that is immutable once constructed.
271    */
272   public static class ExecutorStatus {
273     final Executor executor;
274     final List<EventHandler> queuedEvents;
275     final List<RunningEventStatus> running;
276
277     ExecutorStatus(Executor executor,
278         List<EventHandler> queuedEvents,
279         List<RunningEventStatus> running) {
280       this.executor = executor;
281       this.queuedEvents = queuedEvents;
282       this.running = running;
283     }
284
285     /**
286      * Dump a textual representation of the executor's status
287      * to the given writer.
288      *
289      * @param out the stream to write to
290      * @param indent a string prefix for each line, used for indentation
291      */
292     public void dumpTo(Writer out, String indent) throws IOException {
293       out.write(indent + "Status for executor: " + executor + "\n");
294       out.write(indent + "=======================================\n");
295       out.write(indent + queuedEvents.size() + " events queued, " +
296           running.size() + " running\n");
297       if (!queuedEvents.isEmpty()) {
298         out.write(indent + "Queued:\n");
299         for (EventHandler e : queuedEvents) {
300           out.write(indent + "  " + e + "\n");
301         }
302         out.write("\n");
303       }
304       if (!running.isEmpty()) {
305         out.write(indent + "Running:\n");
306         for (RunningEventStatus stat : running) {
307           out.write(indent + "  Running on thread '" +
308               stat.threadInfo.getThreadName() +
309               "': " + stat.event + "\n");
310           out.write(ThreadMonitoring.formatThreadInfo(
311               stat.threadInfo, indent + "  "));
312           out.write("\n");
313         }
314       }
315       out.flush();
316     }
317   }
318
319   /**
320    * The status of a particular event that is in the middle of being
321    * handled by an executor.
322    */
323   public static class RunningEventStatus {
324     final ThreadInfo threadInfo;
325     final EventHandler event;
326
327     public RunningEventStatus(Thread t, EventHandler event) {
328       this.threadInfo = ThreadMonitoring.getThreadInfo(t);
329       this.event = event;
330     }
331   }
332 }