001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.executor;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.lang.management.ThreadInfo;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.concurrent.Executors;
031import java.util.concurrent.LinkedBlockingQueue;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035
036import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
042import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
043import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
046import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
047import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
048
049/**
050 * This is a generic executor service. This component abstracts a
051 * threadpool, a queue to which {@link EventType}s can be submitted,
052 * and a <code>Runnable</code> that handles the object that is added to the queue.
053 *
054 * <p>In order to create a new service, create an instance of this class and
055 * then do: <code>instance.startExecutorService("myService");</code>.  When done
056 * call {@link #shutdown()}.
057 *
058 * <p>In order to use the service created above, call
059 * {@link #submit(EventHandler)}.
060 */
061@InterfaceAudience.Private
062public class ExecutorService {
063  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
064
065  // hold the all the executors created in a map addressable by their names
066  private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();
067
068  // Name of the server hosting this executor service.
069  private final String servername;
070
071  private final ListeningScheduledExecutorService delayedSubmitTimer =
072    MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
073      .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build()));
074
075  /**
076   * Default constructor.
077   * @param servername Name of the hosting server.
078   */
079  public ExecutorService(final String servername) {
080    this.servername = servername;
081  }
082
083  /**
084   * Start an executor service with a given name. If there was a service already
085   * started with the same name, this throws a RuntimeException.
086   * @param name Name of the service to start.
087   */
088  @VisibleForTesting
089  public void startExecutorService(String name, int maxThreads) {
090    Executor hbes = this.executorMap.compute(name, (key, value) -> {
091      if (value != null) {
092        throw new RuntimeException("An executor service with the name " + key +
093            " is already running!");
094      }
095      return new Executor(key, maxThreads);
096    });
097
098    LOG.debug(
099        "Starting executor service name={}, corePoolSize={}, maxPoolSize={}",
100        name, hbes.threadPoolExecutor.getCorePoolSize(),
101        hbes.threadPoolExecutor.getMaximumPoolSize());
102  }
103
104  boolean isExecutorServiceRunning(String name) {
105    return this.executorMap.containsKey(name);
106  }
107
108  public void shutdown() {
109    this.delayedSubmitTimer.shutdownNow();
110    for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
111      List<Runnable> wasRunning =
112        entry.getValue().threadPoolExecutor.shutdownNow();
113      if (!wasRunning.isEmpty()) {
114        LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
115      }
116    }
117    this.executorMap.clear();
118  }
119
120  Executor getExecutor(final ExecutorType type) {
121    return getExecutor(type.getExecutorName(this.servername));
122  }
123
124  Executor getExecutor(String name) {
125    Executor executor = this.executorMap.get(name);
126    return executor;
127  }
128
129  @VisibleForTesting
130  public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
131    return getExecutor(type).getThreadPoolExecutor();
132  }
133
134  public void startExecutorService(final ExecutorType type, final int maxThreads) {
135    String name = type.getExecutorName(this.servername);
136    if (isExecutorServiceRunning(name)) {
137      LOG.debug("Executor service {} already running on {}", this,
138          this.servername);
139      return;
140    }
141    startExecutorService(name, maxThreads);
142  }
143
144  /**
145   * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
146   * paths should use this method to get the executor, should not start executor by using
147   * {@link ExecutorService#startExecutorService(ExecutorType, int)}
148   */
149  public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
150    String name = type.getExecutorName(this.servername);
151    return executorMap
152        .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
153        .getThreadPoolExecutor();
154  }
155
156  public void submit(final EventHandler eh) {
157    Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
158    if (executor == null) {
159      // This happens only when events are submitted after shutdown() was
160      // called, so dropping them should be "ok" since it means we're
161      // shutting down.
162      LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
163        " Is this process shutting down?");
164    } else {
165      executor.submit(eh);
166    }
167  }
168
169  // Submit the handler after the given delay. Used for retrying.
170  public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) {
171    ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit);
172    future.addListener(() -> {
173      try {
174        future.get();
175      } catch (Exception e) {
176        LOG.error("Failed to submit the event handler {} to executor", eh, e);
177      }
178    }, MoreExecutors.directExecutor());
179  }
180
181  public Map<String, ExecutorStatus> getAllExecutorStatuses() {
182    Map<String, ExecutorStatus> ret = Maps.newHashMap();
183    for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
184      ret.put(e.getKey(), e.getValue().getStatus());
185    }
186    return ret;
187  }
188
189  /**
190   * Executor instance.
191   */
192  static class Executor {
193    // how long to retain excess threads
194    static final long keepAliveTimeInMillis = 1000;
195    // the thread pool executor that services the requests
196    final TrackingThreadPoolExecutor threadPoolExecutor;
197    // work queue to use - unbounded queue
198    final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
199    private final String name;
200    private static final AtomicLong seqids = new AtomicLong(0);
201    private final long id;
202
203    protected Executor(String name, int maxThreads) {
204      this.id = seqids.incrementAndGet();
205      this.name = name;
206      // create the thread pool executor
207      this.threadPoolExecutor = new TrackingThreadPoolExecutor(
208          maxThreads, maxThreads,
209          keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
210      // name the threads for this threadpool
211      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
212      tfb.setNameFormat(this.name + "-%d");
213      tfb.setDaemon(true);
214      this.threadPoolExecutor.setThreadFactory(tfb.build());
215    }
216
217    /**
218     * Submit the event to the queue for handling.
219     * @param event
220     */
221    void submit(final EventHandler event) {
222      // If there is a listener for this type, make sure we call the before
223      // and after process methods.
224      this.threadPoolExecutor.execute(event);
225    }
226
227    TrackingThreadPoolExecutor getThreadPoolExecutor() {
228      return threadPoolExecutor;
229    }
230
231    @Override
232    public String toString() {
233      return getClass().getSimpleName() + "-" + id + "-" + name;
234    }
235
236    public ExecutorStatus getStatus() {
237      List<EventHandler> queuedEvents = Lists.newArrayList();
238      for (Runnable r : q) {
239        if (!(r instanceof EventHandler)) {
240          LOG.warn("Non-EventHandler " + r + " queued in " + name);
241          continue;
242        }
243        queuedEvents.add((EventHandler)r);
244      }
245
246      List<RunningEventStatus> running = Lists.newArrayList();
247      for (Map.Entry<Thread, Runnable> e :
248          threadPoolExecutor.getRunningTasks().entrySet()) {
249        Runnable r = e.getValue();
250        if (!(r instanceof EventHandler)) {
251          LOG.warn("Non-EventHandler " + r + " running in " + name);
252          continue;
253        }
254        running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
255      }
256
257      return new ExecutorStatus(this, queuedEvents, running);
258    }
259  }
260
261  /**
262   * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
263   * are executing at any given point in time.
264   */
265  static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
266    private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
267
268    public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
269        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
270      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
271    }
272
273    @Override
274    protected void afterExecute(Runnable r, Throwable t) {
275      super.afterExecute(r, t);
276      running.remove(Thread.currentThread());
277    }
278
279    @Override
280    protected void beforeExecute(Thread t, Runnable r) {
281      Runnable oldPut = running.put(t, r);
282      assert oldPut == null : "inconsistency for thread " + t;
283      super.beforeExecute(t, r);
284    }
285
286    /**
287     * @return a map of the threads currently running tasks
288     * inside this executor. Each key is an active thread,
289     * and the value is the task that is currently running.
290     * Note that this is not a stable snapshot of the map.
291     */
292    public ConcurrentMap<Thread, Runnable> getRunningTasks() {
293      return running;
294    }
295  }
296
297  /**
298   * A snapshot of the status of a particular executor. This includes
299   * the contents of the executor's pending queue, as well as the
300   * threads and events currently being processed.
301   *
302   * This is a consistent snapshot that is immutable once constructed.
303   */
304  public static class ExecutorStatus {
305    final Executor executor;
306    final List<EventHandler> queuedEvents;
307    final List<RunningEventStatus> running;
308
309    ExecutorStatus(Executor executor,
310        List<EventHandler> queuedEvents,
311        List<RunningEventStatus> running) {
312      this.executor = executor;
313      this.queuedEvents = queuedEvents;
314      this.running = running;
315    }
316
317    public List<EventHandler> getQueuedEvents() {
318      return queuedEvents;
319    }
320
321    public List<RunningEventStatus> getRunning() {
322      return running;
323    }
324
325    /**
326     * Dump a textual representation of the executor's status
327     * to the given writer.
328     *
329     * @param out the stream to write to
330     * @param indent a string prefix for each line, used for indentation
331     */
332    public void dumpTo(Writer out, String indent) throws IOException {
333      out.write(indent + "Status for executor: " + executor + "\n");
334      out.write(indent + "=======================================\n");
335      out.write(indent + queuedEvents.size() + " events queued, " +
336          running.size() + " running\n");
337      if (!queuedEvents.isEmpty()) {
338        out.write(indent + "Queued:\n");
339        for (EventHandler e : queuedEvents) {
340          out.write(indent + "  " + e + "\n");
341        }
342        out.write("\n");
343      }
344      if (!running.isEmpty()) {
345        out.write(indent + "Running:\n");
346        for (RunningEventStatus stat : running) {
347          out.write(indent + "  Running on thread '" +
348              stat.threadInfo.getThreadName() +
349              "': " + stat.event + "\n");
350          out.write(ThreadMonitoring.formatThreadInfo(
351              stat.threadInfo, indent + "  "));
352          out.write("\n");
353        }
354      }
355      out.flush();
356    }
357  }
358
359  /**
360   * The status of a particular event that is in the middle of being
361   * handled by an executor.
362   */
363  public static class RunningEventStatus {
364    final ThreadInfo threadInfo;
365    final EventHandler event;
366
367    public RunningEventStatus(Thread t, EventHandler event) {
368      this.threadInfo = ThreadMonitoring.getThreadInfo(t);
369      this.event = event;
370    }
371  }
372}