View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.executor;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.lang.management.ThreadInfo;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
39  
40  import com.google.common.annotations.VisibleForTesting;
41  import com.google.common.collect.Lists;
42  import com.google.common.collect.Maps;
43  import com.google.common.util.concurrent.ThreadFactoryBuilder;
44  
45  /**
46   * This is a generic executor service. This component abstracts a
47   * threadpool, a queue to which {@link EventType}s can be submitted,
48   * and a <code>Runnable</code> that handles the object that is added to the queue.
49   *
50   * <p>In order to create a new service, create an instance of this class and
51   * then do: <code>instance.startExecutorService("myService");</code>.  When done
52   * call {@link #shutdown()}.
53   *
54   * <p>In order to use the service created above, call
55   * {@link #submit(EventHandler)}.
56   */
57  @InterfaceAudience.Private
58  public class ExecutorService {
59    private static final Log LOG = LogFactory.getLog(ExecutorService.class);
60  
61    // hold the all the executors created in a map addressable by their names
62    private final ConcurrentHashMap<String, Executor> executorMap =
63      new ConcurrentHashMap<String, Executor>();
64  
65    // Name of the server hosting this executor service.
66    private final String servername;
67  
68    /**
69     * Default constructor.
70     * @param servername Name of the hosting server.
71     */
72    public ExecutorService(final String servername) {
73      super();
74      this.servername = servername;
75    }
76  
77    /**
78     * Start an executor service with a given name. If there was a service already
79     * started with the same name, this throws a RuntimeException.
80     * @param name Name of the service to start.
81     */
82    @VisibleForTesting
83    public void startExecutorService(String name, int maxThreads) {
84      if (this.executorMap.get(name) != null) {
85        throw new RuntimeException("An executor service with the name " + name +
86          " is already running!");
87      }
88      Executor hbes = new Executor(name, maxThreads);
89      if (this.executorMap.putIfAbsent(name, hbes) != null) {
90        throw new RuntimeException("An executor service with the name " + name +
91        " is already running (2)!");
92      }
93      LOG.debug("Starting executor service name=" + name +
94        ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
95        ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
96    }
97  
98    boolean isExecutorServiceRunning(String name) {
99      return this.executorMap.containsKey(name);
100   }
101 
102   public void shutdown() {
103     for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
104       List<Runnable> wasRunning =
105         entry.getValue().threadPoolExecutor.shutdownNow();
106       if (!wasRunning.isEmpty()) {
107         LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
108       }
109     }
110     this.executorMap.clear();
111   }
112 
113   Executor getExecutor(final ExecutorType type) {
114     return getExecutor(type.getExecutorName(this.servername));
115   }
116 
117   Executor getExecutor(String name) {
118     Executor executor = this.executorMap.get(name);
119     return executor;
120   }
121 
122 
123   public void startExecutorService(final ExecutorType type, final int maxThreads) {
124     String name = type.getExecutorName(this.servername);
125     if (isExecutorServiceRunning(name)) {
126       LOG.debug("Executor service " + toString() + " already running on " +
127           this.servername);
128       return;
129     }
130     startExecutorService(name, maxThreads);
131   }
132 
133   public void submit(final EventHandler eh) {
134     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
135     if (executor == null) {
136       // This happens only when events are submitted after shutdown() was
137       // called, so dropping them should be "ok" since it means we're
138       // shutting down.
139       LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
140         " Is this process shutting down?");
141     } else {
142       executor.submit(eh);
143     }
144   }
145 
146   public Map<String, ExecutorStatus> getAllExecutorStatuses() {
147     Map<String, ExecutorStatus> ret = Maps.newHashMap();
148     for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
149       ret.put(e.getKey(), e.getValue().getStatus());
150     }
151     return ret;
152   }
153   
154   /**
155    * Executor instance.
156    */
157   static class Executor {
158     // how long to retain excess threads
159     static final long keepAliveTimeInMillis = 1000;
160     // the thread pool executor that services the requests
161     final TrackingThreadPoolExecutor threadPoolExecutor;
162     // work queue to use - unbounded queue
163     final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
164     private final String name;
165     private static final AtomicLong seqids = new AtomicLong(0);
166     private final long id;
167 
168     protected Executor(String name, int maxThreads) {
169       this.id = seqids.incrementAndGet();
170       this.name = name;
171       // create the thread pool executor
172       this.threadPoolExecutor = new TrackingThreadPoolExecutor(
173           maxThreads, maxThreads,
174           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
175       // name the threads for this threadpool
176       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
177       tfb.setNameFormat(this.name + "-%d");
178       this.threadPoolExecutor.setThreadFactory(tfb.build());
179     }
180 
181     /**
182      * Submit the event to the queue for handling.
183      * @param event
184      */
185     void submit(final EventHandler event) {
186       // If there is a listener for this type, make sure we call the before
187       // and after process methods.
188       this.threadPoolExecutor.execute(event);
189     }
190     
191     public String toString() {
192       return getClass().getSimpleName() + "-" + id + "-" + name;
193     }
194 
195     public ExecutorStatus getStatus() {
196       List<EventHandler> queuedEvents = Lists.newArrayList();
197       for (Runnable r : q) {
198         if (!(r instanceof EventHandler)) {
199           LOG.warn("Non-EventHandler " + r + " queued in " + name);
200           continue;
201         }
202         queuedEvents.add((EventHandler)r);
203       }
204       
205       List<RunningEventStatus> running = Lists.newArrayList();
206       for (Map.Entry<Thread, Runnable> e :
207           threadPoolExecutor.getRunningTasks().entrySet()) {
208         Runnable r = e.getValue();
209         if (!(r instanceof EventHandler)) {
210           LOG.warn("Non-EventHandler " + r + " running in " + name);
211           continue;
212         }
213         running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
214       }
215       
216       return new ExecutorStatus(this, queuedEvents, running);
217     }
218   }
219  
220   /**
221    * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
222    * are executing at any given point in time.
223    */
224   static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
225     private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap(); 
226       
227     public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
228         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
229       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
230     }
231 
232     @Override
233     protected void afterExecute(Runnable r, Throwable t) {
234       super.afterExecute(r, t);
235       running.remove(Thread.currentThread());
236     }
237 
238     @Override
239     protected void beforeExecute(Thread t, Runnable r) {
240       Runnable oldPut = running.put(t, r);
241       assert oldPut == null : "inconsistency for thread " + t;
242       super.beforeExecute(t, r);
243     }
244    
245     /**
246      * @return a map of the threads currently running tasks
247      * inside this executor. Each key is an active thread,
248      * and the value is the task that is currently running.
249      * Note that this is not a stable snapshot of the map.
250      */
251     public ConcurrentMap<Thread, Runnable> getRunningTasks() {
252       return running;
253     }
254   }
255 
256   /**
257    * A snapshot of the status of a particular executor. This includes
258    * the contents of the executor's pending queue, as well as the
259    * threads and events currently being processed.
260    *
261    * This is a consistent snapshot that is immutable once constructed.
262    */
263   public static class ExecutorStatus {
264     final Executor executor;
265     final List<EventHandler> queuedEvents;
266     final List<RunningEventStatus> running;
267 
268     ExecutorStatus(Executor executor,
269         List<EventHandler> queuedEvents,
270         List<RunningEventStatus> running) {
271       this.executor = executor;
272       this.queuedEvents = queuedEvents;
273       this.running = running;
274     }
275    
276     /**
277      * Dump a textual representation of the executor's status
278      * to the given writer.
279      *
280      * @param out the stream to write to
281      * @param indent a string prefix for each line, used for indentation
282      */
283     public void dumpTo(Writer out, String indent) throws IOException {
284       out.write(indent + "Status for executor: " + executor + "\n");
285       out.write(indent + "=======================================\n");
286       out.write(indent + queuedEvents.size() + " events queued, " +
287           running.size() + " running\n");
288       if (!queuedEvents.isEmpty()) {
289         out.write(indent + "Queued:\n");
290         for (EventHandler e : queuedEvents) {
291           out.write(indent + "  " + e + "\n");
292         }
293         out.write("\n");
294       }
295       if (!running.isEmpty()) {
296         out.write(indent + "Running:\n");
297         for (RunningEventStatus stat : running) {
298           out.write(indent + "  Running on thread '" +
299               stat.threadInfo.getThreadName() +
300               "': " + stat.event + "\n");
301           out.write(ThreadMonitoring.formatThreadInfo(
302               stat.threadInfo, indent + "  "));
303           out.write("\n");
304         }
305       }
306       out.flush();
307     }
308   }
309 
310   /**
311    * The status of a particular event that is in the middle of being
312    * handled by an executor.
313    */
314   public static class RunningEventStatus {
315     final ThreadInfo threadInfo;
316     final EventHandler event;
317 
318     public RunningEventStatus(Thread t, EventHandler event) {
319       this.threadInfo = ThreadMonitoring.getThreadInfo(t);
320       this.event = event;
321     }
322   }
323 }