1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase; 20 21 import java.util.concurrent.ScheduledThreadPoolExecutor; 22 import java.util.concurrent.TimeUnit; 23 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 import org.apache.hadoop.hbase.classification.InterfaceAudience; 27 28 import com.google.common.annotations.VisibleForTesting; 29 30 /** 31 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once 32 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The 33 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for 34 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore 35 * cancellation is logged. Implementers should consider whether or not the Chore will be able to 36 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution 37 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s 38 * thread pool. 39 * <p> 40 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as 41 * an entry being added to a queue, etc. 42 */ 43 @InterfaceAudience.Private 44 public abstract class ScheduledChore implements Runnable { 45 private static final Log LOG = LogFactory.getLog(ScheduledChore.class); 46 47 private final String name; 48 49 /** 50 * Default values for scheduling parameters should they be excluded during construction 51 */ 52 private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; 53 private final static long DEFAULT_INITIAL_DELAY = 0; 54 55 /** 56 * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically 57 */ 58 private final int period; // in TimeUnit units 59 private final TimeUnit timeUnit; 60 private final long initialDelay; // in TimeUnit units 61 62 /** 63 * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is 64 * not scheduled. 65 */ 66 private ChoreServicer choreServicer; 67 68 /** 69 * Variables that encapsulate the meaningful state information 70 */ 71 private long timeOfLastRun = -1; // system time millis 72 private long timeOfThisRun = -1; // system time millis 73 private boolean initialChoreComplete = false; 74 75 /** 76 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been 77 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper 78 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)} 79 * command can cause many chores to stop together. 80 */ 81 private final Stoppable stopper; 82 83 interface ChoreServicer { 84 /** 85 * Cancel any ongoing schedules that this chore has with the implementer of this interface. 86 */ 87 public void cancelChore(ScheduledChore chore); 88 public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning); 89 90 /** 91 * @return true when the chore is scheduled with the implementer of this interface 92 */ 93 public boolean isChoreScheduled(ScheduledChore chore); 94 95 /** 96 * This method tries to execute the chore immediately. If the chore is executing at the time of 97 * this call, the chore will begin another execution as soon as the current execution finishes 98 * <p> 99 * If the chore is not scheduled with a ChoreService, this call will fail. 100 * @return false when the chore could not be triggered immediately 101 */ 102 public boolean triggerNow(ScheduledChore chore); 103 104 /** 105 * A callback that tells the implementer of this interface that one of the scheduled chores is 106 * missing its start time. The implication of a chore missing its start time is that the 107 * service's current means of scheduling may not be sufficient to handle the number of ongoing 108 * chores (the other explanation is that the chore's execution time is greater than its 109 * scheduled period). The service should try to increase its concurrency when this callback is 110 * received. 111 * @param chore The chore that missed its start time 112 */ 113 public void onChoreMissedStartTime(ScheduledChore chore); 114 } 115 116 /** 117 * This constructor is for test only. It allows us to create an object and to call chore() on it. 118 */ 119 protected ScheduledChore() { 120 this.name = null; 121 this.stopper = null; 122 this.period = 0; 123 this.initialDelay = DEFAULT_INITIAL_DELAY; 124 this.timeUnit = DEFAULT_TIME_UNIT; 125 } 126 127 /** 128 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type 129 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup 130 * @param period Period in millis with which this Chore repeats execution when scheduled. 131 */ 132 public ScheduledChore(final String name, Stoppable stopper, final int period) { 133 this(name, stopper, period, DEFAULT_INITIAL_DELAY); 134 } 135 136 /** 137 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type 138 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup 139 * @param period Period in millis with which this Chore repeats execution when scheduled. 140 * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A 141 * value of 0 means the chore will begin to execute immediately. Negative delays are 142 * invalid and will be corrected to a value of 0. 143 */ 144 public ScheduledChore(final String name, Stoppable stopper, final int period, 145 final long initialDelay) { 146 this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT); 147 } 148 149 /** 150 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type 151 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup 152 * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled. 153 * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been 154 * scheduled. A value of 0 means the chore will begin to execute immediately. Negative 155 * delays are invalid and will be corrected to a value of 0. 156 * @param unit The unit that is used to measure period and initialDelay 157 */ 158 public ScheduledChore(final String name, Stoppable stopper, final int period, 159 final long initialDelay, final TimeUnit unit) { 160 this.name = name; 161 this.stopper = stopper; 162 this.period = period; 163 this.initialDelay = initialDelay < 0 ? 0 : initialDelay; 164 this.timeUnit = unit; 165 } 166 167 /** 168 * @see java.lang.Thread#run() 169 */ 170 @Override 171 public void run() { 172 updateTimeTrackingBeforeRun(); 173 if (missedStartTime() && isScheduled()) { 174 onChoreMissedStartTime(); 175 if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time"); 176 } else if (stopper.isStopped() || !isScheduled()) { 177 cancel(false); 178 cleanup(); 179 if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped"); 180 } else { 181 try { 182 if (!initialChoreComplete) { 183 initialChoreComplete = initialChore(); 184 } else { 185 chore(); 186 } 187 } catch (Throwable t) { 188 if (LOG.isErrorEnabled()) LOG.error("Caught error", t); 189 if (this.stopper.isStopped()) { 190 cancel(false); 191 cleanup(); 192 } 193 } 194 } 195 } 196 197 /** 198 * Update our time tracking members. Called at the start of an execution of this chore's run() 199 * method so that a correct decision can be made as to whether or not we missed the start time 200 */ 201 private synchronized void updateTimeTrackingBeforeRun() { 202 timeOfLastRun = timeOfThisRun; 203 timeOfThisRun = System.currentTimeMillis(); 204 } 205 206 /** 207 * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to 208 * make the decision as to whether or not it would be worthwhile to increase the number of core 209 * pool threads 210 */ 211 private synchronized void onChoreMissedStartTime() { 212 if (choreServicer != null) choreServicer.onChoreMissedStartTime(this); 213 } 214 215 /** 216 * @return How long in millis has it been since this chore last run. Useful for checking if the 217 * chore has missed its scheduled start time by too large of a margin 218 */ 219 synchronized long getTimeBetweenRuns() { 220 return timeOfThisRun - timeOfLastRun; 221 } 222 223 /** 224 * @return true when the time between runs exceeds the acceptable threshold 225 */ 226 private synchronized boolean missedStartTime() { 227 return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun) 228 && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns(); 229 } 230 231 /** 232 * @return max allowed time in millis between runs. 233 */ 234 private double getMaximumAllowedTimeBetweenRuns() { 235 // Threshold used to determine if the Chore's current run started too late 236 return 1.5 * timeUnit.toMillis(period); 237 } 238 239 /** 240 * @param time in system millis 241 * @return true if time is earlier or equal to current milli time 242 */ 243 private synchronized boolean isValidTime(final long time) { 244 return time > 0 && time <= System.currentTimeMillis(); 245 } 246 247 /** 248 * @return false when the Chore is not currently scheduled with a ChoreService 249 */ 250 public synchronized boolean triggerNow() { 251 if (choreServicer != null) { 252 return choreServicer.triggerNow(this); 253 } else { 254 return false; 255 } 256 } 257 258 synchronized void setChoreServicer(ChoreServicer service) { 259 // Chores should only ever be scheduled with a single ChoreService. If the choreServicer 260 // is changing, cancel any existing schedules of this chore. 261 if (choreServicer != null && choreServicer != service) { 262 choreServicer.cancelChore(this, false); 263 } 264 choreServicer = service; 265 timeOfThisRun = System.currentTimeMillis(); 266 } 267 268 public synchronized void cancel() { 269 cancel(true); 270 } 271 272 public synchronized void cancel(boolean mayInterruptIfRunning) { 273 if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning); 274 275 choreServicer = null; 276 } 277 278 public String getName() { 279 return name; 280 } 281 282 public Stoppable getStopper() { 283 return stopper; 284 } 285 286 /** 287 * @return period to execute chore in getTimeUnit() units 288 */ 289 public int getPeriod() { 290 return period; 291 } 292 293 /** 294 * @return initial delay before executing chore in getTimeUnit() units 295 */ 296 public long getInitialDelay() { 297 return initialDelay; 298 } 299 300 public TimeUnit getTimeUnit() { 301 return timeUnit; 302 } 303 304 public synchronized boolean isInitialChoreComplete() { 305 return initialChoreComplete; 306 } 307 308 @VisibleForTesting 309 synchronized ChoreServicer getChoreServicer() { 310 return choreServicer; 311 } 312 313 @VisibleForTesting 314 synchronized long getTimeOfLastRun() { 315 return timeOfLastRun; 316 } 317 318 @VisibleForTesting 319 synchronized long getTimeOfThisRun() { 320 return timeOfThisRun; 321 } 322 323 /** 324 * @return true when this Chore is scheduled with a ChoreService 325 */ 326 public synchronized boolean isScheduled() { 327 return choreServicer != null && choreServicer.isChoreScheduled(this); 328 } 329 330 @VisibleForTesting 331 public synchronized void choreForTesting() { 332 chore(); 333 } 334 335 /** 336 * The task to execute on each scheduled execution of the Chore 337 */ 338 protected abstract void chore(); 339 340 /** 341 * Override to run a task before we start looping. 342 * @return true if initial chore was successful 343 */ 344 protected boolean initialChore() { 345 // Default does nothing 346 return true; 347 } 348 349 /** 350 * Override to run cleanup tasks when the Chore encounters an error and must stop running 351 */ 352 protected synchronized void cleanup() { 353 } 354 355 @Override 356 public String toString() { 357 return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: " 358 + getTimeUnit() + "]"; 359 } 360 }