/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.util.MovingAverage;
import org.apache.hadoop.hbase.util.WindowMovingAverage;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
 * thread pool.
 * <p>
 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
 * an entry being added to a queue, etc.
 */
@InterfaceAudience.Public
public abstract class ScheduledChore implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class);

  private final String name;

  /**
   * Default values for scheduling parameters should they be excluded during construction
   */
  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
  private static final long DEFAULT_INITIAL_DELAY = 0;

  /**
   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
   */
  private final int period; // in TimeUnit units
  private final TimeUnit timeUnit;
  private final long initialDelay; // in TimeUnit units

  /**
   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
   * not scheduled.
   */
  private ChoreServicer choreServicer;

  /**
   * Variables that encapsulate the meaningful state information
   */
  private long timeOfLastRun = -1; // system time millis
  private long timeOfThisRun = -1; // system time millis
  // Written by run() without holding this object's monitor and read through the synchronized
  // getter; volatile makes the write visible to readers without requiring run() to lock.
  private volatile boolean initialChoreComplete = false;

  /**
   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
   * command can cause many chores to stop together.
   */
  private final Stoppable stopper;

  private final MovingAverage<Void> timeMeasurement;
  private static final long FIVE_MINUTES_IN_NANOS = TimeUnit.MINUTES.toNanos(5L);
  // System.nanoTime() reading taken the last time the average execution time was logged.
  private long lastLog = System.nanoTime();

  interface ChoreServicer {
    /**
     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
     */
    void cancelChore(ScheduledChore chore);

    /**
     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
     * @param mayInterruptIfRunning whether a chore that is currently executing may be interrupted
     *          (exact semantics are defined by the implementer)
     */
    void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);

    /**
     * @return true when the chore is scheduled with the implementer of this interface
     */
    boolean isChoreScheduled(ScheduledChore chore);

    /**
     * This method tries to execute the chore immediately. If the chore is executing at the time of
     * this call, the chore will begin another execution as soon as the current execution finishes
     * <p>
     * If the chore is not scheduled with a ChoreService, this call will fail.
     * @return false when the chore could not be triggered immediately
     */
    boolean triggerNow(ScheduledChore chore);

    /**
     * A callback that tells the implementer of this interface that one of the scheduled chores is
     * missing its start time. The implication of a chore missing its start time is that the
     * service's current means of scheduling may not be sufficient to handle the number of ongoing
     * chores (the other explanation is that the chore's execution time is greater than its
     * scheduled period). The service should try to increase its concurrency when this callback is
     * received.
     * @param chore The chore that missed its start time
     */
    void onChoreMissedStartTime(ScheduledChore chore);
  }

  /**
   * This constructor is for test only. It allows us to create an object and to call chore() on it.
   * <p>
   * The instance is created with a {@code null} stopper and a period of 0. {@link #run()}
   * dereferences the stopper, so an instance built this way is only suitable for invoking
   * {@link #chore()} directly (for example through {@link #choreForTesting()}).
   */
  @InterfaceAudience.Private
  @VisibleForTesting
  protected ScheduledChore() {
    this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT);
  }

  /**
   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
   * @param period Period in millis with which this Chore repeats execution when scheduled.
   */
  public ScheduledChore(final String name, Stoppable stopper, final int period) {
    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
  }

  /**
   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
   * @param period Period in millis with which this Chore repeats execution when scheduled.
   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
   *          value of 0 means the chore will begin to execute immediately. Negative delays are
   *          invalid and will be corrected to a value of 0.
   */
  public ScheduledChore(final String name, Stoppable stopper, final int period,
      final long initialDelay) {
    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
  }

  /**
   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
   * @param period Period in TimeUnit units with which this Chore repeats execution when scheduled.
   * @param initialDelay Delay in TimeUnit units before this Chore begins to execute once it has
   *          been scheduled. A value of 0 means the chore will begin to execute immediately.
   *          Negative delays are invalid and will be corrected to a value of 0.
   * @param unit The unit that is used to measure period and initialDelay
   */
  public ScheduledChore(final String name, Stoppable stopper, final int period,
      final long initialDelay, final TimeUnit unit) {
    this.name = name;
    this.stopper = stopper;
    this.period = period;
    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
    this.timeUnit = unit;
    this.timeMeasurement = new WindowMovingAverage(name);
  }

  /**
   * Executes one scheduled tick of this chore. Depending on state this either reports a missed
   * start time, cancels and cleans up a stopped/unscheduled chore, or runs
   * {@link #initialChore()} (until it returns true) and afterwards {@link #chore()}.
   * @see java.lang.Runnable#run()
   */
  @Override
  public void run() {
    updateTimeTrackingBeforeRun();
    if (missedStartTime() && isScheduled()) {
      // Note: a tick that is detected as late only notifies the servicer; chore() is not invoked.
      onChoreMissedStartTime();
      LOG.info("Chore: {} missed its start time", getName());
    } else if (stopper.isStopped() || !isScheduled()) {
      cancel(false);
      cleanup();
      LOG.info("Chore: {} was stopped", getName());
    } else {
      try {
        if (!initialChoreComplete) {
          initialChoreComplete = initialChore();
        } else {
          timeMeasurement.measure(() -> {
            chore();
            return null;
          });
          // Throttle the average-time report to at most once every five minutes.
          if (LOG.isInfoEnabled() && (System.nanoTime() - lastLog > FIVE_MINUTES_IN_NANOS)) {
            LOG.info("{} average execution time: {} ns.", getName(),
                (long) (timeMeasurement.getAverageTime()));
            lastLog = System.nanoTime();
          }
        }
      } catch (Throwable t) {
        // Swallow everything so that a failing chore is not silently dropped by the executor;
        // only give up when the stopper says we are stopping.
        LOG.error("Caught error", t);
        if (this.stopper.isStopped()) {
          cancel(false);
          cleanup();
        }
      }
    }
  }

  /**
   * Update our time tracking members. Called at the start of an execution of this chore's run()
   * method so that a correct decision can be made as to whether or not we missed the start time
   */
  private synchronized void updateTimeTrackingBeforeRun() {
    timeOfLastRun = timeOfThisRun;
    timeOfThisRun = System.currentTimeMillis();
  }

  /**
   * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
   * make the decision as to whether or not it would be worthwhile to increase the number of core
   * pool threads
   */
  private synchronized void onChoreMissedStartTime() {
    if (choreServicer != null) {
      choreServicer.onChoreMissedStartTime(this);
    }
  }

  /**
   * @return How long in millis it has been since this chore last ran. Useful for checking if the
   *         chore has missed its scheduled start time by too large of a margin
   */
  synchronized long getTimeBetweenRuns() {
    return timeOfThisRun - timeOfLastRun;
  }

  /**
   * @return true when the time between runs exceeds the acceptable threshold
   */
  private synchronized boolean missedStartTime() {
    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
        && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
  }

  /**
   * @return max allowed time in millis between runs.
   */
  private double getMaximumAllowedTimeBetweenRuns() {
    // Threshold used to determine if the Chore's current run started too late
    return 1.5 * timeUnit.toMillis(period);
  }

  /**
   * @param time in system millis
   * @return true if time is positive and earlier or equal to current milli time
   */
  private synchronized boolean isValidTime(final long time) {
    return time > 0 && time <= System.currentTimeMillis();
  }

  /**
   * Asks the servicer this chore is registered with to execute the chore immediately.
   * @return false when the Chore is not currently scheduled with a ChoreService
   */
  public synchronized boolean triggerNow() {
    if (choreServicer != null) {
      return choreServicer.triggerNow(this);
    } else {
      return false;
    }
  }

  synchronized void setChoreServicer(ChoreServicer service) {
    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
    // is changing, cancel any existing schedules of this chore.
    if (choreServicer != null && choreServicer != service) {
      choreServicer.cancelChore(this, false);
    }
    choreServicer = service;
    timeOfThisRun = System.currentTimeMillis();
  }

  /**
   * Cancels this chore, allowing a currently running execution to be interrupted. Equivalent to
   * {@code cancel(true)}.
   */
  public synchronized void cancel() {
    cancel(true);
  }

  /**
   * Cancels this chore with its servicer (if it is scheduled) and forgets the servicer.
   * @param mayInterruptIfRunning passed through to the servicer's cancelChore call
   */
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    if (isScheduled()) {
      choreServicer.cancelChore(this, mayInterruptIfRunning);
    }

    choreServicer = null;
  }

  public String getName() {
    return name;
  }

  public Stoppable getStopper() {
    return stopper;
  }

  /**
   * @return period to execute chore in getTimeUnit() units
   */
  public int getPeriod() {
    return period;
  }

  /**
   * @return initial delay before executing chore in getTimeUnit() units
   */
  public long getInitialDelay() {
    return initialDelay;
  }

  public TimeUnit getTimeUnit() {
    return timeUnit;
  }

  public synchronized boolean isInitialChoreComplete() {
    return initialChoreComplete;
  }

  @VisibleForTesting
  synchronized ChoreServicer getChoreServicer() {
    return choreServicer;
  }

  @VisibleForTesting
  synchronized long getTimeOfLastRun() {
    return timeOfLastRun;
  }

  @VisibleForTesting
  synchronized long getTimeOfThisRun() {
    return timeOfThisRun;
  }

  /**
   * @return true when this Chore is scheduled with a ChoreService
   */
  public synchronized boolean isScheduled() {
    return choreServicer != null && choreServicer.isChoreScheduled(this);
  }

  @InterfaceAudience.Private
  @VisibleForTesting
  public synchronized void choreForTesting() {
    chore();
  }

  /**
   * The task to execute on each scheduled execution of the Chore
   */
  protected abstract void chore();

  /**
   * Override to run a task before we start looping.
   * @return true if initial chore was successful
   */
  protected boolean initialChore() {
    // Default does nothing
    return true;
  }

  /**
   * Override to run cleanup tasks when the Chore encounters an error and must stop running
   */
  protected synchronized void cleanup() {
  }

  /**
   * A summary of this chore in human readable format. Downstream users should not presume
   * parsing of this string can reliably be done between versions. Instead, they should rely
   * on the public accessor methods to get the information they desire.
   */
  @InterfaceAudience.Private
  @Override
  public String toString() {
    return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
        + getTimeUnit() + "]";
  }
}