001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.LinkedHashMap; 024import java.util.Map.Entry; 025import java.util.concurrent.ScheduledFuture; 026import java.util.concurrent.ScheduledThreadPoolExecutor; 027import java.util.concurrent.ThreadFactory; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.hbase.trace.TraceUtil; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run 036 * periodically while sharing threads. The ChoreService is backed by a 037 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the 038 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the 039 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads. 040 * <p> 041 * The ChoreService provides the ability to schedule, cancel, and trigger instances of 042 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of 043 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling 044 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled, 045 * there may be a need to increase the number of threads if it is noticed that chores are no longer 046 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is 047 * made to reduce the number of running threads to see if chores can still meet their start times 048 * with a smaller thread pool. 049 * <p> 050 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}. 051 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly. 052 */ 053@InterfaceAudience.Private 054public class ChoreService { 055 private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class); 056 057 /** 058 * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor 059 */ 060 @InterfaceAudience.Private 061 public final static int MIN_CORE_POOL_SIZE = 1; 062 063 /** 064 * This thread pool is used to schedule all of the Chores 065 */ 066 private final ScheduledThreadPoolExecutor scheduler; 067 068 /** 069 * Maps chores to their futures. Futures are used to control a chore's schedule 070 */ 071 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores; 072 073 /** 074 * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the 075 * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to 076 * increase the core pool size by 1 (otherwise a single long running chore whose execution is 077 * longer than its period would be able to spawn too many threads). 078 */ 079 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime; 080 081 /** 082 * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the 083 * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is 084 * running on. The prefix is useful because it allows us to monitor how the thread pool of a 085 * particular service changes over time VIA thread dumps. 086 */ 087 private final String coreThreadPoolPrefix; 088 089 /** 090 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 091 * spawned by this service 092 */ 093 @InterfaceAudience.Private 094 public ChoreService(final String coreThreadPoolPrefix) { 095 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false); 096 } 097 098 /** 099 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 100 * spawned by this service 101 * @param jitter Should chore service add some jitter for all of the scheduled 102 * chores. When set to true this will add -10% to 10% jitter. 103 */ 104 public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) { 105 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter); 106 } 107 108 /** 109 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 110 * spawned by this service 111 * @param corePoolSize The initial size to set the core pool of the 112 * ScheduledThreadPoolExecutor to during initialization. The default 113 * size is 1, but specifying a larger size may be beneficial if you 114 * know that 1 thread will not be enough. 115 * @param jitter Should chore service add some jitter for all of the scheduled 116 * chores. When set to true this will add -10% to 10% jitter. 117 */ 118 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) { 119 this.coreThreadPoolPrefix = coreThreadPoolPrefix; 120 if (corePoolSize < MIN_CORE_POOL_SIZE) { 121 corePoolSize = MIN_CORE_POOL_SIZE; 122 } 123 124 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); 125 if (jitter) { 126 scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1); 127 } else { 128 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); 129 } 130 131 scheduler.setRemoveOnCancelPolicy(true); 132 scheduledChores = new HashMap<>(); 133 choresMissingStartTime = new HashMap<>(); 134 } 135 136 /** 137 * Schedule a chore. 138 * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService 139 * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled 140 * with a single ChoreService instance). 141 * @return true when the chore was successfully scheduled. false when the scheduling failed 142 * (typically occurs when a chore is scheduled during shutdown of service) 143 */ 144 public boolean scheduleChore(ScheduledChore chore) { 145 if (chore == null) { 146 return false; 147 } 148 // always lock chore first to prevent dead lock 149 synchronized (chore) { 150 synchronized (this) { 151 try { 152 // Chores should only ever be scheduled with a single ChoreService. If the choreService 153 // is changing, cancel any existing schedules of this chore. 154 if (chore.getChoreService() == this) { 155 LOG.warn("Chore {} has already been scheduled with us", chore); 156 return false; 157 } 158 if (chore.getPeriod() <= 0) { 159 LOG.info("Chore {} is disabled because its period is not positive.", chore); 160 return false; 161 } 162 LOG.info("Chore {} is enabled.", chore); 163 if (chore.getChoreService() != null) { 164 LOG.info("Cancel chore {} from its previous service", chore); 165 chore.getChoreService().cancelChore(chore); 166 } 167 chore.setChoreService(this); 168 ScheduledFuture<?> future = 169 scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()), 170 chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit()); 171 scheduledChores.put(chore, future); 172 return true; 173 } catch (Exception e) { 174 LOG.error("Could not successfully schedule chore: {}", chore.getName(), e); 175 return false; 176 } 177 } 178 } 179 } 180 181 /** 182 * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService 183 * yet then this call is equivalent to a call to scheduleChore. 184 */ 185 private void rescheduleChore(ScheduledChore chore) { 186 if (scheduledChores.containsKey(chore)) { 187 ScheduledFuture<?> future = scheduledChores.get(chore); 188 future.cancel(false); 189 } 190 ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), 191 chore.getPeriod(), chore.getTimeUnit()); 192 scheduledChores.put(chore, future); 193 } 194 195 /** 196 * Cancel any ongoing schedules that this chore has with the implementer of this interface. 197 * <p/> 198 * Call {@link ScheduledChore#cancel()} to cancel a {@link ScheduledChore}, in 199 * {@link ScheduledChore#cancel()} method we will call this method to remove the 200 * {@link ScheduledChore} from this {@link ChoreService}. 201 */ 202 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 203 allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java") 204 synchronized void cancelChore(ScheduledChore chore) { 205 cancelChore(chore, true); 206 } 207 208 /** 209 * Cancel any ongoing schedules that this chore has with the implementer of this interface. 210 * <p/> 211 * Call {@link ScheduledChore#cancel(boolean)} to cancel a {@link ScheduledChore}, in 212 * {@link ScheduledChore#cancel(boolean)} method we will call this method to remove the 213 * {@link ScheduledChore} from this {@link ChoreService}. 214 */ 215 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 216 allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java") 217 synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { 218 if (scheduledChores.containsKey(chore)) { 219 ScheduledFuture<?> future = scheduledChores.get(chore); 220 future.cancel(mayInterruptIfRunning); 221 scheduledChores.remove(chore); 222 223 // Removing a chore that was missing its start time means it may be possible 224 // to reduce the number of threads 225 if (choresMissingStartTime.containsKey(chore)) { 226 choresMissingStartTime.remove(chore); 227 requestCorePoolDecrease(); 228 } 229 } 230 } 231 232 /** Returns true when the chore is scheduled with the implementer of this interface */ 233 @InterfaceAudience.Private 234 public synchronized boolean isChoreScheduled(ScheduledChore chore) { 235 return chore != null && scheduledChores.containsKey(chore) 236 && !scheduledChores.get(chore).isDone(); 237 } 238 239 /** 240 * This method tries to execute the chore immediately. If the chore is executing at the time of 241 * this call, the chore will begin another execution as soon as the current execution finishes 242 */ 243 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 244 allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java") 245 synchronized void triggerNow(ScheduledChore chore) { 246 assert chore.getChoreService() == this; 247 rescheduleChore(chore); 248 } 249 250 /** Returns number of chores that this service currently has scheduled */ 251 int getNumberOfScheduledChores() { 252 return scheduledChores.size(); 253 } 254 255 /** 256 * Return number of chores that this service currently has scheduled that are missing their 257 * scheduled start time 258 */ 259 int getNumberOfChoresMissingStartTime() { 260 return choresMissingStartTime.size(); 261 } 262 263 /** Returns number of threads in the core pool of the underlying ScheduledThreadPoolExecutor */ 264 int getCorePoolSize() { 265 return scheduler.getCorePoolSize(); 266 } 267 268 /** 269 * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are 270 * daemon threads, and thus, don't prevent the JVM from shutting down 271 */ 272 static class ChoreServiceThreadFactory implements ThreadFactory { 273 private final String threadPrefix; 274 private final static String THREAD_NAME_SUFFIX = ".Chore."; 275 private AtomicInteger threadNumber = new AtomicInteger(1); 276 277 public ChoreServiceThreadFactory(final String threadPrefix) { 278 this.threadPrefix = threadPrefix; 279 } 280 281 @Override 282 public Thread newThread(Runnable r) { 283 Thread thread = 284 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement()); 285 thread.setDaemon(true); 286 return thread; 287 } 288 } 289 290 /** 291 * Represents a request to increase the number of core pool threads. Typically a request 292 * originates from the fact that the current core pool size is not sufficient to service all of 293 * the currently running Chores 294 * @return true when the request to increase the core pool size succeeds 295 */ 296 private synchronized boolean requestCorePoolIncrease() { 297 // There is no point in creating more threads than scheduledChores.size since scheduled runs 298 // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced 299 // amongst occurrences of the same chore). 300 if (scheduler.getCorePoolSize() < scheduledChores.size()) { 301 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1); 302 printChoreServiceDetails("requestCorePoolIncrease"); 303 return true; 304 } 305 return false; 306 } 307 308 /** 309 * Represents a request to decrease the number of core pool threads. Typically a request 310 * originates from the fact that the current core pool size is more than sufficient to service the 311 * running Chores. 312 */ 313 private synchronized void requestCorePoolDecrease() { 314 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) { 315 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1); 316 printChoreServiceDetails("requestCorePoolDecrease"); 317 } 318 } 319 320 /** 321 * A callback that tells the implementer of this interface that one of the scheduled chores is 322 * missing its start time. The implication of a chore missing its start time is that the service's 323 * current means of scheduling may not be sufficient to handle the number of ongoing chores (the 324 * other explanation is that the chore's execution time is greater than its scheduled period). The 325 * service should try to increase its concurrency when this callback is received. 326 * @param chore The chore that missed its start time 327 */ 328 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 329 allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java") 330 synchronized void onChoreMissedStartTime(ScheduledChore chore) { 331 if (!scheduledChores.containsKey(chore)) { 332 return; 333 } 334 335 // If the chore has not caused an increase in the size of the core thread pool then request an 336 // increase. This allows each chore missing its start time to increase the core pool size by 337 // at most 1. 338 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) { 339 choresMissingStartTime.put(chore, requestCorePoolIncrease()); 340 } 341 342 // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If 343 // the chore is NOT rescheduled, future executions of this chore will be delayed more and 344 // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates 345 // idle threads to chores based on how delayed they are. 346 rescheduleChore(chore); 347 printChoreDetails("onChoreMissedStartTime", chore); 348 } 349 350 /** 351 * Shut down the service. Any chores that are scheduled for execution will be cancelled. Any 352 * chores in the middle of execution will be interrupted and shutdown. This service will be 353 * unusable after this method has been called (i.e. future scheduling attempts will fail). 354 * <p/> 355 * Notice that, this will only clean the chore from this ChoreService but you could still schedule 356 * the chore with other ChoreService. 357 */ 358 public synchronized void shutdown() { 359 if (isShutdown()) { 360 return; 361 } 362 scheduler.shutdownNow(); 363 LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix, 364 scheduledChores.keySet()); 365 cancelAllChores(true); 366 scheduledChores.clear(); 367 choresMissingStartTime.clear(); 368 } 369 370 /** Returns true when the service is shutdown and thus cannot be used anymore */ 371 public boolean isShutdown() { 372 return scheduler.isShutdown(); 373 } 374 375 /** Returns true when the service is shutdown and all threads have terminated */ 376 public boolean isTerminated() { 377 return scheduler.isTerminated(); 378 } 379 380 private void cancelAllChores(final boolean mayInterruptIfRunning) { 381 // Build list of chores to cancel so we can iterate through a set that won't change 382 // as chores are cancelled. If we tried to cancel each chore while iterating through 383 // keySet the results would be undefined because the keySet would be changing 384 ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet()); 385 386 for (ScheduledChore chore : choresToCancel) { 387 cancelChore(chore, mayInterruptIfRunning); 388 } 389 } 390 391 /** Prints a summary of important details about the chore. Used for debugging purposes */ 392 private void printChoreDetails(final String header, ScheduledChore chore) { 393 if (!LOG.isTraceEnabled()) { 394 return; 395 } 396 LinkedHashMap<String, String> output = new LinkedHashMap<>(); 397 output.put(header, ""); 398 output.put("Chore name: ", chore.getName()); 399 output.put("Chore period: ", Integer.toString(chore.getPeriod())); 400 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns())); 401 402 for (Entry<String, String> entry : output.entrySet()) { 403 LOG.trace(entry.getKey() + entry.getValue()); 404 } 405 } 406 407 /** Prints a summary of important details about the service. Used for debugging purposes */ 408 private void printChoreServiceDetails(final String header) { 409 if (!LOG.isTraceEnabled()) { 410 return; 411 } 412 LinkedHashMap<String, String> output = new LinkedHashMap<>(); 413 output.put(header, ""); 414 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize())); 415 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores())); 416 output.put("ChoreService missingStartTimeCount: ", 417 Integer.toString(getNumberOfChoresMissingStartTime())); 418 419 for (Entry<String, String> entry : output.entrySet()) { 420 LOG.trace(entry.getKey() + entry.getValue()); 421 } 422 } 423}