001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.LinkedHashMap;
024import java.util.Map.Entry;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.ScheduledThreadPoolExecutor;
027import java.util.concurrent.ThreadFactory;
028import java.util.concurrent.atomic.AtomicInteger;
029
030import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
037 * periodically while sharing threads. The ChoreService is backed by a
038 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
039 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
040 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
041 * <p>
042 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
043 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
044 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
045 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
046 * there may be a need to increase the number of threads if it is noticed that chores are no longer
047 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
048 * made to reduce the number of running threads to see if chores can still meet their start times
049 * with a smaller thread pool.
050 * <p>
051 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
052 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
053 */
054@InterfaceAudience.Public
055public class ChoreService implements ChoreServicer {
  private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);

  /**
   * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor.
   * The constructor raises smaller requested sizes to this value, and requestCorePoolDecrease()
   * never shrinks the pool below it.
   */
  @InterfaceAudience.Private
  public final static int MIN_CORE_POOL_SIZE = 1;

  /**
   * This thread pool is used to schedule all of the Chores
   */
  private final ScheduledThreadPoolExecutor scheduler;

  /**
   * Maps chores to their futures. Futures are used to control a chore's schedule. Entries are
   * added, removed and cleared only from methods that synchronize on this service.
   */
  private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;

  /**
   * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
   * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
   * increase the core pool size by 1 (otherwise a single long running chore whose execution is
   * longer than its period would be able to spawn too many threads). A chore only has an entry
   * here once it has reported a missed start time; entries are added, removed and cleared only
   * from methods that synchronize on this service.
   */
  private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;

  /**
   * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
   * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
   * running on. The prefix is useful because it allows us to monitor how the thread pool of a
   * particular service changes over time via thread dumps.
   */
  private final String coreThreadPoolPrefix;
089
090  /**
091   *
092   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
093   *          spawned by this service
094   */
095  @InterfaceAudience.Private
096  public ChoreService(final String coreThreadPoolPrefix) {
097    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
098  }
099
100  /**
101   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
102   *          spawned by this service
103   * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
104   *               to true this will add -10% to 10% jitter.
105   */
106  public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
107    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
108  }
109
110  /**
111   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
112   *          spawned by this service
113   * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor
114   *          to during initialization. The default size is 1, but specifying a larger size may be
115   *          beneficial if you know that 1 thread will not be enough.
116   * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
117   *               to true this will add -10% to 10% jitter.
118   */
119  public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
120    this.coreThreadPoolPrefix = coreThreadPoolPrefix;
121    if (corePoolSize < MIN_CORE_POOL_SIZE)  {
122      corePoolSize = MIN_CORE_POOL_SIZE;
123    }
124
125    final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
126    if (jitter) {
127      scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
128    } else {
129      scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
130    }
131
132    scheduler.setRemoveOnCancelPolicy(true);
133    scheduledChores = new HashMap<>();
134    choresMissingStartTime = new HashMap<>();
135  }
136
137  /**
138   * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
139   *          instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
140   *          with a single ChoreService instance).
141   * @return true when the chore was successfully scheduled. false when the scheduling failed
142   *         (typically occurs when a chore is scheduled during shutdown of service)
143   */
144  public synchronized boolean scheduleChore(ScheduledChore chore) {
145    if (chore == null) {
146      return false;
147    }
148
149    try {
150      if (chore.getPeriod() <= 0) {
151        LOG.info("Chore {} is disabled because its period is not positive.", chore);
152        return false;
153      }
154      LOG.info("Chore {} is enabled.", chore);
155      chore.setChoreServicer(this);
156      ScheduledFuture<?> future =
157          scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
158            chore.getTimeUnit());
159      scheduledChores.put(chore, future);
160      return true;
161    } catch (Exception exception) {
162      if (LOG.isInfoEnabled()) {
163        LOG.info("Could not successfully schedule chore: " + chore.getName());
164      }
165      return false;
166    }
167  }
168
169  /**
170   * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
171   *          yet then this call is equivalent to a call to scheduleChore.
172   */
173  private void rescheduleChore(ScheduledChore chore) {
174    if (scheduledChores.containsKey(chore)) {
175      ScheduledFuture<?> future = scheduledChores.get(chore);
176      future.cancel(false);
177    }
178    scheduleChore(chore);
179  }
180
181  @InterfaceAudience.Private
182  @Override
183  public synchronized void cancelChore(ScheduledChore chore) {
184    cancelChore(chore, true);
185  }
186
187  @InterfaceAudience.Private
188  @Override
189  public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
190    if (chore != null && scheduledChores.containsKey(chore)) {
191      ScheduledFuture<?> future = scheduledChores.get(chore);
192      future.cancel(mayInterruptIfRunning);
193      scheduledChores.remove(chore);
194
195      // Removing a chore that was missing its start time means it may be possible
196      // to reduce the number of threads
197      if (choresMissingStartTime.containsKey(chore)) {
198        choresMissingStartTime.remove(chore);
199        requestCorePoolDecrease();
200      }
201    }
202  }
203
204  @InterfaceAudience.Private
205  @Override
206  public synchronized boolean isChoreScheduled(ScheduledChore chore) {
207    return chore != null && scheduledChores.containsKey(chore)
208        && !scheduledChores.get(chore).isDone();
209  }
210
211  @InterfaceAudience.Private
212  @Override
213  public synchronized boolean triggerNow(ScheduledChore chore) {
214    if (chore != null) {
215      rescheduleChore(chore);
216      return true;
217    }
218    return false;
219  }
220
221  /**
222   * @return number of chores that this service currently has scheduled
223   */
224  int getNumberOfScheduledChores() {
225    return scheduledChores.size();
226  }
227
228  /**
229   * @return number of chores that this service currently has scheduled that are missing their
230   *         scheduled start time
231   */
232  int getNumberOfChoresMissingStartTime() {
233    return choresMissingStartTime.size();
234  }
235
236  /**
237   * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
238   */
239  int getCorePoolSize() {
240    return scheduler.getCorePoolSize();
241  }
242
243  /**
244   * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
245   * daemon threads, and thus, don't prevent the JVM from shutting down
246   */
247  static class ChoreServiceThreadFactory implements ThreadFactory {
248    private final String threadPrefix;
249    private final static String THREAD_NAME_SUFFIX = ".Chore.";
250    private AtomicInteger threadNumber = new AtomicInteger(1);
251
252    /**
253     * @param threadPrefix The prefix given to all threads created by this factory
254     */
255    public ChoreServiceThreadFactory(final String threadPrefix) {
256      this.threadPrefix = threadPrefix;
257    }
258
259    @Override
260    public Thread newThread(Runnable r) {
261      Thread thread =
262          new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
263      thread.setDaemon(true);
264      return thread;
265    }
266  }
267
268  /**
269   * Represents a request to increase the number of core pool threads. Typically a request
270   * originates from the fact that the current core pool size is not sufficient to service all of
271   * the currently running Chores
272   * @return true when the request to increase the core pool size succeeds
273   */
274  private synchronized boolean requestCorePoolIncrease() {
275    // There is no point in creating more threads than scheduledChores.size since scheduled runs
276    // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
277    // amongst occurrences of the same chore).
278    if (scheduler.getCorePoolSize() < scheduledChores.size()) {
279      scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
280      printChoreServiceDetails("requestCorePoolIncrease");
281      return true;
282    }
283    return false;
284  }
285
286  /**
287   * Represents a request to decrease the number of core pool threads. Typically a request
288   * originates from the fact that the current core pool size is more than sufficient to service the
289   * running Chores.
290   */
291  private synchronized void requestCorePoolDecrease() {
292    if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
293      scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
294      printChoreServiceDetails("requestCorePoolDecrease");
295    }
296  }
297
298  @InterfaceAudience.Private
299  @Override
300  public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
301    if (chore == null || !scheduledChores.containsKey(chore)) return;
302
303    // If the chore has not caused an increase in the size of the core thread pool then request an
304    // increase. This allows each chore missing its start time to increase the core pool size by
305    // at most 1.
306    if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
307      choresMissingStartTime.put(chore, requestCorePoolIncrease());
308    }
309
310    // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
311    // the chore is NOT rescheduled, future executions of this chore will be delayed more and
312    // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
313    // idle threads to chores based on how delayed they are.
314    rescheduleChore(chore);
315    printChoreDetails("onChoreMissedStartTime", chore);
316  }
317
318  /**
319   * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
320   * in the middle of execution will be interrupted and shutdown. This service will be unusable
321   * after this method has been called (i.e. future scheduling attempts will fail).
322   */
323  public synchronized void shutdown() {
324    scheduler.shutdownNow();
325    if (LOG.isInfoEnabled()) {
326      LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
327          + " on shutdown");
328    }
329    cancelAllChores(true);
330    scheduledChores.clear();
331    choresMissingStartTime.clear();
332  }
333
334  /**
335   * @return true when the service is shutdown and thus cannot be used anymore
336   */
337  public boolean isShutdown() {
338    return scheduler.isShutdown();
339  }
340
341  /**
342   * @return true when the service is shutdown and all threads have terminated
343   */
344  public boolean isTerminated() {
345    return scheduler.isTerminated();
346  }
347
348  private void cancelAllChores(final boolean mayInterruptIfRunning) {
349    // Build list of chores to cancel so we can iterate through a set that won't change
350    // as chores are cancelled. If we tried to cancel each chore while iterating through
351    // keySet the results would be undefined because the keySet would be changing
352    ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet());
353
354    for (ScheduledChore chore : choresToCancel) {
355      cancelChore(chore, mayInterruptIfRunning);
356    }
357  }
358
359  /**
360   * Prints a summary of important details about the chore. Used for debugging purposes
361   */
362  private void printChoreDetails(final String header, ScheduledChore chore) {
363    LinkedHashMap<String, String> output = new LinkedHashMap<>();
364    output.put(header, "");
365    output.put("Chore name: ", chore.getName());
366    output.put("Chore period: ", Integer.toString(chore.getPeriod()));
367    output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
368
369    for (Entry<String, String> entry : output.entrySet()) {
370      if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
371    }
372  }
373
374  /**
375   * Prints a summary of important details about the service. Used for debugging purposes
376   */
377  private void printChoreServiceDetails(final String header) {
378    LinkedHashMap<String, String> output = new LinkedHashMap<>();
379    output.put(header, "");
380    output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
381    output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
382    output.put("ChoreService missingStartTimeCount: ",
383      Integer.toString(getNumberOfChoresMissingStartTime()));
384
385    for (Entry<String, String> entry : output.entrySet()) {
386      if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
387    }
388  }
389}