001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.LinkedHashMap; 024import java.util.Map.Entry; 025import java.util.concurrent.ScheduledFuture; 026import java.util.concurrent.ScheduledThreadPoolExecutor; 027import java.util.concurrent.ThreadFactory; 028import java.util.concurrent.atomic.AtomicInteger; 029 030import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run 037 * periodically while sharing threads. The ChoreService is backed by a 038 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the 039 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the 040 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads. 041 * <p> 042 * The ChoreService provides the ability to schedule, cancel, and trigger instances of 043 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of 044 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling 045 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled, 046 * there may be a need to increase the number of threads if it is noticed that chores are no longer 047 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is 048 * made to reduce the number of running threads to see if chores can still meet their start times 049 * with a smaller thread pool. 050 * <p> 051 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}. 052 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly. 053 */ 054@InterfaceAudience.Public 055public class ChoreService implements ChoreServicer { 056 private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class); 057 058 /** 059 * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor 060 */ 061 @InterfaceAudience.Private 062 public final static int MIN_CORE_POOL_SIZE = 1; 063 064 /** 065 * This thread pool is used to schedule all of the Chores 066 */ 067 private final ScheduledThreadPoolExecutor scheduler; 068 069 /** 070 * Maps chores to their futures. Futures are used to control a chore's schedule 071 */ 072 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores; 073 074 /** 075 * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the 076 * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to 077 * increase the core pool size by 1 (otherwise a single long running chore whose execution is 078 * longer than its period would be able to spawn too many threads). 079 */ 080 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime; 081 082 /** 083 * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the 084 * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is 085 * running on. The prefix is useful because it allows us to monitor how the thread pool of a 086 * particular service changes over time VIA thread dumps. 087 */ 088 private final String coreThreadPoolPrefix; 089 090 /** 091 * 092 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 093 * spawned by this service 094 */ 095 @InterfaceAudience.Private 096 public ChoreService(final String coreThreadPoolPrefix) { 097 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false); 098 } 099 100 /** 101 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 102 * spawned by this service 103 * @param jitter Should chore service add some jitter for all of the scheduled chores. When set 104 * to true this will add -10% to 10% jitter. 105 */ 106 public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) { 107 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter); 108 } 109 110 /** 111 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 112 * spawned by this service 113 * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor 114 * to during initialization. The default size is 1, but specifying a larger size may be 115 * beneficial if you know that 1 thread will not be enough. 116 * @param jitter Should chore service add some jitter for all of the scheduled chores. When set 117 * to true this will add -10% to 10% jitter. 118 */ 119 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) { 120 this.coreThreadPoolPrefix = coreThreadPoolPrefix; 121 if (corePoolSize < MIN_CORE_POOL_SIZE) { 122 corePoolSize = MIN_CORE_POOL_SIZE; 123 } 124 125 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); 126 if (jitter) { 127 scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1); 128 } else { 129 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); 130 } 131 132 scheduler.setRemoveOnCancelPolicy(true); 133 scheduledChores = new HashMap<>(); 134 choresMissingStartTime = new HashMap<>(); 135 } 136 137 /** 138 * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService 139 * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled 140 * with a single ChoreService instance). 141 * @return true when the chore was successfully scheduled. false when the scheduling failed 142 * (typically occurs when a chore is scheduled during shutdown of service) 143 */ 144 public synchronized boolean scheduleChore(ScheduledChore chore) { 145 if (chore == null) { 146 return false; 147 } 148 149 try { 150 if (chore.getPeriod() <= 0) { 151 LOG.info("Chore {} is disabled because its period is not positive.", chore); 152 return false; 153 } 154 LOG.info("Chore {} is enabled.", chore); 155 chore.setChoreServicer(this); 156 ScheduledFuture<?> future = 157 scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(), 158 chore.getTimeUnit()); 159 scheduledChores.put(chore, future); 160 return true; 161 } catch (Exception exception) { 162 if (LOG.isInfoEnabled()) { 163 LOG.info("Could not successfully schedule chore: " + chore.getName()); 164 } 165 return false; 166 } 167 } 168 169 /** 170 * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService 171 * yet then this call is equivalent to a call to scheduleChore. 172 */ 173 private void rescheduleChore(ScheduledChore chore) { 174 if (scheduledChores.containsKey(chore)) { 175 ScheduledFuture<?> future = scheduledChores.get(chore); 176 future.cancel(false); 177 } 178 scheduleChore(chore); 179 } 180 181 @InterfaceAudience.Private 182 @Override 183 public synchronized void cancelChore(ScheduledChore chore) { 184 cancelChore(chore, true); 185 } 186 187 @InterfaceAudience.Private 188 @Override 189 public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { 190 if (chore != null && scheduledChores.containsKey(chore)) { 191 ScheduledFuture<?> future = scheduledChores.get(chore); 192 future.cancel(mayInterruptIfRunning); 193 scheduledChores.remove(chore); 194 195 // Removing a chore that was missing its start time means it may be possible 196 // to reduce the number of threads 197 if (choresMissingStartTime.containsKey(chore)) { 198 choresMissingStartTime.remove(chore); 199 requestCorePoolDecrease(); 200 } 201 } 202 } 203 204 @InterfaceAudience.Private 205 @Override 206 public synchronized boolean isChoreScheduled(ScheduledChore chore) { 207 return chore != null && scheduledChores.containsKey(chore) 208 && !scheduledChores.get(chore).isDone(); 209 } 210 211 @InterfaceAudience.Private 212 @Override 213 public synchronized boolean triggerNow(ScheduledChore chore) { 214 if (chore != null) { 215 rescheduleChore(chore); 216 return true; 217 } 218 return false; 219 } 220 221 /** 222 * @return number of chores that this service currently has scheduled 223 */ 224 int getNumberOfScheduledChores() { 225 return scheduledChores.size(); 226 } 227 228 /** 229 * @return number of chores that this service currently has scheduled that are missing their 230 * scheduled start time 231 */ 232 int getNumberOfChoresMissingStartTime() { 233 return choresMissingStartTime.size(); 234 } 235 236 /** 237 * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor 238 */ 239 int getCorePoolSize() { 240 return scheduler.getCorePoolSize(); 241 } 242 243 /** 244 * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are 245 * daemon threads, and thus, don't prevent the JVM from shutting down 246 */ 247 static class ChoreServiceThreadFactory implements ThreadFactory { 248 private final String threadPrefix; 249 private final static String THREAD_NAME_SUFFIX = ".Chore."; 250 private AtomicInteger threadNumber = new AtomicInteger(1); 251 252 /** 253 * @param threadPrefix The prefix given to all threads created by this factory 254 */ 255 public ChoreServiceThreadFactory(final String threadPrefix) { 256 this.threadPrefix = threadPrefix; 257 } 258 259 @Override 260 public Thread newThread(Runnable r) { 261 Thread thread = 262 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement()); 263 thread.setDaemon(true); 264 return thread; 265 } 266 } 267 268 /** 269 * Represents a request to increase the number of core pool threads. Typically a request 270 * originates from the fact that the current core pool size is not sufficient to service all of 271 * the currently running Chores 272 * @return true when the request to increase the core pool size succeeds 273 */ 274 private synchronized boolean requestCorePoolIncrease() { 275 // There is no point in creating more threads than scheduledChores.size since scheduled runs 276 // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced 277 // amongst occurrences of the same chore). 278 if (scheduler.getCorePoolSize() < scheduledChores.size()) { 279 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1); 280 printChoreServiceDetails("requestCorePoolIncrease"); 281 return true; 282 } 283 return false; 284 } 285 286 /** 287 * Represents a request to decrease the number of core pool threads. Typically a request 288 * originates from the fact that the current core pool size is more than sufficient to service the 289 * running Chores. 290 */ 291 private synchronized void requestCorePoolDecrease() { 292 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) { 293 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1); 294 printChoreServiceDetails("requestCorePoolDecrease"); 295 } 296 } 297 298 @InterfaceAudience.Private 299 @Override 300 public synchronized void onChoreMissedStartTime(ScheduledChore chore) { 301 if (chore == null || !scheduledChores.containsKey(chore)) return; 302 303 // If the chore has not caused an increase in the size of the core thread pool then request an 304 // increase. This allows each chore missing its start time to increase the core pool size by 305 // at most 1. 306 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) { 307 choresMissingStartTime.put(chore, requestCorePoolIncrease()); 308 } 309 310 // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If 311 // the chore is NOT rescheduled, future executions of this chore will be delayed more and 312 // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates 313 // idle threads to chores based on how delayed they are. 314 rescheduleChore(chore); 315 printChoreDetails("onChoreMissedStartTime", chore); 316 } 317 318 /** 319 * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores 320 * in the middle of execution will be interrupted and shutdown. This service will be unusable 321 * after this method has been called (i.e. future scheduling attempts will fail). 322 */ 323 public synchronized void shutdown() { 324 scheduler.shutdownNow(); 325 if (LOG.isInfoEnabled()) { 326 LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet() 327 + " on shutdown"); 328 } 329 cancelAllChores(true); 330 scheduledChores.clear(); 331 choresMissingStartTime.clear(); 332 } 333 334 /** 335 * @return true when the service is shutdown and thus cannot be used anymore 336 */ 337 public boolean isShutdown() { 338 return scheduler.isShutdown(); 339 } 340 341 /** 342 * @return true when the service is shutdown and all threads have terminated 343 */ 344 public boolean isTerminated() { 345 return scheduler.isTerminated(); 346 } 347 348 private void cancelAllChores(final boolean mayInterruptIfRunning) { 349 // Build list of chores to cancel so we can iterate through a set that won't change 350 // as chores are cancelled. If we tried to cancel each chore while iterating through 351 // keySet the results would be undefined because the keySet would be changing 352 ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet()); 353 354 for (ScheduledChore chore : choresToCancel) { 355 cancelChore(chore, mayInterruptIfRunning); 356 } 357 } 358 359 /** 360 * Prints a summary of important details about the chore. Used for debugging purposes 361 */ 362 private void printChoreDetails(final String header, ScheduledChore chore) { 363 LinkedHashMap<String, String> output = new LinkedHashMap<>(); 364 output.put(header, ""); 365 output.put("Chore name: ", chore.getName()); 366 output.put("Chore period: ", Integer.toString(chore.getPeriod())); 367 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns())); 368 369 for (Entry<String, String> entry : output.entrySet()) { 370 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue()); 371 } 372 } 373 374 /** 375 * Prints a summary of important details about the service. Used for debugging purposes 376 */ 377 private void printChoreServiceDetails(final String header) { 378 LinkedHashMap<String, String> output = new LinkedHashMap<>(); 379 output.put(header, ""); 380 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize())); 381 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores())); 382 output.put("ChoreService missingStartTimeCount: ", 383 Integer.toString(getNumberOfChoresMissingStartTime())); 384 385 for (Entry<String, String> entry : output.entrySet()) { 386 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue()); 387 } 388 } 389}