001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.executor;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.lang.management.ThreadInfo;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.concurrent.Executors;
031import java.util.concurrent.LinkedBlockingQueue;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
041import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046
047/**
048 * This is a generic executor service. This component abstracts a
049 * threadpool, a queue to which {@link EventType}s can be submitted,
050 * and a <code>Runnable</code> that handles the object that is added to the queue.
051 *
 * <p>In order to create a new service, create an instance of this class and
 * then do: <code>instance.startExecutorService("myService", maxThreads);</code>.
 * When done call {@link #shutdown()}.
055 *
056 * <p>In order to use the service created above, call
057 * {@link #submit(EventHandler)}.
058 */
059@InterfaceAudience.Private
060public class ExecutorService {
061  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
062
  // Holds all the executors created so far, in a map addressable by their names.
064  private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>();
065
066  // Name of the server hosting this executor service.
067  private final String servername;
068
069  private final ListeningScheduledExecutorService delayedSubmitTimer =
070    MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
071      .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build()));
072
073  /**
074   * Default constructor.
075   * @param servername Name of the hosting server.
076   */
077  public ExecutorService(final String servername) {
078    this.servername = servername;
079  }
080
081  /**
082   * Start an executor service with a given name. If there was a service already
083   * started with the same name, this throws a RuntimeException.
084   * @param name Name of the service to start.
085   */
086  public void startExecutorService(String name, int maxThreads) {
087    if (this.executorMap.get(name) != null) {
088      throw new RuntimeException("An executor service with the name " + name +
089        " is already running!");
090    }
091    Executor hbes = new Executor(name, maxThreads);
092    if (this.executorMap.putIfAbsent(name, hbes) != null) {
093      throw new RuntimeException("An executor service with the name " + name +
094      " is already running (2)!");
095    }
096    LOG.debug("Starting executor service name=" + name +
097      ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
098      ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
099  }
100
101  boolean isExecutorServiceRunning(String name) {
102    return this.executorMap.containsKey(name);
103  }
104
105  public void shutdown() {
106    this.delayedSubmitTimer.shutdownNow();
107    for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
108      List<Runnable> wasRunning =
109        entry.getValue().threadPoolExecutor.shutdownNow();
110      if (!wasRunning.isEmpty()) {
111        LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
112      }
113    }
114    this.executorMap.clear();
115  }
116
117  Executor getExecutor(final ExecutorType type) {
118    return getExecutor(type.getExecutorName(this.servername));
119  }
120
121  Executor getExecutor(String name) {
122    Executor executor = this.executorMap.get(name);
123    return executor;
124  }
125
126  public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
127    return getExecutor(type).getThreadPoolExecutor();
128  }
129
130  public void startExecutorService(final ExecutorType type, final int maxThreads) {
131    String name = type.getExecutorName(this.servername);
132    if (isExecutorServiceRunning(name)) {
133      LOG.debug("Executor service " + toString() + " already running on " + this.servername);
134      return;
135    }
136    startExecutorService(name, maxThreads);
137  }
138
139  /**
140   * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
141   * paths should use this method to get the executor, should not start executor by using
142   * {@link ExecutorService#startExecutorService(ExecutorType, int)}
143   */
144  public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
145    String name = type.getExecutorName(this.servername);
146    return executorMap
147        .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
148        .getThreadPoolExecutor();
149  }
150
151  public void submit(final EventHandler eh) {
152    Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
153    if (executor == null) {
154      // This happens only when events are submitted after shutdown() was
155      // called, so dropping them should be "ok" since it means we're
156      // shutting down.
157      LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
158        " Is this process shutting down?");
159    } else {
160      executor.submit(eh);
161    }
162  }
163
164  // Submit the handler after the given delay. Used for retrying.
165  public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) {
166    ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit);
167    future.addListener(() -> {
168      try {
169        future.get();
170      } catch (Exception e) {
171        LOG.error("Failed to submit the event handler {} to executor", eh, e);
172      }
173    }, MoreExecutors.directExecutor());
174  }
175
176  public Map<String, ExecutorStatus> getAllExecutorStatuses() {
177    Map<String, ExecutorStatus> ret = Maps.newHashMap();
178    for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
179      ret.put(e.getKey(), e.getValue().getStatus());
180    }
181    return ret;
182  }
183
184  /**
185   * Executor instance.
186   */
187  static class Executor {
188    // how long to retain excess threads
189    static final long keepAliveTimeInMillis = 1000;
190    // the thread pool executor that services the requests
191    final TrackingThreadPoolExecutor threadPoolExecutor;
192    // work queue to use - unbounded queue
193    final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
194    private final String name;
195    private static final AtomicLong seqids = new AtomicLong(0);
196    private final long id;
197
198    protected Executor(String name, int maxThreads) {
199      this.id = seqids.incrementAndGet();
200      this.name = name;
201      // create the thread pool executor
202      this.threadPoolExecutor = new TrackingThreadPoolExecutor(
203          maxThreads, maxThreads,
204          keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
205      // name the threads for this threadpool
206      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
207      tfb.setNameFormat(this.name + "-%d");
208      tfb.setDaemon(true);
209      this.threadPoolExecutor.setThreadFactory(tfb.build());
210    }
211
212    /**
213     * Submit the event to the queue for handling.
214     * @param event
215     */
216    void submit(final EventHandler event) {
217      // If there is a listener for this type, make sure we call the before
218      // and after process methods.
219      this.threadPoolExecutor.execute(event);
220    }
221
222    TrackingThreadPoolExecutor getThreadPoolExecutor() {
223      return threadPoolExecutor;
224    }
225
226    @Override
227    public String toString() {
228      return getClass().getSimpleName() + "-" + id + "-" + name;
229    }
230
231    public ExecutorStatus getStatus() {
232      List<EventHandler> queuedEvents = Lists.newArrayList();
233      for (Runnable r : q) {
234        if (!(r instanceof EventHandler)) {
235          LOG.warn("Non-EventHandler " + r + " queued in " + name);
236          continue;
237        }
238        queuedEvents.add((EventHandler)r);
239      }
240
241      List<RunningEventStatus> running = Lists.newArrayList();
242      for (Map.Entry<Thread, Runnable> e :
243          threadPoolExecutor.getRunningTasks().entrySet()) {
244        Runnable r = e.getValue();
245        if (!(r instanceof EventHandler)) {
246          LOG.warn("Non-EventHandler " + r + " running in " + name);
247          continue;
248        }
249        running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
250      }
251
252      return new ExecutorStatus(this, queuedEvents, running);
253    }
254  }
255
256  /**
257   * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
258   * are executing at any given point in time.
259   */
260  static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
261    private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
262
263    public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
264        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
265      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
266    }
267
268    @Override
269    protected void afterExecute(Runnable r, Throwable t) {
270      super.afterExecute(r, t);
271      running.remove(Thread.currentThread());
272    }
273
274    @Override
275    protected void beforeExecute(Thread t, Runnable r) {
276      Runnable oldPut = running.put(t, r);
277      assert oldPut == null : "inconsistency for thread " + t;
278      super.beforeExecute(t, r);
279    }
280
281    /**
282     * @return a map of the threads currently running tasks
283     * inside this executor. Each key is an active thread,
284     * and the value is the task that is currently running.
285     * Note that this is not a stable snapshot of the map.
286     */
287    public ConcurrentMap<Thread, Runnable> getRunningTasks() {
288      return running;
289    }
290  }
291
292  /**
293   * A snapshot of the status of a particular executor. This includes
294   * the contents of the executor's pending queue, as well as the
295   * threads and events currently being processed.
296   *
297   * This is a consistent snapshot that is immutable once constructed.
298   */
299  public static class ExecutorStatus {
300    final Executor executor;
301    final List<EventHandler> queuedEvents;
302    final List<RunningEventStatus> running;
303
304    ExecutorStatus(Executor executor,
305        List<EventHandler> queuedEvents,
306        List<RunningEventStatus> running) {
307      this.executor = executor;
308      this.queuedEvents = queuedEvents;
309      this.running = running;
310    }
311
312    /**
313     * Dump a textual representation of the executor's status
314     * to the given writer.
315     *
316     * @param out the stream to write to
317     * @param indent a string prefix for each line, used for indentation
318     */
319    public void dumpTo(Writer out, String indent) throws IOException {
320      out.write(indent + "Status for executor: " + executor + "\n");
321      out.write(indent + "=======================================\n");
322      out.write(indent + queuedEvents.size() + " events queued, " +
323          running.size() + " running\n");
324      if (!queuedEvents.isEmpty()) {
325        out.write(indent + "Queued:\n");
326        for (EventHandler e : queuedEvents) {
327          out.write(indent + "  " + e + "\n");
328        }
329        out.write("\n");
330      }
331      if (!running.isEmpty()) {
332        out.write(indent + "Running:\n");
333        for (RunningEventStatus stat : running) {
334          out.write(indent + "  Running on thread '" +
335              stat.threadInfo.getThreadName() +
336              "': " + stat.event + "\n");
337          out.write(ThreadMonitoring.formatThreadInfo(
338              stat.threadInfo, indent + "  "));
339          out.write("\n");
340        }
341      }
342      out.flush();
343    }
344  }
345
346  /**
347   * The status of a particular event that is in the middle of being
348   * handled by an executor.
349   */
350  public static class RunningEventStatus {
351    final ThreadInfo threadInfo;
352    final EventHandler event;
353
354    public RunningEventStatus(Thread t, EventHandler event) {
355      this.threadInfo = ThreadMonitoring.getThreadInfo(t);
356      this.event = event;
357    }
358  }
359}