001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.executor;
019
020import java.io.IOException;
021import java.io.Writer;
022import java.lang.management.ThreadInfo;
023import java.util.List;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.Executors;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
042import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
046import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
047
048/**
049 * This is a generic executor service. This component abstracts a threadpool, a queue to which
050 * {@link EventType}s can be submitted, and a <code>Runnable</code> that handles the object that is
051 * added to the queue.
052 * <p>
053 * In order to create a new service, create an instance of this class and then do:
054 * <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig} wraps the
055 * configuration needed by this service. When done call {@link #shutdown()}.
056 * <p>
057 * In order to use the service created above, call {@link #submit(EventHandler)}.
058 */
059@InterfaceAudience.Private
060public class ExecutorService {
061  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
062
063  // hold the all the executors created in a map addressable by their names
064  private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();
065
066  // Name of the server hosting this executor service.
067  private final String servername;
068
069  private final ListeningScheduledExecutorService delayedSubmitTimer =
070    MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
071      .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build()));
072
073  /**
074   * Default constructor.
075   * @param servername Name of the hosting server.
076   */
077  public ExecutorService(final String servername) {
078    this.servername = servername;
079  }
080
081  /**
082   * Start an executor service with a given name. If there was a service already started with the
083   * same name, this throws a RuntimeException.
084   * @param config Configuration to use for the executor.
085   */
086  public void startExecutorService(final ExecutorConfig config) {
087    final String name = config.getName();
088    Executor hbes = this.executorMap.compute(name, (key, value) -> {
089      if (value != null) {
090        throw new RuntimeException(
091          "An executor service with the name " + key + " is already running!");
092      }
093      return new Executor(config);
094    });
095
096    LOG.debug("Starting executor service name={}, corePoolSize={}, maxPoolSize={}", name,
097      hbes.threadPoolExecutor.getCorePoolSize(), hbes.threadPoolExecutor.getMaximumPoolSize());
098  }
099
100  boolean isExecutorServiceRunning(String name) {
101    return this.executorMap.containsKey(name);
102  }
103
104  public void shutdown() {
105    this.delayedSubmitTimer.shutdownNow();
106    for (Entry<String, Executor> entry : this.executorMap.entrySet()) {
107      List<Runnable> wasRunning = entry.getValue().threadPoolExecutor.shutdownNow();
108      if (!wasRunning.isEmpty()) {
109        LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
110      }
111    }
112    this.executorMap.clear();
113  }
114
115  Executor getExecutor(final ExecutorType type) {
116    return getExecutor(type.getExecutorName(this.servername));
117  }
118
119  Executor getExecutor(String name) {
120    return this.executorMap.get(name);
121  }
122
123  public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
124    return getExecutor(type).getThreadPoolExecutor();
125  }
126
127  /**
128   * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
129   * paths should use this method to get the executor, should not start executor by using
130   * {@link ExecutorService#startExecutorService(ExecutorConfig)}
131   */
132  public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) {
133    return executorMap.computeIfAbsent(config.getName(), (executorName) -> new Executor(config))
134      .getThreadPoolExecutor();
135  }
136
137  public void submit(final EventHandler eh) {
138    Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
139    if (executor == null) {
140      // This happens only when events are submitted after shutdown() was
141      // called, so dropping them should be "ok" since it means we're
142      // shutting down.
143      LOG.error("Cannot submit [" + eh + "] because the executor is missing."
144        + " Is this process shutting down?");
145    } else {
146      executor.submit(eh);
147    }
148  }
149
150  // Submit the handler after the given delay. Used for retrying.
151  public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) {
152    ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit);
153    future.addListener(() -> {
154      try {
155        future.get();
156      } catch (Exception e) {
157        LOG.error("Failed to submit the event handler {} to executor", eh, e);
158      }
159    }, MoreExecutors.directExecutor());
160  }
161
162  public Map<String, ExecutorStatus> getAllExecutorStatuses() {
163    Map<String, ExecutorStatus> ret = Maps.newHashMap();
164    for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
165      ret.put(e.getKey(), e.getValue().getStatus());
166    }
167    return ret;
168  }
169
170  /**
171   * Configuration wrapper for {@link Executor}.
172   */
173  public class ExecutorConfig {
174    // Refer to ThreadPoolExecutor javadoc for details of these configuration.
175    // Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
176    // implementation.
177    public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
178    private int corePoolSize = -1;
179    private boolean allowCoreThreadTimeout = false;
180    private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
181    private ExecutorType executorType;
182
183    public ExecutorConfig setExecutorType(ExecutorType type) {
184      this.executorType = type;
185      return this;
186    }
187
188    private ExecutorType getExecutorType() {
189      return Preconditions.checkNotNull(executorType, "ExecutorType not set.");
190    }
191
192    public int getCorePoolSize() {
193      return corePoolSize;
194    }
195
196    public ExecutorConfig setCorePoolSize(int corePoolSize) {
197      this.corePoolSize = corePoolSize;
198      return this;
199    }
200
201    public boolean allowCoreThreadTimeout() {
202      return allowCoreThreadTimeout;
203    }
204
205    /**
206     * Allows timing out of core threads. Good to set this for non-critical thread pools for release
207     * of unused resources. Refer to {@link ThreadPoolExecutor#allowCoreThreadTimeOut} for
208     * additional details.
209     */
210    public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
211      this.allowCoreThreadTimeout = allowCoreThreadTimeout;
212      return this;
213    }
214
215    /**
216     * Returns the executor name inferred from the type and the servername on which this is running.
217     */
218    public String getName() {
219      return getExecutorType().getExecutorName(servername);
220    }
221
222    public long getKeepAliveTimeMillis() {
223      return keepAliveTimeMillis;
224    }
225
226    public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
227      this.keepAliveTimeMillis = keepAliveTimeMillis;
228      return this;
229    }
230  }
231
232  /**
233   * Executor instance.
234   */
235  static class Executor {
236    // the thread pool executor that services the requests
237    final TrackingThreadPoolExecutor threadPoolExecutor;
238    // work queue to use - unbounded queue
239    final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
240    private final String name;
241    private static final AtomicLong seqids = new AtomicLong(0);
242    private final long id;
243
244    protected Executor(ExecutorConfig config) {
245      this.id = seqids.incrementAndGet();
246      this.name = config.getName();
247      // create the thread pool executor
248      this.threadPoolExecutor = new TrackingThreadPoolExecutor(
249        // setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
250        config.getCorePoolSize(), config.getCorePoolSize(), config.getKeepAliveTimeMillis(),
251        TimeUnit.MILLISECONDS, q);
252      this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
253      // name the threads for this threadpool
254      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
255      tfb.setNameFormat(this.name + "-%d");
256      tfb.setDaemon(true);
257      tfb.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
258      this.threadPoolExecutor.setThreadFactory(tfb.build());
259    }
260
261    /**
262     * Submit the event to the queue for handling.
263     */
264    void submit(final EventHandler event) {
265      // If there is a listener for this type, make sure we call the before
266      // and after process methods.
267      this.threadPoolExecutor.execute(event);
268    }
269
270    TrackingThreadPoolExecutor getThreadPoolExecutor() {
271      return threadPoolExecutor;
272    }
273
274    @Override
275    public String toString() {
276      return getClass().getSimpleName() + "-" + id + "-" + name;
277    }
278
279    public ExecutorStatus getStatus() {
280      List<EventHandler> queuedEvents = Lists.newArrayList();
281      for (Runnable r : q) {
282        if (!(r instanceof EventHandler)) {
283          LOG.warn("Non-EventHandler " + r + " queued in " + name);
284          continue;
285        }
286        queuedEvents.add((EventHandler) r);
287      }
288
289      List<RunningEventStatus> running = Lists.newArrayList();
290      for (Map.Entry<Thread, Runnable> e : threadPoolExecutor.getRunningTasks().entrySet()) {
291        Runnable r = e.getValue();
292        if (!(r instanceof EventHandler)) {
293          LOG.warn("Non-EventHandler " + r + " running in " + name);
294          continue;
295        }
296        running.add(new RunningEventStatus(e.getKey(), (EventHandler) r));
297      }
298
299      return new ExecutorStatus(this, queuedEvents, running);
300    }
301  }
302
303  /**
304   * A subclass of ThreadPoolExecutor that keeps track of the Runnables that are executing at any
305   * given point in time.
306   */
307  static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
308    private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
309
310    public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
311      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
312      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
313    }
314
315    @Override
316    protected void afterExecute(Runnable r, Throwable t) {
317      super.afterExecute(r, t);
318      running.remove(Thread.currentThread());
319    }
320
321    @Override
322    protected void beforeExecute(Thread t, Runnable r) {
323      Runnable oldPut = running.put(t, r);
324      assert oldPut == null : "inconsistency for thread " + t;
325      super.beforeExecute(t, r);
326    }
327
328    /**
329     * @return a map of the threads currently running tasks inside this executor. Each key is an
330     *         active thread, and the value is the task that is currently running. Note that this is
331     *         not a stable snapshot of the map.
332     */
333    public ConcurrentMap<Thread, Runnable> getRunningTasks() {
334      return running;
335    }
336  }
337
338  /**
339   * A snapshot of the status of a particular executor. This includes the contents of the executor's
340   * pending queue, as well as the threads and events currently being processed. This is a
341   * consistent snapshot that is immutable once constructed.
342   */
343  public static class ExecutorStatus {
344    final Executor executor;
345    final List<EventHandler> queuedEvents;
346    final List<RunningEventStatus> running;
347
348    ExecutorStatus(Executor executor, List<EventHandler> queuedEvents,
349      List<RunningEventStatus> running) {
350      this.executor = executor;
351      this.queuedEvents = queuedEvents;
352      this.running = running;
353    }
354
355    public List<EventHandler> getQueuedEvents() {
356      return queuedEvents;
357    }
358
359    public List<RunningEventStatus> getRunning() {
360      return running;
361    }
362
363    /**
364     * Dump a textual representation of the executor's status to the given writer.
365     * @param out    the stream to write to
366     * @param indent a string prefix for each line, used for indentation
367     */
368    public void dumpTo(Writer out, String indent) throws IOException {
369      out.write(indent + "Status for executor: " + executor + "\n");
370      out.write(indent + "=======================================\n");
371      out.write(indent + queuedEvents.size() + " events queued, " + running.size() + " running\n");
372      if (!queuedEvents.isEmpty()) {
373        out.write(indent + "Queued:\n");
374        for (EventHandler e : queuedEvents) {
375          out.write(indent + "  " + e + "\n");
376        }
377        out.write("\n");
378      }
379      if (!running.isEmpty()) {
380        out.write(indent + "Running:\n");
381        for (RunningEventStatus stat : running) {
382          out.write(indent + "  Running on thread '" + stat.threadInfo.getThreadName() + "': "
383            + stat.event + "\n");
384          out.write(ThreadMonitoring.formatThreadInfo(stat.threadInfo, indent + "  "));
385          out.write("\n");
386        }
387      }
388      out.flush();
389    }
390  }
391
392  /**
393   * The status of a particular event that is in the middle of being handled by an executor.
394   */
395  public static class RunningEventStatus {
396    final ThreadInfo threadInfo;
397    final EventHandler event;
398
399    public RunningEventStatus(Thread t, EventHandler event) {
400      this.threadInfo = ThreadMonitoring.getThreadInfo(t);
401      this.event = event;
402    }
403  }
404}