001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.executor;
019
020import java.io.IOException;
021import java.io.Writer;
022import java.lang.management.ThreadInfo;
023import java.util.List;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.Executors;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
040import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
041import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046
047/**
048 * This is a generic executor service. This component abstracts a threadpool, a queue to which
049 * {@link EventType}s can be submitted, and a <code>Runnable</code> that handles the object that is
050 * added to the queue.
051 * <p>
052 * In order to create a new service, create an instance of this class and then do:
053 * <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig} wraps the
054 * configuration needed by this service. When done call {@link #shutdown()}.
055 * <p>
056 * In order to use the service created above, call {@link #submit(EventHandler)}.
057 */
058@InterfaceAudience.Private
059public class ExecutorService {
060  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
061
  // Holds all the executors created, in a map addressable by their names.
063  private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();
064
065  // Name of the server hosting this executor service.
066  private final String servername;
067
068  private final ListeningScheduledExecutorService delayedSubmitTimer =
069    MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
070      .setDaemon(true).setNameFormat("Event-Executor-Delay-Submit-Timer").build()));
071
072  /**
073   * Default constructor.
074   * @param servername Name of the hosting server.
075   */
076  public ExecutorService(final String servername) {
077    this.servername = servername;
078  }
079
080  /**
081   * Start an executor service with a given name. If there was a service already started with the
082   * same name, this throws a RuntimeException.
083   * @param config Configuration to use for the executor.
084   */
085  public void startExecutorService(final ExecutorConfig config) {
086    final String name = config.getName();
087    Executor hbes = this.executorMap.compute(name, (key, value) -> {
088      if (value != null) {
089        throw new RuntimeException(
090          "An executor service with the name " + key + " is already running!");
091      }
092      return new Executor(config);
093    });
094
095    LOG.debug("Starting executor service name={}, corePoolSize={}, maxPoolSize={}", name,
096      hbes.threadPoolExecutor.getCorePoolSize(), hbes.threadPoolExecutor.getMaximumPoolSize());
097  }
098
099  boolean isExecutorServiceRunning(String name) {
100    return this.executorMap.containsKey(name);
101  }
102
103  public void shutdown() {
104    this.delayedSubmitTimer.shutdownNow();
105    for (Entry<String, Executor> entry : this.executorMap.entrySet()) {
106      List<Runnable> wasRunning = entry.getValue().threadPoolExecutor.shutdownNow();
107      if (!wasRunning.isEmpty()) {
108        LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
109      }
110    }
111    this.executorMap.clear();
112  }
113
114  Executor getExecutor(final ExecutorType type) {
115    return getExecutor(type.getExecutorName(this.servername));
116  }
117
118  Executor getExecutor(String name) {
119    return this.executorMap.get(name);
120  }
121
122  public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
123    return getExecutor(type).getThreadPoolExecutor();
124  }
125
126  /**
127   * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
128   * paths should use this method to get the executor, should not start executor by using
129   * {@link ExecutorService#startExecutorService(ExecutorConfig)}
130   */
131  public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) {
132    return executorMap.computeIfAbsent(config.getName(), (executorName) -> new Executor(config))
133      .getThreadPoolExecutor();
134  }
135
136  public void submit(final EventHandler eh) {
137    Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
138    if (executor == null) {
139      // This happens only when events are submitted after shutdown() was
140      // called, so dropping them should be "ok" since it means we're
141      // shutting down.
142      LOG.error("Cannot submit [" + eh + "] because the executor is missing."
143        + " Is this process shutting down?");
144    } else {
145      executor.submit(eh);
146    }
147  }
148
149  // Submit the handler after the given delay. Used for retrying.
150  public void delayedSubmit(EventHandler eh, long delay, TimeUnit unit) {
151    ListenableFuture<?> future = delayedSubmitTimer.schedule(() -> submit(eh), delay, unit);
152    future.addListener(() -> {
153      try {
154        future.get();
155      } catch (Exception e) {
156        LOG.error("Failed to submit the event handler {} to executor", eh, e);
157      }
158    }, MoreExecutors.directExecutor());
159  }
160
161  public Map<String, ExecutorStatus> getAllExecutorStatuses() {
162    Map<String, ExecutorStatus> ret = Maps.newHashMap();
163    for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
164      ret.put(e.getKey(), e.getValue().getStatus());
165    }
166    return ret;
167  }
168
169  /**
170   * Configuration wrapper for {@link Executor}.
171   */
172  public class ExecutorConfig {
173    // Refer to ThreadPoolExecutor javadoc for details of these configuration.
174    // Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
175    // implementation.
176    public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
177    private int corePoolSize = -1;
178    private boolean allowCoreThreadTimeout = false;
179    private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
180    private ExecutorType executorType;
181
182    public ExecutorConfig setExecutorType(ExecutorType type) {
183      this.executorType = type;
184      return this;
185    }
186
187    private ExecutorType getExecutorType() {
188      return Preconditions.checkNotNull(executorType, "ExecutorType not set.");
189    }
190
191    public int getCorePoolSize() {
192      return corePoolSize;
193    }
194
195    public ExecutorConfig setCorePoolSize(int corePoolSize) {
196      this.corePoolSize = corePoolSize;
197      return this;
198    }
199
200    public boolean allowCoreThreadTimeout() {
201      return allowCoreThreadTimeout;
202    }
203
204    /**
205     * Allows timing out of core threads. Good to set this for non-critical thread pools for release
206     * of unused resources. Refer to {@link ThreadPoolExecutor#allowCoreThreadTimeOut} for
207     * additional details.
208     */
209    public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
210      this.allowCoreThreadTimeout = allowCoreThreadTimeout;
211      return this;
212    }
213
214    /**
215     * Returns the executor name inferred from the type and the servername on which this is running.
216     */
217    public String getName() {
218      return getExecutorType().getExecutorName(servername);
219    }
220
221    public long getKeepAliveTimeMillis() {
222      return keepAliveTimeMillis;
223    }
224
225    public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
226      this.keepAliveTimeMillis = keepAliveTimeMillis;
227      return this;
228    }
229  }
230
231  /**
232   * Executor instance.
233   */
234  static class Executor {
235    // the thread pool executor that services the requests
236    final TrackingThreadPoolExecutor threadPoolExecutor;
237    // work queue to use - unbounded queue
238    final BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
239    private final String name;
240    private static final AtomicLong seqids = new AtomicLong(0);
241    private final long id;
242
243    protected Executor(ExecutorConfig config) {
244      this.id = seqids.incrementAndGet();
245      this.name = config.getName();
246      // create the thread pool executor
247      this.threadPoolExecutor = new TrackingThreadPoolExecutor(
248        // setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
249        config.getCorePoolSize(), config.getCorePoolSize(), config.getKeepAliveTimeMillis(),
250        TimeUnit.MILLISECONDS, q);
251      this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
252      // name the threads for this threadpool
253      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
254      tfb.setNameFormat(this.name + "-%d");
255      tfb.setDaemon(true);
256      this.threadPoolExecutor.setThreadFactory(tfb.build());
257    }
258
259    /**
260     * Submit the event to the queue for handling.
261     */
262    void submit(final EventHandler event) {
263      // If there is a listener for this type, make sure we call the before
264      // and after process methods.
265      this.threadPoolExecutor.execute(event);
266    }
267
268    TrackingThreadPoolExecutor getThreadPoolExecutor() {
269      return threadPoolExecutor;
270    }
271
272    @Override
273    public String toString() {
274      return getClass().getSimpleName() + "-" + id + "-" + name;
275    }
276
277    public ExecutorStatus getStatus() {
278      List<EventHandler> queuedEvents = Lists.newArrayList();
279      for (Runnable r : q) {
280        if (!(r instanceof EventHandler)) {
281          LOG.warn("Non-EventHandler " + r + " queued in " + name);
282          continue;
283        }
284        queuedEvents.add((EventHandler) r);
285      }
286
287      List<RunningEventStatus> running = Lists.newArrayList();
288      for (Map.Entry<Thread, Runnable> e : threadPoolExecutor.getRunningTasks().entrySet()) {
289        Runnable r = e.getValue();
290        if (!(r instanceof EventHandler)) {
291          LOG.warn("Non-EventHandler " + r + " running in " + name);
292          continue;
293        }
294        running.add(new RunningEventStatus(e.getKey(), (EventHandler) r));
295      }
296
297      return new ExecutorStatus(this, queuedEvents, running);
298    }
299  }
300
301  /**
302   * A subclass of ThreadPoolExecutor that keeps track of the Runnables that are executing at any
303   * given point in time.
304   */
305  static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
306    private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
307
308    public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
309      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
310      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
311    }
312
313    @Override
314    protected void afterExecute(Runnable r, Throwable t) {
315      super.afterExecute(r, t);
316      running.remove(Thread.currentThread());
317    }
318
319    @Override
320    protected void beforeExecute(Thread t, Runnable r) {
321      Runnable oldPut = running.put(t, r);
322      assert oldPut == null : "inconsistency for thread " + t;
323      super.beforeExecute(t, r);
324    }
325
326    /**
327     * @return a map of the threads currently running tasks inside this executor. Each key is an
328     *         active thread, and the value is the task that is currently running. Note that this is
329     *         not a stable snapshot of the map.
330     */
331    public ConcurrentMap<Thread, Runnable> getRunningTasks() {
332      return running;
333    }
334  }
335
336  /**
337   * A snapshot of the status of a particular executor. This includes the contents of the executor's
338   * pending queue, as well as the threads and events currently being processed. This is a
339   * consistent snapshot that is immutable once constructed.
340   */
341  public static class ExecutorStatus {
342    final Executor executor;
343    final List<EventHandler> queuedEvents;
344    final List<RunningEventStatus> running;
345
346    ExecutorStatus(Executor executor, List<EventHandler> queuedEvents,
347      List<RunningEventStatus> running) {
348      this.executor = executor;
349      this.queuedEvents = queuedEvents;
350      this.running = running;
351    }
352
353    public List<EventHandler> getQueuedEvents() {
354      return queuedEvents;
355    }
356
357    public List<RunningEventStatus> getRunning() {
358      return running;
359    }
360
361    /**
362     * Dump a textual representation of the executor's status to the given writer.
363     * @param out    the stream to write to
364     * @param indent a string prefix for each line, used for indentation
365     */
366    public void dumpTo(Writer out, String indent) throws IOException {
367      out.write(indent + "Status for executor: " + executor + "\n");
368      out.write(indent + "=======================================\n");
369      out.write(indent + queuedEvents.size() + " events queued, " + running.size() + " running\n");
370      if (!queuedEvents.isEmpty()) {
371        out.write(indent + "Queued:\n");
372        for (EventHandler e : queuedEvents) {
373          out.write(indent + "  " + e + "\n");
374        }
375        out.write("\n");
376      }
377      if (!running.isEmpty()) {
378        out.write(indent + "Running:\n");
379        for (RunningEventStatus stat : running) {
380          out.write(indent + "  Running on thread '" + stat.threadInfo.getThreadName() + "': "
381            + stat.event + "\n");
382          out.write(ThreadMonitoring.formatThreadInfo(stat.threadInfo, indent + "  "));
383          out.write("\n");
384        }
385      }
386      out.flush();
387    }
388  }
389
390  /**
391   * The status of a particular event that is in the middle of being handled by an executor.
392   */
393  public static class RunningEventStatus {
394    final ThreadInfo threadInfo;
395    final EventHandler event;
396
397    public RunningEventStatus(Thread t, EventHandler event) {
398      this.threadInfo = ThreadMonitoring.getThreadInfo(t);
399      this.event = event;
400    }
401  }
402}