001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.util.concurrent.ScheduledThreadPoolExecutor; 022import java.util.concurrent.TimeUnit; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 028 029/** 030 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once 031 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The 032 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for 033 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore 034 * cancellation is logged. Implementers should consider whether or not the Chore will be able to 035 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution 036 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s 037 * thread pool. 038 * <p> 039 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as 040 * an entry being added to a queue, etc. 041 */ 042@InterfaceAudience.Public 043public abstract class ScheduledChore implements Runnable { 044 private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class); 045 046 private final String name; 047 048 /** 049 * Default values for scheduling parameters should they be excluded during construction 050 */ 051 private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; 052 private final static long DEFAULT_INITIAL_DELAY = 0; 053 054 /** 055 * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically 056 */ 057 private final int period; // in TimeUnit units 058 private final TimeUnit timeUnit; 059 private final long initialDelay; // in TimeUnit units 060 061 /** 062 * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is 063 * not scheduled. 064 */ 065 private ChoreServicer choreServicer; 066 067 /** 068 * Variables that encapsulate the meaningful state information 069 */ 070 private long timeOfLastRun = -1; // system time millis 071 private long timeOfThisRun = -1; // system time millis 072 private boolean initialChoreComplete = false; 073 074 /** 075 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been 076 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper 077 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)} 078 * command can cause many chores to stop together. 079 */ 080 private final Stoppable stopper; 081 082 interface ChoreServicer { 083 /** 084 * Cancel any ongoing schedules that this chore has with the implementer of this interface. 085 */ 086 public void cancelChore(ScheduledChore chore); 087 public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning); 088 089 /** 090 * @return true when the chore is scheduled with the implementer of this interface 091 */ 092 public boolean isChoreScheduled(ScheduledChore chore); 093 094 /** 095 * This method tries to execute the chore immediately. If the chore is executing at the time of 096 * this call, the chore will begin another execution as soon as the current execution finishes 097 * <p> 098 * If the chore is not scheduled with a ChoreService, this call will fail. 099 * @return false when the chore could not be triggered immediately 100 */ 101 public boolean triggerNow(ScheduledChore chore); 102 103 /** 104 * A callback that tells the implementer of this interface that one of the scheduled chores is 105 * missing its start time. The implication of a chore missing its start time is that the 106 * service's current means of scheduling may not be sufficient to handle the number of ongoing 107 * chores (the other explanation is that the chore's execution time is greater than its 108 * scheduled period). The service should try to increase its concurrency when this callback is 109 * received. 110 * @param chore The chore that missed its start time 111 */ 112 public void onChoreMissedStartTime(ScheduledChore chore); 113 } 114 115 /** 116 * This constructor is for test only. It allows us to create an object and to call chore() on it. 117 */ 118 @InterfaceAudience.Private 119 @VisibleForTesting 120 protected ScheduledChore() { 121 this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT); 122 } 123 124 /** 125 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type 126 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup 127 * @param period Period in millis with which this Chore repeats execution when scheduled. 128 */ 129 public ScheduledChore(final String name, Stoppable stopper, final int period) { 130 this(name, stopper, period, DEFAULT_INITIAL_DELAY); 131 } 132 133 /** 134 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type 135 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup 136 * @param period Period in millis with which this Chore repeats execution when scheduled. 137 * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A 138 * value of 0 means the chore will begin to execute immediately. Negative delays are 139 * invalid and will be corrected to a value of 0. 140 */ 141 public ScheduledChore(final String name, Stoppable stopper, final int period, 142 final long initialDelay) { 143 this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT); 144 } 145 146 /** 147 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type 148 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup 149 * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled. 150 * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been 151 * scheduled. A value of 0 means the chore will begin to execute immediately. Negative 152 * delays are invalid and will be corrected to a value of 0. 153 * @param unit The unit that is used to measure period and initialDelay 154 */ 155 public ScheduledChore(final String name, Stoppable stopper, final int period, 156 final long initialDelay, final TimeUnit unit) { 157 this.name = name; 158 this.stopper = stopper; 159 this.period = period; 160 this.initialDelay = initialDelay < 0 ? 0 : initialDelay; 161 this.timeUnit = unit; 162 } 163 164 /** 165 * @see java.lang.Runnable#run() 166 */ 167 @Override 168 public void run() { 169 updateTimeTrackingBeforeRun(); 170 if (missedStartTime() && isScheduled()) { 171 onChoreMissedStartTime(); 172 LOG.info("Chore: {} missed its start time", getName()); 173 } else if (stopper.isStopped() || !isScheduled()) { 174 cancel(false); 175 cleanup(); 176 LOG.info("Chore: {} was stopped", getName()); 177 } else { 178 try { 179 // TODO: Histogram metrics per chore name. 180 // For now, just measure and log if DEBUG level logging is enabled. 181 long start = 0; 182 if (LOG.isDebugEnabled()) { 183 start = System.nanoTime(); 184 } 185 if (!initialChoreComplete) { 186 initialChoreComplete = initialChore(); 187 } else { 188 chore(); 189 } 190 if (LOG.isDebugEnabled() && start > 0) { 191 long end = System.nanoTime(); 192 LOG.debug("{} execution time: {} ms.", getName(), 193 TimeUnit.NANOSECONDS.toMillis(end - start)); 194 } 195 } catch (Throwable t) { 196 LOG.error("Caught error", t); 197 if (this.stopper.isStopped()) { 198 cancel(false); 199 cleanup(); 200 } 201 } 202 } 203 } 204 205 /** 206 * Update our time tracking members. Called at the start of an execution of this chore's run() 207 * method so that a correct decision can be made as to whether or not we missed the start time 208 */ 209 private synchronized void updateTimeTrackingBeforeRun() { 210 timeOfLastRun = timeOfThisRun; 211 timeOfThisRun = System.currentTimeMillis(); 212 } 213 214 /** 215 * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to 216 * make the decision as to whether or not it would be worthwhile to increase the number of core 217 * pool threads 218 */ 219 private synchronized void onChoreMissedStartTime() { 220 if (choreServicer != null) choreServicer.onChoreMissedStartTime(this); 221 } 222 223 /** 224 * @return How long in millis has it been since this chore last run. Useful for checking if the 225 * chore has missed its scheduled start time by too large of a margin 226 */ 227 synchronized long getTimeBetweenRuns() { 228 return timeOfThisRun - timeOfLastRun; 229 } 230 231 /** 232 * @return true when the time between runs exceeds the acceptable threshold 233 */ 234 private synchronized boolean missedStartTime() { 235 return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun) 236 && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns(); 237 } 238 239 /** 240 * @return max allowed time in millis between runs. 241 */ 242 private double getMaximumAllowedTimeBetweenRuns() { 243 // Threshold used to determine if the Chore's current run started too late 244 return 1.5 * timeUnit.toMillis(period); 245 } 246 247 /** 248 * @param time in system millis 249 * @return true if time is earlier or equal to current milli time 250 */ 251 private synchronized boolean isValidTime(final long time) { 252 return time > 0 && time <= System.currentTimeMillis(); 253 } 254 255 /** 256 * @return false when the Chore is not currently scheduled with a ChoreService 257 */ 258 public synchronized boolean triggerNow() { 259 if (choreServicer != null) { 260 return choreServicer.triggerNow(this); 261 } else { 262 return false; 263 } 264 } 265 266 synchronized void setChoreServicer(ChoreServicer service) { 267 // Chores should only ever be scheduled with a single ChoreService. If the choreServicer 268 // is changing, cancel any existing schedules of this chore. 269 if (choreServicer != null && choreServicer != service) { 270 choreServicer.cancelChore(this, false); 271 } 272 choreServicer = service; 273 timeOfThisRun = System.currentTimeMillis(); 274 } 275 276 public synchronized void cancel() { 277 cancel(true); 278 } 279 280 public synchronized void cancel(boolean mayInterruptIfRunning) { 281 if (isScheduled()) choreServicer.cancelChore(this, mayInterruptIfRunning); 282 283 choreServicer = null; 284 } 285 286 public String getName() { 287 return name; 288 } 289 290 public Stoppable getStopper() { 291 return stopper; 292 } 293 294 /** 295 * @return period to execute chore in getTimeUnit() units 296 */ 297 public int getPeriod() { 298 return period; 299 } 300 301 /** 302 * @return initial delay before executing chore in getTimeUnit() units 303 */ 304 public long getInitialDelay() { 305 return initialDelay; 306 } 307 308 public TimeUnit getTimeUnit() { 309 return timeUnit; 310 } 311 312 public synchronized boolean isInitialChoreComplete() { 313 return initialChoreComplete; 314 } 315 316 @VisibleForTesting 317 synchronized ChoreServicer getChoreServicer() { 318 return choreServicer; 319 } 320 321 @VisibleForTesting 322 synchronized long getTimeOfLastRun() { 323 return timeOfLastRun; 324 } 325 326 @VisibleForTesting 327 synchronized long getTimeOfThisRun() { 328 return timeOfThisRun; 329 } 330 331 /** 332 * @return true when this Chore is scheduled with a ChoreService 333 */ 334 public synchronized boolean isScheduled() { 335 return choreServicer != null && choreServicer.isChoreScheduled(this); 336 } 337 338 @InterfaceAudience.Private 339 @VisibleForTesting 340 public synchronized void choreForTesting() { 341 chore(); 342 } 343 344 /** 345 * The task to execute on each scheduled execution of the Chore 346 */ 347 protected abstract void chore(); 348 349 /** 350 * Override to run a task before we start looping. 351 * @return true if initial chore was successful 352 */ 353 protected boolean initialChore() { 354 // Default does nothing 355 return true; 356 } 357 358 /** 359 * Override to run cleanup tasks when the Chore encounters an error and must stop running 360 */ 361 protected synchronized void cleanup() { 362 } 363 364 /** 365 * A summation of this chore in human readable format. Downstream users should not presume 366 * parsing of this string can relaibly be done between versions. Instead, they should rely 367 * on the public accessor methods to get the information they desire. 368 */ 369 @InterfaceAudience.Private 370 @Override 371 public String toString() { 372 return "ScheduledChore name=" + getName() + ", period=" + getPeriod() + 373 ", unit=" + getTimeUnit(); 374 } 375}