001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
028
029/**
030 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
031 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
032 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
033 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
034 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
035 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
036 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
037 * thread pool.
038 * <p>
039 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
040 * an entry being added to a queue, etc.
041 */
042@InterfaceAudience.Public
043public abstract class ScheduledChore implements Runnable {
044  private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class);
045
046  private final String name;
047
048  /**
049   * Default values for scheduling parameters should they be excluded during construction
050   */
051  private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
052  private final static long DEFAULT_INITIAL_DELAY = 0;
053
054  /**
055   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
056   */
057  private final int period; // in TimeUnit units
058  private final TimeUnit timeUnit;
059  private final long initialDelay; // in TimeUnit units
060
061  /**
062   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
063   * not scheduled.
064   */
065  private ChoreServicer choreServicer;
066
067  /**
068   * Variables that encapsulate the meaningful state information
069   */
070  private long timeOfLastRun = -1; // system time millis
071  private long timeOfThisRun = -1; // system time millis
072  private boolean initialChoreComplete = false;
073
074  /**
075   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
076   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
077   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
078   * command can cause many chores to stop together.
079   */
080  private final Stoppable stopper;
081
082  interface ChoreServicer {
083    /**
084     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
085     */
086    public void cancelChore(ScheduledChore chore);
087    public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
088
089    /**
090     * @return true when the chore is scheduled with the implementer of this interface
091     */
092    public boolean isChoreScheduled(ScheduledChore chore);
093
094    /**
095     * This method tries to execute the chore immediately. If the chore is executing at the time of
096     * this call, the chore will begin another execution as soon as the current execution finishes
097     * <p>
098     * If the chore is not scheduled with a ChoreService, this call will fail.
099     * @return false when the chore could not be triggered immediately
100     */
101    public boolean triggerNow(ScheduledChore chore);
102
103    /**
104     * A callback that tells the implementer of this interface that one of the scheduled chores is
105     * missing its start time. The implication of a chore missing its start time is that the
106     * service's current means of scheduling may not be sufficient to handle the number of ongoing
107     * chores (the other explanation is that the chore's execution time is greater than its
108     * scheduled period). The service should try to increase its concurrency when this callback is
109     * received.
110     * @param chore The chore that missed its start time
111     */
112    public void onChoreMissedStartTime(ScheduledChore chore);
113  }
114
115  /**
116   * This constructor is for test only. It allows us to create an object and to call chore() on it.
117   */
118  @InterfaceAudience.Private
119  @VisibleForTesting
120  protected ScheduledChore() {
121    this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT);
122  }
123
124  /**
125   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
126   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
127   * @param period Period in millis with which this Chore repeats execution when scheduled.
128   */
129  public ScheduledChore(final String name, Stoppable stopper, final int period) {
130    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
131  }
132
133  /**
134   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
135   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
136   * @param period Period in millis with which this Chore repeats execution when scheduled.
137   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
138   *          value of 0 means the chore will begin to execute immediately. Negative delays are
139   *          invalid and will be corrected to a value of 0.
140   */
141  public ScheduledChore(final String name, Stoppable stopper, final int period,
142      final long initialDelay) {
143    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
144  }
145
146  /**
147   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
148   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
149   * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled.
150   * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
151   *          scheduled. A value of 0 means the chore will begin to execute immediately. Negative
152   *          delays are invalid and will be corrected to a value of 0.
153   * @param unit The unit that is used to measure period and initialDelay
154   */
155  public ScheduledChore(final String name, Stoppable stopper, final int period,
156      final long initialDelay, final TimeUnit unit) {
157    this.name = name;
158    this.stopper = stopper;
159    this.period = period;
160    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
161    this.timeUnit = unit;
162  }
163
164  /**
165   * @see java.lang.Runnable#run()
166   */
167  @Override
168  public void run() {
169    updateTimeTrackingBeforeRun();
170    if (missedStartTime() && isScheduled()) {
171      onChoreMissedStartTime();
172      LOG.info("Chore: {} missed its start time", getName());
173    } else if (stopper.isStopped() || !isScheduled()) {
174      cancel(false);
175      cleanup();
176      LOG.info("Chore: {} was stopped", getName());
177    } else {
178      try {
179        // TODO: Histogram metrics per chore name.
180        // For now, just measure and log if DEBUG level logging is enabled.
181        long start = 0;
182        if (LOG.isDebugEnabled()) {
183          start = System.nanoTime();
184        }
185        if (!initialChoreComplete) {
186          initialChoreComplete = initialChore();
187        } else {
188          chore();
189        }
190        if (LOG.isDebugEnabled() && start > 0) {
191          long end = System.nanoTime();
192          LOG.debug("{} execution time: {} ms.", getName(),
193            TimeUnit.NANOSECONDS.toMillis(end - start));
194        }
195      } catch (Throwable t) {
196        LOG.error("Caught error", t);
197        if (this.stopper.isStopped()) {
198          cancel(false);
199          cleanup();
200        }
201      }
202    }
203  }
204
205  /**
206   * Update our time tracking members. Called at the start of an execution of this chore's run()
207   * method so that a correct decision can be made as to whether or not we missed the start time
208   */
209  private synchronized void updateTimeTrackingBeforeRun() {
210    timeOfLastRun = timeOfThisRun;
211    timeOfThisRun = System.currentTimeMillis();
212  }
213
214  /**
215   * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
216   * make the decision as to whether or not it would be worthwhile to increase the number of core
217   * pool threads
218   */
219  private synchronized void onChoreMissedStartTime() {
220    if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
221  }
222
223  /**
224   * @return How long in millis has it been since this chore last run. Useful for checking if the
225   *         chore has missed its scheduled start time by too large of a margin
226   */
227  synchronized long getTimeBetweenRuns() {
228    return timeOfThisRun - timeOfLastRun;
229  }
230
231  /**
232   * @return true when the time between runs exceeds the acceptable threshold
233   */
234  private synchronized boolean missedStartTime() {
235    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
236        && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
237  }
238
239  /**
240   * @return max allowed time in millis between runs.
241   */
242  private double getMaximumAllowedTimeBetweenRuns() {
243    // Threshold used to determine if the Chore's current run started too late
244    return 1.5 * timeUnit.toMillis(period);
245  }
246
247  /**
248   * @param time in system millis
249   * @return true if time is earlier or equal to current milli time
250   */
251  private synchronized boolean isValidTime(final long time) {
252    return time > 0 && time <= System.currentTimeMillis();
253  }
254
255  /**
256   * @return false when the Chore is not currently scheduled with a ChoreService
257   */
258  public synchronized boolean triggerNow() {
259    if (choreServicer != null) {
260      return choreServicer.triggerNow(this);
261    } else {
262      return false;
263    }
264  }
265
266  synchronized void setChoreServicer(ChoreServicer service) {
267    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
268    // is changing, cancel any existing schedules of this chore.
269    if (choreServicer != null && choreServicer != service) {
270      choreServicer.cancelChore(this, false);
271    }
272    choreServicer = service;
273    timeOfThisRun = System.currentTimeMillis();
274  }
275
276  public synchronized void cancel() {
277    cancel(true);
278  }
279
280  public synchronized void cancel(boolean mayInterruptIfRunning) {
281    if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning);
282
283    choreServicer = null;
284  }
285
286  public String getName() {
287    return name;
288  }
289
290  public Stoppable getStopper() {
291    return stopper;
292  }
293
294  /**
295   * @return period to execute chore in getTimeUnit() units
296   */
297  public int getPeriod() {
298    return period;
299  }
300
301  /**
302   * @return initial delay before executing chore in getTimeUnit() units
303   */
304  public long getInitialDelay() {
305    return initialDelay;
306  }
307
308  public TimeUnit getTimeUnit() {
309    return timeUnit;
310  }
311
312  public synchronized boolean isInitialChoreComplete() {
313    return initialChoreComplete;
314  }
315
316  @VisibleForTesting
317  synchronized ChoreServicer getChoreServicer() {
318    return choreServicer;
319  }
320
321  @VisibleForTesting
322  synchronized long getTimeOfLastRun() {
323    return timeOfLastRun;
324  }
325
326  @VisibleForTesting
327  synchronized long getTimeOfThisRun() {
328    return timeOfThisRun;
329  }
330
331  /**
332   * @return true when this Chore is scheduled with a ChoreService
333   */
334  public synchronized boolean isScheduled() {
335    return choreServicer != null && choreServicer.isChoreScheduled(this);
336  }
337
338  @InterfaceAudience.Private
339  @VisibleForTesting
340  public synchronized void choreForTesting() {
341    chore();
342  }
343
344  /**
345   * The task to execute on each scheduled execution of the Chore
346   */
347  protected abstract void chore();
348
349  /**
350   * Override to run a task before we start looping.
351   * @return true if initial chore was successful
352   */
353  protected boolean initialChore() {
354    // Default does nothing
355    return true;
356  }
357
358  /**
359   * Override to run cleanup tasks when the Chore encounters an error and must stop running
360   */
361  protected synchronized void cleanup() {
362  }
363
364  /**
365   * A summation of this chore in human readable format. Downstream users should not presume
366   * parsing of this string can relaibly be done between versions. Instead, they should rely
367   * on the public accessor methods to get the information they desire.
368   */
369  @InterfaceAudience.Private
370  @Override
371  public String toString() {
372    return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
373      ", unit=" + getTimeUnit();
374  }
375}