001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.LinkedHashMap;
024import java.util.Map.Entry;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.ScheduledThreadPoolExecutor;
027import java.util.concurrent.ThreadFactory;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.hbase.trace.TraceUtil;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
036 * periodically while sharing threads. The ChoreService is backed by a
037 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
038 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
039 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
040 * <p>
041 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
042 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
043 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
044 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
045 * there may be a need to increase the number of threads if it is noticed that chores are no longer
046 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
047 * made to reduce the number of running threads to see if chores can still meet their start times
048 * with a smaller thread pool.
049 * <p>
050 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
051 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
052 */
053@InterfaceAudience.Private
054public class ChoreService {
055  private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);
056
057  /**
058   * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
059   */
060  @InterfaceAudience.Private
061  public final static int MIN_CORE_POOL_SIZE = 1;
062  /**
063   * The initial number of threads in the core pool for the {@link ChoreService}.
064   */
065  public static final String CHORE_SERVICE_INITIAL_POOL_SIZE =
066    "hbase.choreservice.initial.pool.size";
067  public static final int DEFAULT_CHORE_SERVICE_INITIAL_POOL_SIZE = 1;
068
069  /**
070   * This thread pool is used to schedule all of the Chores
071   */
072  private final ScheduledThreadPoolExecutor scheduler;
073
074  /**
075   * Maps chores to their futures. Futures are used to control a chore's schedule
076   */
077  private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
078
079  /**
080   * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
081   * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
082   * increase the core pool size by 1 (otherwise a single long running chore whose execution is
083   * longer than its period would be able to spawn too many threads).
084   */
085  private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
086
087  /**
088   * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
089   * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
090   * running on. The prefix is useful because it allows us to monitor how the thread pool of a
091   * particular service changes over time VIA thread dumps.
092   */
093  private final String coreThreadPoolPrefix;
094
095  /**
096   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
097   *                             spawned by this service
098   */
099  @InterfaceAudience.Private
100  public ChoreService(final String coreThreadPoolPrefix) {
101    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
102  }
103
104  /**
105   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
106   *                             spawned by this service
107   * @param jitter               Should chore service add some jitter for all of the scheduled
108   *                             chores. When set to true this will add -10% to 10% jitter.
109   */
110  public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
111    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
112  }
113
114  /**
115   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
116   *                             spawned by this service
117   * @param corePoolSize         The initial size to set the core pool of the
118   *                             ScheduledThreadPoolExecutor to during initialization. The default
119   *                             size is 1, but specifying a larger size may be beneficial if you
120   *                             know that 1 thread will not be enough.
121   * @param jitter               Should chore service add some jitter for all of the scheduled
122   *                             chores. When set to true this will add -10% to 10% jitter.
123   */
124  public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
125    this.coreThreadPoolPrefix = coreThreadPoolPrefix;
126    if (corePoolSize < MIN_CORE_POOL_SIZE) {
127      corePoolSize = MIN_CORE_POOL_SIZE;
128    }
129
130    final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
131    if (jitter) {
132      scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
133    } else {
134      scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
135    }
136
137    scheduler.setRemoveOnCancelPolicy(true);
138    scheduledChores = new HashMap<>();
139    choresMissingStartTime = new HashMap<>();
140  }
141
142  /**
143   * Schedule a chore.
144   * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
145   *              instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
146   *              with a single ChoreService instance).
147   * @return true when the chore was successfully scheduled. false when the scheduling failed
148   *         (typically occurs when a chore is scheduled during shutdown of service)
149   */
150  public boolean scheduleChore(ScheduledChore chore) {
151    if (chore == null) {
152      return false;
153    }
154    // always lock chore first to prevent dead lock
155    synchronized (chore) {
156      synchronized (this) {
157        try {
158          // Chores should only ever be scheduled with a single ChoreService. If the choreService
159          // is changing, cancel any existing schedules of this chore.
160          if (chore.getChoreService() == this) {
161            LOG.warn("Chore {} has already been scheduled with us", chore);
162            return false;
163          }
164          if (chore.getPeriod() <= 0) {
165            LOG.info("Chore {} is disabled because its period is not positive.", chore);
166            return false;
167          }
168          LOG.info("Chore {} is enabled.", chore);
169          if (chore.getChoreService() != null) {
170            LOG.info("Cancel chore {} from its previous service", chore);
171            chore.getChoreService().cancelChore(chore);
172          }
173          chore.setChoreService(this);
174          ScheduledFuture<?> future =
175            scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()),
176              chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit());
177          scheduledChores.put(chore, future);
178          return true;
179        } catch (Exception e) {
180          LOG.error("Could not successfully schedule chore: {}", chore.getName(), e);
181          return false;
182        }
183      }
184    }
185  }
186
187  /**
188   * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
189   *              yet then this call is equivalent to a call to scheduleChore.
190   */
191  private void rescheduleChore(ScheduledChore chore, boolean immediately) {
192    if (scheduledChores.containsKey(chore)) {
193      ScheduledFuture<?> future = scheduledChores.get(chore);
194      future.cancel(false);
195    }
196    // set initial delay to 0 as we want to run it immediately
197    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore,
198      immediately ? 0 : chore.getPeriod(), chore.getPeriod(), chore.getTimeUnit());
199    scheduledChores.put(chore, future);
200  }
201
202  /**
203   * Cancel any ongoing schedules that this chore has with the implementer of this interface.
204   * <p/>
205   * Call {@link ScheduledChore#cancel()} to cancel a {@link ScheduledChore}, in
206   * {@link ScheduledChore#cancel()} method we will call this method to remove the
207   * {@link ScheduledChore} from this {@link ChoreService}.
208   */
209  @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
210      allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
211  synchronized void cancelChore(ScheduledChore chore) {
212    cancelChore(chore, true);
213  }
214
215  /**
216   * Cancel any ongoing schedules that this chore has with the implementer of this interface.
217   * <p/>
218   * Call {@link ScheduledChore#cancel(boolean)} to cancel a {@link ScheduledChore}, in
219   * {@link ScheduledChore#cancel(boolean)} method we will call this method to remove the
220   * {@link ScheduledChore} from this {@link ChoreService}.
221   */
222  @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
223      allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
224  synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
225    if (scheduledChores.containsKey(chore)) {
226      ScheduledFuture<?> future = scheduledChores.get(chore);
227      future.cancel(mayInterruptIfRunning);
228      scheduledChores.remove(chore);
229
230      // Removing a chore that was missing its start time means it may be possible
231      // to reduce the number of threads
232      if (choresMissingStartTime.containsKey(chore)) {
233        choresMissingStartTime.remove(chore);
234        requestCorePoolDecrease();
235      }
236    }
237  }
238
239  /** Returns true when the chore is scheduled with the implementer of this interface */
240  @InterfaceAudience.Private
241  public synchronized boolean isChoreScheduled(ScheduledChore chore) {
242    return chore != null && scheduledChores.containsKey(chore)
243      && !scheduledChores.get(chore).isDone();
244  }
245
  /**
   * This method tries to execute the chore immediately by rescheduling it with no initial delay;
   * subsequent runs continue at the chore's period. Any existing schedule is cancelled without
   * interrupting a run in progress, so if the chore is executing at the time of this call, the
   * chore will begin another execution as soon as the current execution finishes.
   * @param chore the chore to run now; it is expected to already belong to this service, which is
   *              only verified when assertions are enabled
   */
  @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
      allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
  synchronized void triggerNow(ScheduledChore chore) {
    // Sanity check only: evaluated solely when the JVM runs with assertions enabled.
    assert chore.getChoreService() == this;
    rescheduleChore(chore, true);
  }
256
257  /** Returns number of chores that this service currently has scheduled */
258  int getNumberOfScheduledChores() {
259    return scheduledChores.size();
260  }
261
262  /**
263   * Return number of chores that this service currently has scheduled that are missing their
264   * scheduled start time
265   */
266  int getNumberOfChoresMissingStartTime() {
267    return choresMissingStartTime.size();
268  }
269
270  /** Returns number of threads in the core pool of the underlying ScheduledThreadPoolExecutor */
271  int getCorePoolSize() {
272    return scheduler.getCorePoolSize();
273  }
274
275  /**
276   * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
277   * daemon threads, and thus, don't prevent the JVM from shutting down
278   */
279  static class ChoreServiceThreadFactory implements ThreadFactory {
280    private final String threadPrefix;
281    private final static String THREAD_NAME_SUFFIX = ".Chore.";
282    private AtomicInteger threadNumber = new AtomicInteger(1);
283
284    public ChoreServiceThreadFactory(final String threadPrefix) {
285      this.threadPrefix = threadPrefix;
286    }
287
288    @Override
289    public Thread newThread(Runnable r) {
290      Thread thread =
291        new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
292      thread.setDaemon(true);
293      return thread;
294    }
295  }
296
297  /**
298   * Represents a request to increase the number of core pool threads. Typically a request
299   * originates from the fact that the current core pool size is not sufficient to service all of
300   * the currently running Chores
301   * @return true when the request to increase the core pool size succeeds
302   */
303  private synchronized boolean requestCorePoolIncrease() {
304    // There is no point in creating more threads than scheduledChores.size since scheduled runs
305    // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
306    // amongst occurrences of the same chore).
307    if (scheduler.getCorePoolSize() < scheduledChores.size()) {
308      scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
309      printChoreServiceDetails("requestCorePoolIncrease");
310      return true;
311    }
312    return false;
313  }
314
315  /**
316   * Represents a request to decrease the number of core pool threads. Typically a request
317   * originates from the fact that the current core pool size is more than sufficient to service the
318   * running Chores.
319   */
320  private synchronized void requestCorePoolDecrease() {
321    if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
322      scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
323      printChoreServiceDetails("requestCorePoolDecrease");
324    }
325  }
326
327  /**
328   * A callback that tells the implementer of this interface that one of the scheduled chores is
329   * missing its start time. The implication of a chore missing its start time is that the service's
330   * current means of scheduling may not be sufficient to handle the number of ongoing chores (the
331   * other explanation is that the chore's execution time is greater than its scheduled period). The
332   * service should try to increase its concurrency when this callback is received.
333   * @param chore The chore that missed its start time
334   */
335  @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
336      allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
337  synchronized void onChoreMissedStartTime(ScheduledChore chore) {
338    if (!scheduledChores.containsKey(chore)) {
339      return;
340    }
341
342    // If the chore has not caused an increase in the size of the core thread pool then request an
343    // increase. This allows each chore missing its start time to increase the core pool size by
344    // at most 1.
345    if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
346      choresMissingStartTime.put(chore, requestCorePoolIncrease());
347    }
348
349    // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
350    // the chore is NOT rescheduled, future executions of this chore will be delayed more and
351    // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
352    // idle threads to chores based on how delayed they are.
353    rescheduleChore(chore, false);
354    printChoreDetails("onChoreMissedStartTime", chore);
355  }
356
357  /**
358   * Shut down the service. Any chores that are scheduled for execution will be cancelled. Any
359   * chores in the middle of execution will be interrupted and shutdown. This service will be
360   * unusable after this method has been called (i.e. future scheduling attempts will fail).
361   * <p/>
362   * Notice that, this will only clean the chore from this ChoreService but you could still schedule
363   * the chore with other ChoreService.
364   */
365  public synchronized void shutdown() {
366    if (isShutdown()) {
367      return;
368    }
369    scheduler.shutdownNow();
370    LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix,
371      scheduledChores.keySet());
372    cancelAllChores(true);
373    scheduledChores.clear();
374    choresMissingStartTime.clear();
375  }
376
377  /** Returns true when the service is shutdown and thus cannot be used anymore */
378  public boolean isShutdown() {
379    return scheduler.isShutdown();
380  }
381
382  /** Returns true when the service is shutdown and all threads have terminated */
383  public boolean isTerminated() {
384    return scheduler.isTerminated();
385  }
386
387  private void cancelAllChores(final boolean mayInterruptIfRunning) {
388    // Build list of chores to cancel so we can iterate through a set that won't change
389    // as chores are cancelled. If we tried to cancel each chore while iterating through
390    // keySet the results would be undefined because the keySet would be changing
391    ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet());
392
393    for (ScheduledChore chore : choresToCancel) {
394      cancelChore(chore, mayInterruptIfRunning);
395    }
396  }
397
398  /** Prints a summary of important details about the chore. Used for debugging purposes */
399  private void printChoreDetails(final String header, ScheduledChore chore) {
400    if (!LOG.isTraceEnabled()) {
401      return;
402    }
403    LinkedHashMap<String, String> output = new LinkedHashMap<>();
404    output.put(header, "");
405    output.put("Chore name: ", chore.getName());
406    output.put("Chore period: ", Integer.toString(chore.getPeriod()));
407    output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
408
409    for (Entry<String, String> entry : output.entrySet()) {
410      LOG.trace(entry.getKey() + entry.getValue());
411    }
412  }
413
414  /** Prints a summary of important details about the service. Used for debugging purposes */
415  private void printChoreServiceDetails(final String header) {
416    if (!LOG.isTraceEnabled()) {
417      return;
418    }
419    LinkedHashMap<String, String> output = new LinkedHashMap<>();
420    output.put(header, "");
421    output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
422    output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
423    output.put("ChoreService missingStartTimeCount: ",
424      Integer.toString(getNumberOfChoresMissingStartTime()));
425
426    for (Entry<String, String> entry : output.entrySet()) {
427      LOG.trace(entry.getKey() + entry.getValue());
428    }
429  }
430}