001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
028
029/**
030 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
031 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
032 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
033 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
034 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
035 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
036 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
037 * thread pool.
038 * <p>
039 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
040 * an entry being added to a queue, etc.
041 */
042@InterfaceAudience.Public
043public abstract class ScheduledChore implements Runnable {
044  private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class);
045
046  private final String name;
047
048  /**
049   * Default values for scheduling parameters should they be excluded during construction
050   */
051  private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
052  private final static long DEFAULT_INITIAL_DELAY = 0;
053
054  /**
055   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
056   */
057  private final int period; // in TimeUnit units
058  private final TimeUnit timeUnit;
059  private final long initialDelay; // in TimeUnit units
060
061  /**
062   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
063   * not scheduled.
064   */
065  private ChoreServicer choreServicer;
066
067  /**
068   * Variables that encapsulate the meaningful state information
069   */
070  private long timeOfLastRun = -1; // system time millis
071  private long timeOfThisRun = -1; // system time millis
072  private boolean initialChoreComplete = false;
073
074  /**
075   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
076   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
077   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
078   * command can cause many chores to stop together.
079   */
080  private final Stoppable stopper;
081
082  interface ChoreServicer {
083    /**
084     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
085     */
086    public void cancelChore(ScheduledChore chore);
087    public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
088
089    /**
090     * @return true when the chore is scheduled with the implementer of this interface
091     */
092    public boolean isChoreScheduled(ScheduledChore chore);
093
094    /**
095     * This method tries to execute the chore immediately. If the chore is executing at the time of
096     * this call, the chore will begin another execution as soon as the current execution finishes
097     * <p>
098     * If the chore is not scheduled with a ChoreService, this call will fail.
099     * @return false when the chore could not be triggered immediately
100     */
101    public boolean triggerNow(ScheduledChore chore);
102
103    /**
104     * A callback that tells the implementer of this interface that one of the scheduled chores is
105     * missing its start time. The implication of a chore missing its start time is that the
106     * service's current means of scheduling may not be sufficient to handle the number of ongoing
107     * chores (the other explanation is that the chore's execution time is greater than its
108     * scheduled period). The service should try to increase its concurrency when this callback is
109     * received.
110     * @param chore The chore that missed its start time
111     */
112    public void onChoreMissedStartTime(ScheduledChore chore);
113  }
114
115  /**
116   * This constructor is for test only. It allows us to create an object and to call chore() on it.
117   */
118  @InterfaceAudience.Private
119  @VisibleForTesting
120  protected ScheduledChore() {
121    this.name = null;
122    this.stopper = null;
123    this.period = 0;
124    this.initialDelay = DEFAULT_INITIAL_DELAY;
125    this.timeUnit = DEFAULT_TIME_UNIT;
126  }
127
128  /**
129   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
130   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
131   * @param period Period in millis with which this Chore repeats execution when scheduled.
132   */
133  public ScheduledChore(final String name, Stoppable stopper, final int period) {
134    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
135  }
136
137  /**
138   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
139   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
140   * @param period Period in millis with which this Chore repeats execution when scheduled.
141   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
142   *          value of 0 means the chore will begin to execute immediately. Negative delays are
143   *          invalid and will be corrected to a value of 0.
144   */
145  public ScheduledChore(final String name, Stoppable stopper, final int period,
146      final long initialDelay) {
147    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
148  }
149
150  /**
151   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
152   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
153   * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled.
154   * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
155   *          scheduled. A value of 0 means the chore will begin to execute immediately. Negative
156   *          delays are invalid and will be corrected to a value of 0.
157   * @param unit The unit that is used to measure period and initialDelay
158   */
159  public ScheduledChore(final String name, Stoppable stopper, final int period,
160      final long initialDelay, final TimeUnit unit) {
161    this.name = name;
162    this.stopper = stopper;
163    this.period = period;
164    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
165    this.timeUnit = unit;
166  }
167
168  /**
169   * @see java.lang.Runnable#run()
170   */
171  @Override
172  public void run() {
173    updateTimeTrackingBeforeRun();
174    if (missedStartTime() && isScheduled()) {
175      onChoreMissedStartTime();
176      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
177    } else if (stopper.isStopped() || !isScheduled()) {
178      cancel(false);
179      cleanup();
180      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
181    } else {
182      try {
183        if (!initialChoreComplete) {
184          initialChoreComplete = initialChore();
185        } else {
186          chore();
187        }
188      } catch (Throwable t) {
189        if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
190        if (this.stopper.isStopped()) {
191          cancel(false);
192          cleanup();
193        }
194      }
195    }
196  }
197
198  /**
199   * Update our time tracking members. Called at the start of an execution of this chore's run()
200   * method so that a correct decision can be made as to whether or not we missed the start time
201   */
202  private synchronized void updateTimeTrackingBeforeRun() {
203    timeOfLastRun = timeOfThisRun;
204    timeOfThisRun = System.currentTimeMillis();
205  }
206
207  /**
208   * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
209   * make the decision as to whether or not it would be worthwhile to increase the number of core
210   * pool threads
211   */
212  private synchronized void onChoreMissedStartTime() {
213    if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
214  }
215
216  /**
217   * @return How long in millis has it been since this chore last run. Useful for checking if the
218   *         chore has missed its scheduled start time by too large of a margin
219   */
220  synchronized long getTimeBetweenRuns() {
221    return timeOfThisRun - timeOfLastRun;
222  }
223
224  /**
225   * @return true when the time between runs exceeds the acceptable threshold
226   */
227  private synchronized boolean missedStartTime() {
228    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
229        && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
230  }
231
232  /**
233   * @return max allowed time in millis between runs.
234   */
235  private double getMaximumAllowedTimeBetweenRuns() {
236    // Threshold used to determine if the Chore's current run started too late
237    return 1.5 * timeUnit.toMillis(period);
238  }
239
240  /**
241   * @param time in system millis
242   * @return true if time is earlier or equal to current milli time
243   */
244  private synchronized boolean isValidTime(final long time) {
245    return time > 0 && time <= System.currentTimeMillis();
246  }
247
248  /**
249   * @return false when the Chore is not currently scheduled with a ChoreService
250   */
251  public synchronized boolean triggerNow() {
252    if (choreServicer != null) {
253      return choreServicer.triggerNow(this);
254    } else {
255      return false;
256    }
257  }
258
259  synchronized void setChoreServicer(ChoreServicer service) {
260    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
261    // is changing, cancel any existing schedules of this chore.
262    if (choreServicer != null && choreServicer != service) {
263      choreServicer.cancelChore(this, false);
264    }
265    choreServicer = service;
266    timeOfThisRun = System.currentTimeMillis();
267  }
268
269  public synchronized void cancel() {
270    cancel(true);
271  }
272
273  public synchronized void cancel(boolean mayInterruptIfRunning) {
274    if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning);
275
276    choreServicer = null;
277  }
278
279  public String getName() {
280    return name;
281  }
282
283  public Stoppable getStopper() {
284    return stopper;
285  }
286
287  /**
288   * @return period to execute chore in getTimeUnit() units
289   */
290  public int getPeriod() {
291    return period;
292  }
293
294  /**
295   * @return initial delay before executing chore in getTimeUnit() units
296   */
297  public long getInitialDelay() {
298    return initialDelay;
299  }
300
301  public TimeUnit getTimeUnit() {
302    return timeUnit;
303  }
304
305  public synchronized boolean isInitialChoreComplete() {
306    return initialChoreComplete;
307  }
308
309  @VisibleForTesting
310  synchronized ChoreServicer getChoreServicer() {
311    return choreServicer;
312  }
313
314  @VisibleForTesting
315  synchronized long getTimeOfLastRun() {
316    return timeOfLastRun;
317  }
318
319  @VisibleForTesting
320  synchronized long getTimeOfThisRun() {
321    return timeOfThisRun;
322  }
323
324  /**
325   * @return true when this Chore is scheduled with a ChoreService
326   */
327  public synchronized boolean isScheduled() {
328    return choreServicer != null && choreServicer.isChoreScheduled(this);
329  }
330
331  @InterfaceAudience.Private
332  @VisibleForTesting
333  public synchronized void choreForTesting() {
334    chore();
335  }
336
337  /**
338   * The task to execute on each scheduled execution of the Chore
339   */
340  protected abstract void chore();
341
342  /**
343   * Override to run a task before we start looping.
344   * @return true if initial chore was successful
345   */
346  protected boolean initialChore() {
347    // Default does nothing
348    return true;
349  }
350
351  /**
352   * Override to run cleanup tasks when the Chore encounters an error and must stop running
353   */
354  protected synchronized void cleanup() {
355  }
356
357  /**
358   * A summation of this chore in human readable format. Downstream users should not presume
359   * parsing of this string can relaibly be done between versions. Instead, they should rely
360   * on the public accessor methods to get the information they desire.
361   */
362  @InterfaceAudience.Private
363  @Override
364  public String toString() {
365    return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
366        + getTimeUnit() + "]";
367  }
368}