001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
029 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
030 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
031 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
032 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
033 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
034 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
035 * thread pool.
036 * <p>
037 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
038 * an entry being added to a queue, etc.
039 */
040@InterfaceAudience.Public
041public abstract class ScheduledChore implements Runnable {
042  private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class);
043
044  private final String name;
045
046  /**
047   * Default values for scheduling parameters should they be excluded during construction
048   */
049  private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
050  private final static long DEFAULT_INITIAL_DELAY = 0;
051
052  /**
053   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
054   */
055  private final int period; // in TimeUnit units
056  private final TimeUnit timeUnit;
057  private final long initialDelay; // in TimeUnit units
058
059  /**
060   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
061   * not scheduled.
062   */
063  private ChoreServicer choreServicer;
064
065  /**
066   * Variables that encapsulate the meaningful state information
067   */
068  private long timeOfLastRun = -1; // system time millis
069  private long timeOfThisRun = -1; // system time millis
070  private boolean initialChoreComplete = false;
071
072  /**
073   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
074   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
075   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
076   * command can cause many chores to stop together.
077   */
078  private final Stoppable stopper;
079
080  interface ChoreServicer {
081    /**
082     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
083     */
084    public void cancelChore(ScheduledChore chore);
085    public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
086
087    /**
088     * @return true when the chore is scheduled with the implementer of this interface
089     */
090    public boolean isChoreScheduled(ScheduledChore chore);
091
092    /**
093     * This method tries to execute the chore immediately. If the chore is executing at the time of
094     * this call, the chore will begin another execution as soon as the current execution finishes
095     * <p>
096     * If the chore is not scheduled with a ChoreService, this call will fail.
097     * @return false when the chore could not be triggered immediately
098     */
099    public boolean triggerNow(ScheduledChore chore);
100
101    /**
102     * A callback that tells the implementer of this interface that one of the scheduled chores is
103     * missing its start time. The implication of a chore missing its start time is that the
104     * service's current means of scheduling may not be sufficient to handle the number of ongoing
105     * chores (the other explanation is that the chore's execution time is greater than its
106     * scheduled period). The service should try to increase its concurrency when this callback is
107     * received.
108     * @param chore The chore that missed its start time
109     */
110    public void onChoreMissedStartTime(ScheduledChore chore);
111  }
112
113  /**
114   * This constructor is for test only. It allows us to create an object and to call chore() on it.
115   */
116  @InterfaceAudience.Private
117  protected ScheduledChore() {
118    this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT);
119  }
120
121  /**
122   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
123   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
124   * @param period Period in millis with which this Chore repeats execution when scheduled.
125   */
126  public ScheduledChore(final String name, Stoppable stopper, final int period) {
127    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
128  }
129
130  /**
131   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
132   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
133   * @param period Period in millis with which this Chore repeats execution when scheduled.
134   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
135   *          value of 0 means the chore will begin to execute immediately. Negative delays are
136   *          invalid and will be corrected to a value of 0.
137   */
138  public ScheduledChore(final String name, Stoppable stopper, final int period,
139      final long initialDelay) {
140    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
141  }
142
143  /**
144   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
145   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
146   * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled.
147   * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
148   *          scheduled. A value of 0 means the chore will begin to execute immediately. Negative
149   *          delays are invalid and will be corrected to a value of 0.
150   * @param unit The unit that is used to measure period and initialDelay
151   */
152  public ScheduledChore(final String name, Stoppable stopper, final int period,
153      final long initialDelay, final TimeUnit unit) {
154    this.name = name;
155    this.stopper = stopper;
156    this.period = period;
157    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
158    this.timeUnit = unit;
159  }
160
161  /**
162   * @see java.lang.Runnable#run()
163   */
164  @Override
165  public void run() {
166    updateTimeTrackingBeforeRun();
167    if (missedStartTime() && isScheduled()) {
168      onChoreMissedStartTime();
169      LOG.info("Chore: {} missed its start time", getName());
170    } else if (stopper.isStopped() || !isScheduled()) {
171      cancel(false);
172      cleanup();
173      LOG.info("Chore: {} was stopped", getName());
174    } else {
175      try {
176        // TODO: Histogram metrics per chore name.
177        // For now, just measure and log if DEBUG level logging is enabled.
178        long start = 0;
179        if (LOG.isDebugEnabled()) {
180          start = System.nanoTime();
181        }
182        if (!initialChoreComplete) {
183          initialChoreComplete = initialChore();
184        } else {
185          chore();
186        }
187        if (LOG.isDebugEnabled() && start > 0) {
188          long end = System.nanoTime();
189          LOG.debug("{} execution time: {} ms.", getName(),
190            TimeUnit.NANOSECONDS.toMillis(end - start));
191        }
192      } catch (Throwable t) {
193        LOG.error("Caught error", t);
194        if (this.stopper.isStopped()) {
195          cancel(false);
196          cleanup();
197        }
198      }
199    }
200  }
201
202  /**
203   * Update our time tracking members. Called at the start of an execution of this chore's run()
204   * method so that a correct decision can be made as to whether or not we missed the start time
205   */
206  private synchronized void updateTimeTrackingBeforeRun() {
207    timeOfLastRun = timeOfThisRun;
208    timeOfThisRun = System.currentTimeMillis();
209  }
210
211  /**
212   * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
213   * make the decision as to whether or not it would be worthwhile to increase the number of core
214   * pool threads
215   */
216  private synchronized void onChoreMissedStartTime() {
217    if (choreServicer != null) choreServicer.onChoreMissedStartTime(this);
218  }
219
220  /**
221   * @return How long in millis has it been since this chore last run. Useful for checking if the
222   *         chore has missed its scheduled start time by too large of a margin
223   */
224  synchronized long getTimeBetweenRuns() {
225    return timeOfThisRun - timeOfLastRun;
226  }
227
228  /**
229   * @return true when the time between runs exceeds the acceptable threshold
230   */
231  private synchronized boolean missedStartTime() {
232    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
233        && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
234  }
235
236  /**
237   * @return max allowed time in millis between runs.
238   */
239  private double getMaximumAllowedTimeBetweenRuns() {
240    // Threshold used to determine if the Chore's current run started too late
241    return 1.5 * timeUnit.toMillis(period);
242  }
243
244  /**
245   * @param time in system millis
246   * @return true if time is earlier or equal to current milli time
247   */
248  private synchronized boolean isValidTime(final long time) {
249    return time > 0 && time <= System.currentTimeMillis();
250  }
251
252  /**
253   * @return false when the Chore is not currently scheduled with a ChoreService
254   */
255  public synchronized boolean triggerNow() {
256    if (choreServicer != null) {
257      return choreServicer.triggerNow(this);
258    } else {
259      return false;
260    }
261  }
262
263  synchronized void setChoreServicer(ChoreServicer service) {
264    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
265    // is changing, cancel any existing schedules of this chore.
266    if (choreServicer != null && choreServicer != service) {
267      choreServicer.cancelChore(this, false);
268    }
269    choreServicer = service;
270    timeOfThisRun = -1;
271  }
272
273  public synchronized void cancel() {
274    cancel(true);
275  }
276
277  public synchronized void cancel(boolean mayInterruptIfRunning) {
278    if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning);
279
280    choreServicer = null;
281  }
282
283  public String getName() {
284    return name;
285  }
286
287  public Stoppable getStopper() {
288    return stopper;
289  }
290
291  /**
292   * @return period to execute chore in getTimeUnit() units
293   */
294  public int getPeriod() {
295    return period;
296  }
297
298  /**
299   * @return initial delay before executing chore in getTimeUnit() units
300   */
301  public long getInitialDelay() {
302    return initialDelay;
303  }
304
305  public TimeUnit getTimeUnit() {
306    return timeUnit;
307  }
308
309  public synchronized boolean isInitialChoreComplete() {
310    return initialChoreComplete;
311  }
312
313  @InterfaceAudience.Private
314  synchronized ChoreServicer getChoreServicer() {
315    return choreServicer;
316  }
317
318  @InterfaceAudience.Private
319  synchronized long getTimeOfLastRun() {
320    return timeOfLastRun;
321  }
322
323  @InterfaceAudience.Private
324  synchronized long getTimeOfThisRun() {
325    return timeOfThisRun;
326  }
327
328  /**
329   * @return true when this Chore is scheduled with a ChoreService
330   */
331  public synchronized boolean isScheduled() {
332    return choreServicer != null && choreServicer.isChoreScheduled(this);
333  }
334
335  @InterfaceAudience.Private
336  public synchronized void choreForTesting() {
337    chore();
338  }
339
340  /**
341   * The task to execute on each scheduled execution of the Chore
342   */
343  protected abstract void chore();
344
345  /**
346   * Override to run a task before we start looping.
347   * @return true if initial chore was successful
348   */
349  protected boolean initialChore() {
350    // Default does nothing
351    return true;
352  }
353
354  /**
355   * Override to run cleanup tasks when the Chore encounters an error and must stop running
356   */
357  protected synchronized void cleanup() {
358  }
359
360  /**
361   * A summation of this chore in human readable format. Downstream users should not presume
362   * parsing of this string can relaibly be done between versions. Instead, they should rely
363   * on the public accessor methods to get the information they desire.
364   */
365  @InterfaceAudience.Private
366  @Override
367  public String toString() {
368    return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
369      ", unit=" + getTimeUnit();
370  }
371}