001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.LinkedHashMap;
024import java.util.Map.Entry;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.ScheduledThreadPoolExecutor;
027import java.util.concurrent.ThreadFactory;
028import java.util.concurrent.atomic.AtomicInteger;
029
030import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
036
037/**
038 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
039 * periodically while sharing threads. The ChoreService is backed by a
040 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
041 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
042 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
043 * <p>
044 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
045 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
046 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
047 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
048 * there may be a need to increase the number of threads if it is noticed that chores are no longer
049 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
050 * made to reduce the number of running threads to see if chores can still meet their start times
051 * with a smaller thread pool.
052 * <p>
053 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
054 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
055 */
056@InterfaceAudience.Public
057public class ChoreService implements ChoreServicer {
058  private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);
059
060  /**
061   * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
062   */
063  @InterfaceAudience.Private
064  public final static int MIN_CORE_POOL_SIZE = 1;
065
066  /**
067   * This thread pool is used to schedule all of the Chores
068   */
069  private final ScheduledThreadPoolExecutor scheduler;
070
071  /**
072   * Maps chores to their futures. Futures are used to control a chore's schedule
073   */
074  private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
075
076  /**
077   * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
078   * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
079   * increase the core pool size by 1 (otherwise a single long running chore whose execution is
080   * longer than its period would be able to spawn too many threads).
081   */
082  private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
083
084  /**
085   * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
086   * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
087   * running on. The prefix is useful because it allows us to monitor how the thread pool of a
088   * particular service changes over time VIA thread dumps.
089   */
090  private final String coreThreadPoolPrefix;
091
092  /**
093   *
094   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
095   *          spawned by this service
096   */
097  @InterfaceAudience.Private
098  @VisibleForTesting
099  public ChoreService(final String coreThreadPoolPrefix) {
100    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
101  }
102
103  /**
104   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
105   *          spawned by this service
106   * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
107   *               to true this will add -10% to 10% jitter.
108   */
109  public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
110    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
111  }
112
113  /**
114   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
115   *          spawned by this service
116   * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor
117   *          to during initialization. The default size is 1, but specifying a larger size may be
118   *          beneficial if you know that 1 thread will not be enough.
119   * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
120   *               to true this will add -10% to 10% jitter.
121   */
122  public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
123    this.coreThreadPoolPrefix = coreThreadPoolPrefix;
124    if (corePoolSize < MIN_CORE_POOL_SIZE)  {
125      corePoolSize = MIN_CORE_POOL_SIZE;
126    }
127
128    final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
129    if (jitter) {
130      scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
131    } else {
132      scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
133    }
134
135    scheduler.setRemoveOnCancelPolicy(true);
136    scheduledChores = new HashMap<>();
137    choresMissingStartTime = new HashMap<>();
138  }
139
140  /**
141   * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
142   *          instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
143   *          with a single ChoreService instance).
144   * @return true when the chore was successfully scheduled. false when the scheduling failed
145   *         (typically occurs when a chore is scheduled during shutdown of service)
146   */
147  public synchronized boolean scheduleChore(ScheduledChore chore) {
148    if (chore == null) {
149      return false;
150    }
151
152    try {
153      if (chore.getPeriod() <= 0) {
154        LOG.info("Chore {} is disabled because its period is not positive.", chore);
155        return false;
156      }
157      LOG.info("Chore {} is enabled.", chore);
158      chore.setChoreServicer(this);
159      ScheduledFuture<?> future =
160          scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
161            chore.getTimeUnit());
162      scheduledChores.put(chore, future);
163      return true;
164    } catch (Exception exception) {
165      if (LOG.isInfoEnabled()) {
166        LOG.info("Could not successfully schedule chore: " + chore.getName());
167      }
168      return false;
169    }
170  }
171
172  /**
173   * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
174   *          yet then this call is equivalent to a call to scheduleChore.
175   */
176  private synchronized void rescheduleChore(ScheduledChore chore) {
177    if (chore == null) return;
178
179    if (scheduledChores.containsKey(chore)) {
180      ScheduledFuture<?> future = scheduledChores.get(chore);
181      future.cancel(false);
182    }
183    scheduleChore(chore);
184  }
185
186  @InterfaceAudience.Private
187  @Override
188  public synchronized void cancelChore(ScheduledChore chore) {
189    cancelChore(chore, true);
190  }
191
192  @InterfaceAudience.Private
193  @Override
194  public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
195    if (chore != null && scheduledChores.containsKey(chore)) {
196      ScheduledFuture<?> future = scheduledChores.get(chore);
197      future.cancel(mayInterruptIfRunning);
198      scheduledChores.remove(chore);
199
200      // Removing a chore that was missing its start time means it may be possible
201      // to reduce the number of threads
202      if (choresMissingStartTime.containsKey(chore)) {
203        choresMissingStartTime.remove(chore);
204        requestCorePoolDecrease();
205      }
206    }
207  }
208
209  @InterfaceAudience.Private
210  @Override
211  public synchronized boolean isChoreScheduled(ScheduledChore chore) {
212    return chore != null && scheduledChores.containsKey(chore)
213        && !scheduledChores.get(chore).isDone();
214  }
215
216  @InterfaceAudience.Private
217  @Override
218  public synchronized boolean triggerNow(ScheduledChore chore) {
219    if (chore == null) {
220      return false;
221    } else {
222      rescheduleChore(chore);
223      return true;
224    }
225  }
226
227  /**
228   * @return number of chores that this service currently has scheduled
229   */
230  int getNumberOfScheduledChores() {
231    return scheduledChores.size();
232  }
233
234  /**
235   * @return number of chores that this service currently has scheduled that are missing their
236   *         scheduled start time
237   */
238  int getNumberOfChoresMissingStartTime() {
239    return choresMissingStartTime.size();
240  }
241
242  /**
243   * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
244   */
245  int getCorePoolSize() {
246    return scheduler.getCorePoolSize();
247  }
248
249  /**
250   * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
251   * daemon threads, and thus, don't prevent the JVM from shutting down
252   */
253  static class ChoreServiceThreadFactory implements ThreadFactory {
254    private final String threadPrefix;
255    private final static String THREAD_NAME_SUFFIX = ".Chore.";
256    private AtomicInteger threadNumber = new AtomicInteger(1);
257
258    /**
259     * @param threadPrefix The prefix given to all threads created by this factory
260     */
261    public ChoreServiceThreadFactory(final String threadPrefix) {
262      this.threadPrefix = threadPrefix;
263    }
264
265    @Override
266    public Thread newThread(Runnable r) {
267      Thread thread =
268          new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
269      thread.setDaemon(true);
270      return thread;
271    }
272  }
273
274  /**
275   * Represents a request to increase the number of core pool threads. Typically a request
276   * originates from the fact that the current core pool size is not sufficient to service all of
277   * the currently running Chores
278   * @return true when the request to increase the core pool size succeeds
279   */
280  private synchronized boolean requestCorePoolIncrease() {
281    // There is no point in creating more threads than scheduledChores.size since scheduled runs
282    // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
283    // amongst occurrences of the same chore).
284    if (scheduler.getCorePoolSize() < scheduledChores.size()) {
285      scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
286      printChoreServiceDetails("requestCorePoolIncrease");
287      return true;
288    }
289    return false;
290  }
291
292  /**
293   * Represents a request to decrease the number of core pool threads. Typically a request
294   * originates from the fact that the current core pool size is more than sufficient to service the
295   * running Chores.
296   */
297  private synchronized void requestCorePoolDecrease() {
298    if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
299      scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
300      printChoreServiceDetails("requestCorePoolDecrease");
301    }
302  }
303
304  @InterfaceAudience.Private
305  @Override
306  public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
307    if (chore == null || !scheduledChores.containsKey(chore)) return;
308
309    // If the chore has not caused an increase in the size of the core thread pool then request an
310    // increase. This allows each chore missing its start time to increase the core pool size by
311    // at most 1.
312    if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
313      choresMissingStartTime.put(chore, requestCorePoolIncrease());
314    }
315
316    // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
317    // the chore is NOT rescheduled, future executions of this chore will be delayed more and
318    // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
319    // idle threads to chores based on how delayed they are.
320    rescheduleChore(chore);
321    printChoreDetails("onChoreMissedStartTime", chore);
322  }
323
324  /**
325   * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
326   * in the middle of execution will be interrupted and shutdown. This service will be unusable
327   * after this method has been called (i.e. future scheduling attempts will fail).
328   */
329  public synchronized void shutdown() {
330    scheduler.shutdownNow();
331    if (LOG.isInfoEnabled()) {
332      LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
333          + " on shutdown");
334    }
335    cancelAllChores(true);
336    scheduledChores.clear();
337    choresMissingStartTime.clear();
338  }
339
340  /**
341   * @return true when the service is shutdown and thus cannot be used anymore
342   */
343  public boolean isShutdown() {
344    return scheduler.isShutdown();
345  }
346
347  /**
348   * @return true when the service is shutdown and all threads have terminated
349   */
350  public boolean isTerminated() {
351    return scheduler.isTerminated();
352  }
353
354  private void cancelAllChores(final boolean mayInterruptIfRunning) {
355    ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet().size());
356    // Build list of chores to cancel so we can iterate through a set that won't change
357    // as chores are cancelled. If we tried to cancel each chore while iterating through
358    // keySet the results would be undefined because the keySet would be changing
359    for (ScheduledChore chore : scheduledChores.keySet()) {
360      choresToCancel.add(chore);
361    }
362    for (ScheduledChore chore : choresToCancel) {
363      cancelChore(chore, mayInterruptIfRunning);
364    }
365    choresToCancel.clear();
366  }
367
368  /**
369   * Prints a summary of important details about the chore. Used for debugging purposes
370   */
371  private void printChoreDetails(final String header, ScheduledChore chore) {
372    LinkedHashMap<String, String> output = new LinkedHashMap<>();
373    output.put(header, "");
374    output.put("Chore name: ", chore.getName());
375    output.put("Chore period: ", Integer.toString(chore.getPeriod()));
376    output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
377
378    for (Entry<String, String> entry : output.entrySet()) {
379      if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
380    }
381  }
382
383  /**
384   * Prints a summary of important details about the service. Used for debugging purposes
385   */
386  private void printChoreServiceDetails(final String header) {
387    LinkedHashMap<String, String> output = new LinkedHashMap<>();
388    output.put(header, "");
389    output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
390    output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
391    output.put("ChoreService missingStartTimeCount: ",
392      Integer.toString(getNumberOfChoresMissingStartTime()));
393
394    for (Entry<String, String> entry : output.entrySet()) {
395      if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
396    }
397  }
398}