001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.hadoop.hbase.util.MovingAverage;
025import org.apache.hadoop.hbase.util.WindowMovingAverage;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
030
031/**
032 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
033 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
034 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
035 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
036 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
037 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
038 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
039 * thread pool.
040 * <p>
041 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
042 * an entry being added to a queue, etc.
043 */
044@InterfaceAudience.Public
045public abstract class ScheduledChore implements Runnable {
046  private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class);
047
048  private final String name;
049
050  /**
051   * Default values for scheduling parameters should they be excluded during construction
052   */
053  private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
054  private final static long DEFAULT_INITIAL_DELAY = 0;
055
056  /**
057   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
058   */
059  private final int period; // in TimeUnit units
060  private final TimeUnit timeUnit;
061  private final long initialDelay; // in TimeUnit units
062
063  /**
064   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
065   * not scheduled.
066   */
067  private ChoreServicer choreServicer;
068
069  /**
070   * Variables that encapsulate the meaningful state information
071   */
072  private long timeOfLastRun = -1; // system time millis
073  private long timeOfThisRun = -1; // system time millis
074  private boolean initialChoreComplete = false;
075
076  /**
077   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
078   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
079   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
080   * command can cause many chores to stop together.
081   */
082  private final Stoppable stopper;
083
084  private final MovingAverage<Void> timeMeasurement;
085  private static final long FIVE_MINUTES_IN_NANOS = TimeUnit.MINUTES.toNanos(5L);
086  private long lastLog = System.nanoTime();
087
088  interface ChoreServicer {
089    /**
090     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
091     */
092    public void cancelChore(ScheduledChore chore);
093    public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
094
095    /**
096     * @return true when the chore is scheduled with the implementer of this interface
097     */
098    public boolean isChoreScheduled(ScheduledChore chore);
099
100    /**
101     * This method tries to execute the chore immediately. If the chore is executing at the time of
102     * this call, the chore will begin another execution as soon as the current execution finishes
103     * <p>
104     * If the chore is not scheduled with a ChoreService, this call will fail.
105     * @return false when the chore could not be triggered immediately
106     */
107    public boolean triggerNow(ScheduledChore chore);
108
109    /**
110     * A callback that tells the implementer of this interface that one of the scheduled chores is
111     * missing its start time. The implication of a chore missing its start time is that the
112     * service's current means of scheduling may not be sufficient to handle the number of ongoing
113     * chores (the other explanation is that the chore's execution time is greater than its
114     * scheduled period). The service should try to increase its concurrency when this callback is
115     * received.
116     * @param chore The chore that missed its start time
117     */
118    public void onChoreMissedStartTime(ScheduledChore chore);
119  }
120
121  /**
122   * This constructor is for test only. It allows us to create an object and to call chore() on it.
123   */
124  @InterfaceAudience.Private
125  @VisibleForTesting
126  protected ScheduledChore() {
127    this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT);
128  }
129
130  /**
131   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
132   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
133   * @param period Period in millis with which this Chore repeats execution when scheduled.
134   */
135  public ScheduledChore(final String name, Stoppable stopper, final int period) {
136    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
137  }
138
139  /**
140   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
141   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
142   * @param period Period in millis with which this Chore repeats execution when scheduled.
143   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
144   *          value of 0 means the chore will begin to execute immediately. Negative delays are
145   *          invalid and will be corrected to a value of 0.
146   */
147  public ScheduledChore(final String name, Stoppable stopper, final int period,
148      final long initialDelay) {
149    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
150  }
151
152  /**
153   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
154   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
155   * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled.
156   * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
157   *          scheduled. A value of 0 means the chore will begin to execute immediately. Negative
158   *          delays are invalid and will be corrected to a value of 0.
159   * @param unit The unit that is used to measure period and initialDelay
160   */
161  public ScheduledChore(final String name, Stoppable stopper, final int period,
162      final long initialDelay, final TimeUnit unit) {
163    this.name = name;
164    this.stopper = stopper;
165    this.period = period;
166    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
167    this.timeUnit = unit;
168    this.timeMeasurement = new WindowMovingAverage(name);
169  }
170
171  /**
172   * @see java.lang.Runnable#run()
173   */
174  @Override
175  public void run() {
176    updateTimeTrackingBeforeRun();
177    if (missedStartTime() && isScheduled()) {
178      onChoreMissedStartTime();
179      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
180    } else if (stopper.isStopped() || !isScheduled()) {
181      cancel(false);
182      cleanup();
183      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
184    } else {
185      try {
186        if (!initialChoreComplete) {
187          initialChoreComplete = initialChore();
188        } else {
189          timeMeasurement.measure(() -> {
190            chore();
191            return null;
192          });
193          if (LOG.isInfoEnabled() && (System.nanoTime() - lastLog > FIVE_MINUTES_IN_NANOS)) {
194            LOG.info("{} average execution time: {} ns.", getName(),
195                (long)(timeMeasurement.getAverageTime()));
196            lastLog = System.nanoTime();
197          }
198        }
199      } catch (Throwable t) {
200        if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
201        if (this.stopper.isStopped()) {
202          cancel(false);
203          cleanup();
204        }
205      }
206    }
207  }
208
209  /**
210   * Update our time tracking members. Called at the start of an execution of this chore's run()
211   * method so that a correct decision can be made as to whether or not we missed the start time
212   */
213  private synchronized void updateTimeTrackingBeforeRun() {
214    timeOfLastRun = timeOfThisRun;
215    timeOfThisRun = System.currentTimeMillis();
216  }
217
218  /**
219   * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
220   * make the decision as to whether or not it would be worthwhile to increase the number of core
221   * pool threads
222   */
223  private synchronized void onChoreMissedStartTime() {
224    if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
225  }
226
227  /**
228   * @return How long in millis has it been since this chore last run. Useful for checking if the
229   *         chore has missed its scheduled start time by too large of a margin
230   */
231  synchronized long getTimeBetweenRuns() {
232    return timeOfThisRun - timeOfLastRun;
233  }
234
235  /**
236   * @return true when the time between runs exceeds the acceptable threshold
237   */
238  private synchronized boolean missedStartTime() {
239    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
240        && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
241  }
242
243  /**
244   * @return max allowed time in millis between runs.
245   */
246  private double getMaximumAllowedTimeBetweenRuns() {
247    // Threshold used to determine if the Chore's current run started too late
248    return 1.5 * timeUnit.toMillis(period);
249  }
250
251  /**
252   * @param time in system millis
253   * @return true if time is earlier or equal to current milli time
254   */
255  private synchronized boolean isValidTime(final long time) {
256    return time > 0 && time <= System.currentTimeMillis();
257  }
258
259  /**
260   * @return false when the Chore is not currently scheduled with a ChoreService
261   */
262  public synchronized boolean triggerNow() {
263    if (choreServicer != null) {
264      return choreServicer.triggerNow(this);
265    } else {
266      return false;
267    }
268  }
269
270  synchronized void setChoreServicer(ChoreServicer service) {
271    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
272    // is changing, cancel any existing schedules of this chore.
273    if (choreServicer != null && choreServicer != service) {
274      choreServicer.cancelChore(this, false);
275    }
276    choreServicer = service;
277    timeOfThisRun = System.currentTimeMillis();
278  }
279
280  public synchronized void cancel() {
281    cancel(true);
282  }
283
284  public synchronized void cancel(boolean mayInterruptIfRunning) {
285    if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning);
286
287    choreServicer = null;
288  }
289
290  public String getName() {
291    return name;
292  }
293
294  public Stoppable getStopper() {
295    return stopper;
296  }
297
298  /**
299   * @return period to execute chore in getTimeUnit() units
300   */
301  public int getPeriod() {
302    return period;
303  }
304
305  /**
306   * @return initial delay before executing chore in getTimeUnit() units
307   */
308  public long getInitialDelay() {
309    return initialDelay;
310  }
311
312  public TimeUnit getTimeUnit() {
313    return timeUnit;
314  }
315
316  public synchronized boolean isInitialChoreComplete() {
317    return initialChoreComplete;
318  }
319
320  @VisibleForTesting
321  synchronized ChoreServicer getChoreServicer() {
322    return choreServicer;
323  }
324
325  @VisibleForTesting
326  synchronized long getTimeOfLastRun() {
327    return timeOfLastRun;
328  }
329
330  @VisibleForTesting
331  synchronized long getTimeOfThisRun() {
332    return timeOfThisRun;
333  }
334
335  /**
336   * @return true when this Chore is scheduled with a ChoreService
337   */
338  public synchronized boolean isScheduled() {
339    return choreServicer != null && choreServicer.isChoreScheduled(this);
340  }
341
342  @InterfaceAudience.Private
343  @VisibleForTesting
344  public synchronized void choreForTesting() {
345    chore();
346  }
347
348  /**
349   * The task to execute on each scheduled execution of the Chore
350   */
351  protected abstract void chore();
352
353  /**
354   * Override to run a task before we start looping.
355   * @return true if initial chore was successful
356   */
357  protected boolean initialChore() {
358    // Default does nothing
359    return true;
360  }
361
362  /**
363   * Override to run cleanup tasks when the Chore encounters an error and must stop running
364   */
365  protected synchronized void cleanup() {
366  }
367
368  /**
369   * A summation of this chore in human readable format. Downstream users should not presume
370   * parsing of this string can relaibly be done between versions. Instead, they should rely
371   * on the public accessor methods to get the information they desire.
372   */
373  @InterfaceAudience.Private
374  @Override
375  public String toString() {
376    return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
377        + getTimeUnit() + "]";
378  }
379}