001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.util.concurrent.ScheduledThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
030 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
031 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
032 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
033 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
034 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
035 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
036 * thread pool.
037 * <p/>
038 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
039 * an entry being added to a queue, etc.
040 */
041@InterfaceAudience.Private
042public abstract class ScheduledChore implements Runnable {
043  private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class);
044
045  private final String name;
046
047  /**
048   * Default values for scheduling parameters should they be excluded during construction
049   */
050  private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
051  private final static long DEFAULT_INITIAL_DELAY = 0;
052
053  /**
054   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
055   */
056  private final int period; // in TimeUnit units
057  private final TimeUnit timeUnit;
058  private final long initialDelay; // in TimeUnit units
059
060  /**
061   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
062   * not scheduled.
063   */
064  private ChoreService choreService;
065
066  /**
067   * Variables that encapsulate the meaningful state information
068   */
069  private long timeOfLastRun = -1; // system time millis
070  private long timeOfThisRun = -1; // system time millis
071  private boolean initialChoreComplete = false;
072
073  /**
074   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
075   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
076   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
077   * command can cause many chores to stop together.
078   */
079  private final Stoppable stopper;
080
081  /**
082   * Construct a ScheduledChore
083   * <p>
084   * This constructor is for test only. It allows us to create an object and to call chore() on it.
085   */
086  @InterfaceAudience.Private
087  protected ScheduledChore() {
088    this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT);
089  }
090
091  /**
092   * Construct a ScheduledChore
093   * @param name    Name assigned to Chore. Useful for identification amongst chores of the same
094   *                type
095   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
096   * @param period  Period in millis with which this Chore repeats execution when scheduled.
097   */
098  public ScheduledChore(final String name, Stoppable stopper, final int period) {
099    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
100  }
101
102  /**
103   * Construct a ScheduledChore
104   * @param name         Name assigned to Chore. Useful for identification amongst chores of the
105   *                     same type
106   * @param stopper      When {@link Stoppable#isStopped()} is true, this chore will cancel and
107   *                     cleanup
108   * @param period       Period in millis with which this Chore repeats execution when scheduled.
109   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
110   *                     value of 0 means the chore will begin to execute immediately. Negative
111   *                     delays are invalid and will be corrected to a value of 0.
112   */
113  public ScheduledChore(final String name, Stoppable stopper, final int period,
114    final long initialDelay) {
115    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
116  }
117
118  /**
119   * Construct a ScheduledChore
120   * @param name         Name assigned to Chore. Useful for identification amongst chores of the
121   *                     same type
122   * @param stopper      When {@link Stoppable#isStopped()} is true, this chore will cancel and
123   *                     cleanup
124   * @param period       Period in Timeunit unit with which this Chore repeats execution when
125   *                     scheduled.
126   * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
127   *                     scheduled. A value of 0 means the chore will begin to execute immediately.
128   *                     Negative delays are invalid and will be corrected to a value of 0.
129   * @param unit         The unit that is used to measure period and initialDelay
130   */
131  public ScheduledChore(final String name, Stoppable stopper, final int period,
132    final long initialDelay, final TimeUnit unit) {
133    this.name = name;
134    this.stopper = stopper;
135    this.period = period;
136    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
137    this.timeUnit = unit;
138  }
139
140  @Override
141  public void run() {
142    updateTimeTrackingBeforeRun();
143    if (missedStartTime() && isScheduled()) {
144      onChoreMissedStartTime();
145      LOG.info("Chore: {} missed its start time", getName());
146    } else if (stopper.isStopped() || !isScheduled()) {
147      // call shutdown here to cleanup the ScheduledChore.
148      shutdown(false);
149      LOG.info("Chore: {} was stopped", getName());
150    } else {
151      try {
152        // TODO: Histogram metrics per chore name.
153        // For now, just measure and log if DEBUG level logging is enabled.
154        long start = 0;
155        if (LOG.isDebugEnabled()) {
156          start = System.nanoTime();
157        }
158        if (!initialChoreComplete) {
159          initialChoreComplete = initialChore();
160        } else {
161          chore();
162        }
163        if (LOG.isDebugEnabled() && start > 0) {
164          long end = System.nanoTime();
165          LOG.debug("{} execution time: {} ms.", getName(),
166            TimeUnit.NANOSECONDS.toMillis(end - start));
167        }
168      } catch (Throwable t) {
169        LOG.error("Caught error", t);
170        if (this.stopper.isStopped()) {
171          cancel(false);
172        }
173      }
174    }
175  }
176
177  /**
178   * Update our time tracking members. Called at the start of an execution of this chore's run()
179   * method so that a correct decision can be made as to whether or not we missed the start time
180   */
181  private synchronized void updateTimeTrackingBeforeRun() {
182    timeOfLastRun = timeOfThisRun;
183    timeOfThisRun = EnvironmentEdgeManager.currentTime();
184  }
185
186  /**
187   * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
188   * make the decision as to whether or not it would be worthwhile to increase the number of core
189   * pool threads
190   */
191  private synchronized void onChoreMissedStartTime() {
192    if (choreService != null) {
193      choreService.onChoreMissedStartTime(this);
194    }
195  }
196
197  /**
198   * Return how long in millis has it been since this chore last run. Useful for checking if the
199   * chore has missed its scheduled start time by too large of a margin
200   */
201  synchronized long getTimeBetweenRuns() {
202    return timeOfThisRun - timeOfLastRun;
203  }
204
205  /** Returns true when the time between runs exceeds the acceptable threshold */
206  private synchronized boolean missedStartTime() {
207    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
208      && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
209  }
210
211  /** Returns max allowed time in millis between runs. */
212  private double getMaximumAllowedTimeBetweenRuns() {
213    // Threshold used to determine if the Chore's current run started too late
214    return 1.5 * timeUnit.toMillis(period);
215  }
216
217  /** Return true if time is earlier or equal to current time */
218  private synchronized boolean isValidTime(final long time) {
219    return time > 0 && time <= EnvironmentEdgeManager.currentTime();
220  }
221
222  /** Returns false when the Chore is not currently scheduled with a ChoreService */
223  public synchronized boolean triggerNow() {
224    if (choreService == null) {
225      return false;
226    }
227    choreService.triggerNow(this);
228    return true;
229  }
230
231  @RestrictedApi(explanation = "Should only be called in ChoreService", link = "",
232      allowedOnPath = ".*/org/apache/hadoop/hbase/ChoreService.java")
233  synchronized void setChoreService(ChoreService service) {
234    choreService = service;
235    timeOfThisRun = -1;
236  }
237
238  public synchronized void cancel() {
239    cancel(true);
240  }
241
242  public synchronized void cancel(boolean mayInterruptIfRunning) {
243    if (isScheduled()) {
244      choreService.cancelChore(this, mayInterruptIfRunning);
245    }
246    choreService = null;
247  }
248
249  public String getName() {
250    return name;
251  }
252
253  public Stoppable getStopper() {
254    return stopper;
255  }
256
257  /** Returns period to execute chore in getTimeUnit() units */
258  public int getPeriod() {
259    return period;
260  }
261
262  /** Returns initial delay before executing chore in getTimeUnit() units */
263  public long getInitialDelay() {
264    return initialDelay;
265  }
266
267  public TimeUnit getTimeUnit() {
268    return timeUnit;
269  }
270
271  public synchronized boolean isInitialChoreComplete() {
272    return initialChoreComplete;
273  }
274
275  synchronized ChoreService getChoreService() {
276    return choreService;
277  }
278
279  synchronized long getTimeOfLastRun() {
280    return timeOfLastRun;
281  }
282
283  synchronized long getTimeOfThisRun() {
284    return timeOfThisRun;
285  }
286
287  /** Returns true when this Chore is scheduled with a ChoreService */
288  public synchronized boolean isScheduled() {
289    return choreService != null && choreService.isChoreScheduled(this);
290  }
291
292  @InterfaceAudience.Private
293  @RestrictedApi(explanation = "Should only be called in tests", link = "",
294      allowedOnPath = ".*/src/test/.*")
295  public synchronized void choreForTesting() {
296    chore();
297  }
298
299  /** The task to execute on each scheduled execution of the Chore */
300  protected abstract void chore();
301
302  /**
303   * Override to run a task before we start looping.
304   * @return true if initial chore was successful
305   */
306  protected boolean initialChore() {
307    // Default does nothing
308    return true;
309  }
310
311  /**
312   * Override to run cleanup tasks when the Chore encounters an error and must stop running
313   */
314  protected void cleanup() {
315  }
316
317  /**
318   * Call {@link #shutdown(boolean)} with {@code true}.
319   * @see ScheduledChore#shutdown(boolean)
320   */
321  public synchronized void shutdown() {
322    shutdown(true);
323  }
324
325  /**
326   * Completely shutdown the ScheduleChore, which means we will call cleanup and you should not
327   * schedule it again.
328   * <p/>
329   * This is another path to cleanup the chore, comparing to stop the stopper instance passed in.
330   */
331  public synchronized void shutdown(boolean mayInterruptIfRunning) {
332    cancel(mayInterruptIfRunning);
333    cleanup();
334  }
335
336  /**
337   * A summation of this chore in human readable format. Downstream users should not presume parsing
338   * of this string can relaibly be done between versions. Instead, they should rely on the public
339   * accessor methods to get the information they desire.
340   */
341  @InterfaceAudience.Private
342  @Override
343  public String toString() {
344    return "ScheduledChore name=" + getName() + ", period=" + getPeriod() + ", unit="
345      + getTimeUnit();
346  }
347}