001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.util.concurrent.ScheduledThreadPoolExecutor; 022import java.util.concurrent.TimeUnit; 023import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once 030 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The 031 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for 032 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore 033 * cancellation is logged. Implementers should consider whether or not the Chore will be able to 034 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution 035 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s 036 * thread pool. 037 * <p/> 038 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as 039 * an entry being added to a queue, etc. 040 */ 041@InterfaceAudience.Private 042public abstract class ScheduledChore implements Runnable { 043 private static final Logger LOG = LoggerFactory.getLogger(ScheduledChore.class); 044 045 private final String name; 046 047 /** 048 * Default values for scheduling parameters should they be excluded during construction 049 */ 050 private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; 051 private final static long DEFAULT_INITIAL_DELAY = 0; 052 053 /** 054 * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically 055 */ 056 private final int period; // in TimeUnit units 057 private final TimeUnit timeUnit; 058 private final long initialDelay; // in TimeUnit units 059 060 /** 061 * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is 062 * not scheduled. 063 */ 064 private ChoreService choreService; 065 066 /** 067 * Variables that encapsulate the meaningful state information 068 */ 069 private long timeOfLastRun = -1; // system time millis 070 private long timeOfThisRun = -1; // system time millis 071 private boolean initialChoreComplete = false; 072 073 /** 074 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been 075 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper 076 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)} 077 * command can cause many chores to stop together. 078 */ 079 private final Stoppable stopper; 080 081 /** 082 * Construct a ScheduledChore 083 * <p> 084 * This constructor is for test only. It allows us to create an object and to call chore() on it. 085 */ 086 @InterfaceAudience.Private 087 protected ScheduledChore() { 088 this("TestChore", null, 0, DEFAULT_INITIAL_DELAY, DEFAULT_TIME_UNIT); 089 } 090 091 /** 092 * Construct a ScheduledChore 093 * @param name Name assigned to Chore. Useful for identification amongst chores of the same 094 * type 095 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup 096 * @param period Period in millis with which this Chore repeats execution when scheduled. 097 */ 098 public ScheduledChore(final String name, Stoppable stopper, final int period) { 099 this(name, stopper, period, DEFAULT_INITIAL_DELAY); 100 } 101 102 /** 103 * Construct a ScheduledChore 104 * @param name Name assigned to Chore. Useful for identification amongst chores of the 105 * same type 106 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and 107 * cleanup 108 * @param period Period in millis with which this Chore repeats execution when scheduled. 109 * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A 110 * value of 0 means the chore will begin to execute immediately. Negative 111 * delays are invalid and will be corrected to a value of 0. 112 */ 113 public ScheduledChore(final String name, Stoppable stopper, final int period, 114 final long initialDelay) { 115 this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT); 116 } 117 118 /** 119 * Construct a ScheduledChore 120 * @param name Name assigned to Chore. Useful for identification amongst chores of the 121 * same type 122 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and 123 * cleanup 124 * @param period Period in Timeunit unit with which this Chore repeats execution when 125 * scheduled. 126 * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been 127 * scheduled. A value of 0 means the chore will begin to execute immediately. 128 * Negative delays are invalid and will be corrected to a value of 0. 129 * @param unit The unit that is used to measure period and initialDelay 130 */ 131 public ScheduledChore(final String name, Stoppable stopper, final int period, 132 final long initialDelay, final TimeUnit unit) { 133 this.name = name; 134 this.stopper = stopper; 135 this.period = period; 136 this.initialDelay = initialDelay < 0 ? 0 : initialDelay; 137 this.timeUnit = unit; 138 } 139 140 @Override 141 public void run() { 142 updateTimeTrackingBeforeRun(); 143 if (missedStartTime() && isScheduled()) { 144 onChoreMissedStartTime(); 145 LOG.info("Chore: {} missed its start time", getName()); 146 } else if (stopper.isStopped() || !isScheduled()) { 147 // call shutdown here to cleanup the ScheduledChore. 148 shutdown(false); 149 LOG.info("Chore: {} was stopped", getName()); 150 } else { 151 try { 152 // TODO: Histogram metrics per chore name. 153 // For now, just measure and log if DEBUG level logging is enabled. 154 long start = 0; 155 if (LOG.isDebugEnabled()) { 156 start = System.nanoTime(); 157 } 158 if (!initialChoreComplete) { 159 initialChoreComplete = initialChore(); 160 } else { 161 chore(); 162 } 163 if (LOG.isDebugEnabled() && start > 0) { 164 long end = System.nanoTime(); 165 LOG.debug("{} execution time: {} ms.", getName(), 166 TimeUnit.NANOSECONDS.toMillis(end - start)); 167 } 168 } catch (Throwable t) { 169 LOG.error("Caught error", t); 170 if (this.stopper.isStopped()) { 171 cancel(false); 172 } 173 } 174 } 175 } 176 177 /** 178 * Update our time tracking members. Called at the start of an execution of this chore's run() 179 * method so that a correct decision can be made as to whether or not we missed the start time 180 */ 181 private synchronized void updateTimeTrackingBeforeRun() { 182 timeOfLastRun = timeOfThisRun; 183 timeOfThisRun = EnvironmentEdgeManager.currentTime(); 184 } 185 186 /** 187 * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to 188 * make the decision as to whether or not it would be worthwhile to increase the number of core 189 * pool threads 190 */ 191 private synchronized void onChoreMissedStartTime() { 192 if (choreService != null) { 193 choreService.onChoreMissedStartTime(this); 194 } 195 } 196 197 /** 198 * Return how long in millis has it been since this chore last run. Useful for checking if the 199 * chore has missed its scheduled start time by too large of a margin 200 */ 201 synchronized long getTimeBetweenRuns() { 202 return timeOfThisRun - timeOfLastRun; 203 } 204 205 /** Returns true when the time between runs exceeds the acceptable threshold */ 206 private synchronized boolean missedStartTime() { 207 return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun) 208 && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns(); 209 } 210 211 /** Returns max allowed time in millis between runs. */ 212 private double getMaximumAllowedTimeBetweenRuns() { 213 // Threshold used to determine if the Chore's current run started too late 214 return 1.5 * timeUnit.toMillis(period); 215 } 216 217 /** Return true if time is earlier or equal to current time */ 218 private synchronized boolean isValidTime(final long time) { 219 return time > 0 && time <= EnvironmentEdgeManager.currentTime(); 220 } 221 222 /** Returns false when the Chore is not currently scheduled with a ChoreService */ 223 public synchronized boolean triggerNow() { 224 if (choreService == null) { 225 return false; 226 } 227 choreService.triggerNow(this); 228 return true; 229 } 230 231 @RestrictedApi(explanation = "Should only be called in ChoreService", link = "", 232 allowedOnPath = ".*/org/apache/hadoop/hbase/ChoreService.java") 233 synchronized void setChoreService(ChoreService service) { 234 choreService = service; 235 timeOfThisRun = -1; 236 } 237 238 public synchronized void cancel() { 239 cancel(true); 240 } 241 242 public synchronized void cancel(boolean mayInterruptIfRunning) { 243 if (isScheduled()) { 244 choreService.cancelChore(this, mayInterruptIfRunning); 245 } 246 choreService = null; 247 } 248 249 public String getName() { 250 return name; 251 } 252 253 public Stoppable getStopper() { 254 return stopper; 255 } 256 257 /** Returns period to execute chore in getTimeUnit() units */ 258 public int getPeriod() { 259 return period; 260 } 261 262 /** Returns initial delay before executing chore in getTimeUnit() units */ 263 public long getInitialDelay() { 264 return initialDelay; 265 } 266 267 public TimeUnit getTimeUnit() { 268 return timeUnit; 269 } 270 271 public synchronized boolean isInitialChoreComplete() { 272 return initialChoreComplete; 273 } 274 275 synchronized ChoreService getChoreService() { 276 return choreService; 277 } 278 279 synchronized long getTimeOfLastRun() { 280 return timeOfLastRun; 281 } 282 283 synchronized long getTimeOfThisRun() { 284 return timeOfThisRun; 285 } 286 287 /** Returns true when this Chore is scheduled with a ChoreService */ 288 public synchronized boolean isScheduled() { 289 return choreService != null && choreService.isChoreScheduled(this); 290 } 291 292 @InterfaceAudience.Private 293 @RestrictedApi(explanation = "Should only be called in tests", link = "", 294 allowedOnPath = ".*/src/test/.*") 295 public synchronized void choreForTesting() { 296 chore(); 297 } 298 299 /** The task to execute on each scheduled execution of the Chore */ 300 protected abstract void chore(); 301 302 /** 303 * Override to run a task before we start looping. 304 * @return true if initial chore was successful 305 */ 306 protected boolean initialChore() { 307 // Default does nothing 308 return true; 309 } 310 311 /** 312 * Override to run cleanup tasks when the Chore encounters an error and must stop running 313 */ 314 protected void cleanup() { 315 } 316 317 /** 318 * Call {@link #shutdown(boolean)} with {@code true}. 319 * @see ScheduledChore#shutdown(boolean) 320 */ 321 public synchronized void shutdown() { 322 shutdown(true); 323 } 324 325 /** 326 * Completely shutdown the ScheduleChore, which means we will call cleanup and you should not 327 * schedule it again. 328 * <p/> 329 * This is another path to cleanup the chore, comparing to stop the stopper instance passed in. 330 */ 331 public synchronized void shutdown(boolean mayInterruptIfRunning) { 332 cancel(mayInterruptIfRunning); 333 cleanup(); 334 } 335 336 /** 337 * A summation of this chore in human readable format. Downstream users should not presume parsing 338 * of this string can relaibly be done between versions. Instead, they should rely on the public 339 * accessor methods to get the information they desire. 340 */ 341 @InterfaceAudience.Private 342 @Override 343 public String toString() { 344 return "ScheduledChore name=" + getName() + ", period=" + getPeriod() + ", unit=" 345 + getTimeUnit(); 346 } 347}