/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
 * thread pool.
 * <p>
 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
 * an entry being added to a queue, etc.
 */
@InterfaceAudience.Public
public abstract class ScheduledChore implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class);

  private final String name;

  /**
   * Default values for scheduling parameters should they be excluded during construction
   */
  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
  private static final long DEFAULT_INITIAL_DELAY = 0;

  /**
   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
   */
  private final int period; // in TimeUnit units
  private final TimeUnit timeUnit;
  private final long initialDelay; // in TimeUnit units

  /**
   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
   * not scheduled.
   */
  private ChoreServicer choreServicer;

  /**
   * Variables that encapsulate the meaningful state information
   */
  private long timeOfLastRun = -1; // system time millis
  private long timeOfThisRun = -1; // system time millis
  private boolean initialChoreComplete = false;

  /**
   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
   * command can cause many chores to stop together.
   */
  private final Stoppable stopper;

  interface ChoreServicer {
    /**
     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
     */
    void cancelChore(ScheduledChore chore);

    /**
     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
     * @param chore The chore to cancel
     * @param mayInterruptIfRunning forwarded to the implementer; going by its name, whether an
     *          execution that is currently in progress may be interrupted
     */
    void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);

    /**
     * @return true when the chore is scheduled with the implementer of this interface
     */
    boolean isChoreScheduled(ScheduledChore chore);

    /**
     * This method tries to execute the chore immediately. If the chore is executing at the time of
     * this call, the chore will begin another execution as soon as the current execution finishes
     * <p>
     * If the chore is not scheduled with a ChoreService, this call will fail.
     * @return false when the chore could not be triggered immediately
     */
    boolean triggerNow(ScheduledChore chore);

    /**
     * A callback that tells the implementer of this interface that one of the scheduled chores is
     * missing its start time. The implication of a chore missing its start time is that the
     * service's current means of scheduling may not be sufficient to handle the number of ongoing
     * chores (the other explanation is that the chore's execution time is greater than its
     * scheduled period). The service should try to increase its concurrency when this callback is
     * received.
     * @param chore The chore that missed its start time
     */
    void onChoreMissedStartTime(ScheduledChore chore);
  }

  /**
   * This constructor is for test only. It allows us to create an object and to call chore() on it.
   */
  @InterfaceAudience.Private
  protected ScheduledChore() {
    this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT);
  }

  /**
   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
   * @param period Period in millis with which this Chore repeats execution when scheduled.
   */
  public ScheduledChore(final String name, Stoppable stopper, final int period) {
    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
  }

  /**
   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
   * @param period Period in millis with which this Chore repeats execution when scheduled.
   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
   *          value of 0 means the chore will begin to execute immediately. Negative delays are
   *          invalid and will be corrected to a value of 0.
   */
  public ScheduledChore(final String name, Stoppable stopper, final int period,
      final long initialDelay) {
    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
  }

  /**
   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
   * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled.
   * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
   *          scheduled. A value of 0 means the chore will begin to execute immediately. Negative
   *          delays are invalid and will be corrected to a value of 0.
   * @param unit The unit that is used to measure period and initialDelay
   */
  public ScheduledChore(final String name, Stoppable stopper, final int period,
      final long initialDelay, final TimeUnit unit) {
    this.name = name;
    this.stopper = stopper;
    this.period = period;
    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
    this.timeUnit = unit;
  }

  /**
   * Performs one scheduled execution. Depending on the current state this either reports a missed
   * start time (and does no other work in this execution), cancels and cleans up the chore
   * because it was stopped or is no longer scheduled, or runs {@link #initialChore()} /
   * {@link #chore()}.
   * @see java.lang.Runnable#run()
   */
  @Override
  public void run() {
    updateTimeTrackingBeforeRun();
    if (missedStartTime() && isScheduled()) {
      onChoreMissedStartTime();
      LOG.info("Chore: {} missed its start time", getName());
    } else if (stopper.isStopped() || !isScheduled()) {
      cancel(false);
      cleanup();
      LOG.info("Chore: {} was stopped", getName());
    } else {
      try {
        // TODO: Histogram metrics per chore name.
        // For now, just measure and log if DEBUG level logging is enabled.
        // Decide once whether to time this execution. System.nanoTime() has an arbitrary origin
        // and may legitimately be zero or negative, so the timestamp itself must not be used as
        // the "was a measurement started" marker.
        final boolean timed = LOG.isDebugEnabled();
        final long start = timed ? System.nanoTime() : 0L;
        if (!initialChoreComplete) {
          // Keep retrying the initial chore on each execution until it reports success.
          initialChoreComplete = initialChore();
        } else {
          chore();
        }
        if (timed) {
          LOG.debug("{} execution time: {} ms.", getName(),
            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
      } catch (Throwable t) {
        LOG.error("Caught error", t);
        // A failure only ends the chore when the stopper has been stopped; otherwise the chore
        // stays scheduled.
        if (this.stopper.isStopped()) {
          cancel(false);
          cleanup();
        }
      }
    }
  }

  /**
   * Update our time tracking members. Called at the start of an execution of this chore's run()
   * method so that a correct decision can be made as to whether or not we missed the start time
   */
  private synchronized void updateTimeTrackingBeforeRun() {
    timeOfLastRun = timeOfThisRun;
    timeOfThisRun = System.currentTimeMillis();
  }

  /**
   * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
   * make the decision as to whether or not it would be worthwhile to increase the number of core
   * pool threads
   */
  private synchronized void onChoreMissedStartTime() {
    if (choreServicer != null) {
      choreServicer.onChoreMissedStartTime(this);
    }
  }

  /**
   * @return How long in millis has it been since this chore last run. Useful for checking if the
   *         chore has missed its scheduled start time by too large of a margin
   */
  synchronized long getTimeBetweenRuns() {
    return timeOfThisRun - timeOfLastRun;
  }

  /**
   * @return true when the time between runs exceeds the acceptable threshold
   */
  private synchronized boolean missedStartTime() {
    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
      && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
  }

  /**
   * @return max allowed time in millis between runs.
   */
  private double getMaximumAllowedTimeBetweenRuns() {
    // Threshold used to determine if the Chore's current run started too late
    return 1.5 * timeUnit.toMillis(period);
  }

  /**
   * @param time in system millis
   * @return true if time is positive and earlier or equal to current milli time
   */
  private synchronized boolean isValidTime(final long time) {
    return time > 0 && time <= System.currentTimeMillis();
  }

  /**
   * Asks the ChoreService this chore is registered with to trigger it immediately.
   * @return false when the Chore is not currently scheduled with a ChoreService
   */
  public synchronized boolean triggerNow() {
    return choreServicer != null && choreServicer.triggerNow(this);
  }

  /**
   * Associates this chore with the given servicer and resets the time of the current run.
   * @param service the servicer that now owns this chore
   */
  synchronized void setChoreServicer(ChoreServicer service) {
    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
    // is changing, cancel any existing schedules of this chore.
    if (choreServicer != null && choreServicer != service) {
      choreServicer.cancelChore(this, false);
    }
    choreServicer = service;
    timeOfThisRun = -1;
  }

  /**
   * Cancels this chore; equivalent to {@code cancel(true)}.
   */
  public synchronized void cancel() {
    cancel(true);
  }

  /**
   * Cancels this chore with its servicer if it is currently scheduled, then drops the reference to
   * the servicer.
   * @param mayInterruptIfRunning forwarded unchanged to the servicer's cancelChore call
   */
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    if (isScheduled()) {
      choreServicer.cancelChore(this, mayInterruptIfRunning);
    }

    choreServicer = null;
  }

  /**
   * @return the name given to this chore at construction
   */
  public String getName() {
    return name;
  }

  /**
   * @return the stopper given to this chore at construction
   */
  public Stoppable getStopper() {
    return stopper;
  }

  /**
   * @return period to execute chore in getTimeUnit() units
   */
  public int getPeriod() {
    return period;
  }

  /**
   * @return initial delay before executing chore in getTimeUnit() units
   */
  public long getInitialDelay() {
    return initialDelay;
  }

  /**
   * @return the unit in which period and initial delay are expressed
   */
  public TimeUnit getTimeUnit() {
    return timeUnit;
  }

  /**
   * @return true once {@link #initialChore()} has returned true from within {@link #run()}
   */
  public synchronized boolean isInitialChoreComplete() {
    return initialChoreComplete;
  }

  @InterfaceAudience.Private
  synchronized ChoreServicer getChoreServicer() {
    return choreServicer;
  }

  @InterfaceAudience.Private
  synchronized long getTimeOfLastRun() {
    return timeOfLastRun;
  }

  @InterfaceAudience.Private
  synchronized long getTimeOfThisRun() {
    return timeOfThisRun;
  }

  /**
   * @return true when this Chore is scheduled with a ChoreService
   */
  public synchronized boolean isScheduled() {
    return choreServicer != null && choreServicer.isChoreScheduled(this);
  }

  /**
   * Invokes {@link #chore()} directly, without the checks and time tracking done by
   * {@link #run()}.
   */
  @InterfaceAudience.Private
  public synchronized void choreForTesting() {
    chore();
  }

  /**
   * The task to execute on each scheduled execution of the Chore
   */
  protected abstract void chore();

  /**
   * Override to run a task before we start looping.
   * @return true if initial chore was successful
   */
  protected boolean initialChore() {
    // Default does nothing
    return true;
  }

  /**
   * Override to run cleanup tasks when the Chore encounters an error and must stop running
   */
  protected synchronized void cleanup() {
  }

  /**
   * A summation of this chore in human readable format. Downstream users should not presume
   * parsing of this string can reliably be done between versions. Instead, they should rely
   * on the public accessor methods to get the information they desire.
   */
  @InterfaceAudience.Private
  @Override
  public String toString() {
    return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
      ", unit=" + getTimeUnit();
  }
}