/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
 * periodically while sharing threads. The ChoreService is backed by a
 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
 * <p>
 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
 * there may be a need to increase the number of threads if it is noticed that chores are no longer
 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
 * made to reduce the number of running threads to see if chores can still meet their start times
 * with a smaller thread pool.
 * <p>
 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
 */
@InterfaceAudience.Public
public class ChoreService implements ChoreServicer {
  private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);

  /**
   * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
   */
  @InterfaceAudience.Private
  public static final int MIN_CORE_POOL_SIZE = 1;

  /**
   * This thread pool is used to schedule all of the Chores
   */
  private final ScheduledThreadPoolExecutor scheduler;

  /**
   * Maps chores to their futures. Futures are used to control a chore's schedule
   */
  private final Map<ScheduledChore, ScheduledFuture<?>> scheduledChores;

  /**
   * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
   * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
   * increase the core pool size by 1 (otherwise a single long running chore whose execution is
   * longer than its period would be able to spawn too many threads).
   */
  private final Map<ScheduledChore, Boolean> choresMissingStartTime;

  /**
   * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
   * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
   * running on. The prefix is useful because it allows us to monitor how the thread pool of a
   * particular service changes over time VIA thread dumps.
   */
  private final String coreThreadPoolPrefix;

  /**
   * Creates a service with the minimum core pool size and no jitter.
   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
   *          spawned by this service
   */
  @InterfaceAudience.Private
  @VisibleForTesting
  public ChoreService(final String coreThreadPoolPrefix) {
    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
  }

  /**
   * Creates a service with the minimum core pool size.
   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
   *          spawned by this service
   * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
   *          to true this will add -10% to 10% jitter.
   */
  public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
  }

  /**
   * Creates a service backed by a scheduler with the requested initial core pool size.
   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
   *          spawned by this service
   * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor
   *          to during initialization. The default size is 1, but specifying a larger size may be
   *          beneficial if you know that 1 thread will not be enough. Values below
   *          {@link #MIN_CORE_POOL_SIZE} are raised to {@link #MIN_CORE_POOL_SIZE}.
   * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
   *          to true this will add -10% to 10% jitter.
   */
  public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
    this.coreThreadPoolPrefix = coreThreadPoolPrefix;
    final int initialPoolSize = Math.max(corePoolSize, MIN_CORE_POOL_SIZE);

    final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
    if (jitter) {
      scheduler = new JitterScheduledThreadPoolExecutorImpl(initialPoolSize, threadFactory, 0.1);
    } else {
      scheduler = new ScheduledThreadPoolExecutor(initialPoolSize, threadFactory);
    }

    // Drop cancelled tasks from the scheduler's work queue at cancellation time instead of
    // leaving them queued until their delay elapses.
    scheduler.setRemoveOnCancelPolicy(true);
    scheduledChores = new HashMap<>();
    choresMissingStartTime = new HashMap<>();
  }

  /**
   * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
   *          instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
   *          with a single ChoreService instance).
   * @return true when the chore was successfully scheduled. false when the chore is null, its
   *         period is not positive, or the scheduling failed (typically occurs when a chore is
   *         scheduled during shutdown of service)
   */
  public synchronized boolean scheduleChore(ScheduledChore chore) {
    if (chore == null) {
      return false;
    }

    try {
      if (chore.getPeriod() <= 0) {
        LOG.info("Chore {} is disabled because its period is not positive.", chore);
        return false;
      }
      LOG.info("Chore {} is enabled.", chore);
      chore.setChoreServicer(this);
      ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
        chore.getPeriod(), chore.getTimeUnit());
      scheduledChores.put(chore, future);
      return true;
    } catch (Exception exception) {
      // Scheduling is best-effort (callers only get a boolean), but keep the cause in the log so
      // that a failure can actually be diagnosed.
      LOG.info("Could not successfully schedule chore: {}", chore.getName(), exception);
      return false;
    }
  }

  /**
   * Cancels the chore's current schedule (without interrupting a run in progress) and schedules
   * it again.
   * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
   *          yet then this call is equivalent to a call to scheduleChore.
   */
  private synchronized void rescheduleChore(ScheduledChore chore) {
    if (chore == null) {
      return;
    }

    ScheduledFuture<?> currentFuture = scheduledChores.get(chore);
    if (currentFuture != null) {
      currentFuture.cancel(false);
    }
    scheduleChore(chore);
  }

  /**
   * Cancels the chore, interrupting it if it is currently running.
   * @param chore the chore to cancel; ignored when null or not scheduled with this service
   */
  @InterfaceAudience.Private
  @Override
  public synchronized void cancelChore(ScheduledChore chore) {
    cancelChore(chore, true);
  }

  /**
   * Cancels the chore's future and stops tracking the chore.
   * @param chore the chore to cancel; ignored when null or not scheduled with this service
   * @param mayInterruptIfRunning passed through to {@link ScheduledFuture#cancel(boolean)}
   */
  @InterfaceAudience.Private
  @Override
  public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
    if (chore == null) {
      return;
    }

    ScheduledFuture<?> future = scheduledChores.remove(chore);
    if (future == null) {
      // Not scheduled with this service.
      return;
    }
    future.cancel(mayInterruptIfRunning);

    // Removing a chore that was missing its start time means it may be possible
    // to reduce the number of threads
    if (choresMissingStartTime.remove(chore) != null) {
      requestCorePoolDecrease();
    }
  }

  /**
   * @return true when the chore is tracked by this service and its future is not yet done
   */
  @InterfaceAudience.Private
  @Override
  public synchronized boolean isChoreScheduled(ScheduledChore chore) {
    if (chore == null) {
      return false;
    }
    ScheduledFuture<?> future = scheduledChores.get(chore);
    return future != null && !future.isDone();
  }

  /**
   * Reschedules the chore with this service.
   * @return false when the chore is null, true otherwise
   */
  @InterfaceAudience.Private
  @Override
  public synchronized boolean triggerNow(ScheduledChore chore) {
    if (chore == null) {
      return false;
    }
    rescheduleChore(chore);
    return true;
  }

  /**
   * @return number of chores that this service currently has scheduled
   */
  int getNumberOfScheduledChores() {
    return scheduledChores.size();
  }

  /**
   * @return number of chores that this service currently has scheduled that are missing their
   *         scheduled start time
   */
  int getNumberOfChoresMissingStartTime() {
    return choresMissingStartTime.size();
  }

  /**
   * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
   */
  int getCorePoolSize() {
    return scheduler.getCorePoolSize();
  }

  /**
   * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
   * daemon threads, and thus, don't prevent the JVM from shutting down
   */
  static class ChoreServiceThreadFactory implements ThreadFactory {
    private static final String THREAD_NAME_SUFFIX = ".Chore.";
    private final String threadPrefix;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    /**
     * @param threadPrefix The prefix given to all threads created by this factory
     */
    public ChoreServiceThreadFactory(final String threadPrefix) {
      this.threadPrefix = threadPrefix;
    }

    /**
     * Creates a daemon thread named {@code <prefix>.Chore.<n>}, where n counts up from 1.
     */
    @Override
    public Thread newThread(Runnable r) {
      Thread thread =
        new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    }
  }

  /**
   * Represents a request to increase the number of core pool threads. Typically a request
   * originates from the fact that the current core pool size is not sufficient to service all of
   * the currently running Chores
   * @return true when the request to increase the core pool size succeeds
   */
  private synchronized boolean requestCorePoolIncrease() {
    // There is no point in creating more threads than scheduledChores.size since scheduled runs
    // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
    // amongst occurrences of the same chore).
    if (scheduler.getCorePoolSize() >= scheduledChores.size()) {
      return false;
    }
    scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
    printChoreServiceDetails("requestCorePoolIncrease");
    return true;
  }

  /**
   * Represents a request to decrease the number of core pool threads. Typically a request
   * originates from the fact that the current core pool size is more than sufficient to service the
   * running Chores. The pool is never shrunk below {@link #MIN_CORE_POOL_SIZE}.
   */
  private synchronized void requestCorePoolDecrease() {
    if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
      scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
      printChoreServiceDetails("requestCorePoolDecrease");
    }
  }

  /**
   * Reacts to a chore reporting a missed start time: possibly grows the core pool (at most once
   * per chore) and then reschedules the chore. Ignored when the chore is null or not scheduled
   * with this service.
   */
  @InterfaceAudience.Private
  @Override
  public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
    if (chore == null || !scheduledChores.containsKey(chore)) {
      return;
    }

    // If the chore has not caused an increase in the size of the core thread pool then request an
    // increase. This allows each chore missing its start time to increase the core pool size by
    // at most 1.
    boolean alreadyGrewPool = Boolean.TRUE.equals(choresMissingStartTime.get(chore));
    if (!alreadyGrewPool) {
      choresMissingStartTime.put(chore, requestCorePoolIncrease());
    }

    // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
    // the chore is NOT rescheduled, future executions of this chore will be delayed more and
    // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
    // idle threads to chores based on how delayed they are.
    rescheduleChore(chore);
    printChoreDetails("onChoreMissedStartTime", chore);
  }

  /**
   * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
   * in the middle of execution will be interrupted and shutdown. This service will be unusable
   * after this method has been called (i.e. future scheduling attempts will fail). Calling this
   * method on a service that is already shut down is a no-op.
   */
  public synchronized void shutdown() {
    if (isShutdown()) {
      // Already shut down: nothing left to cancel, and no point logging an empty chore set again.
      return;
    }
    scheduler.shutdownNow();
    LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix,
      scheduledChores.keySet());
    cancelAllChores(true);
    scheduledChores.clear();
    choresMissingStartTime.clear();
  }

  /**
   * @return true when the service is shutdown and thus cannot be used anymore
   */
  public boolean isShutdown() {
    return scheduler.isShutdown();
  }

  /**
   * @return true when the service is shutdown and all threads have terminated
   */
  public boolean isTerminated() {
    return scheduler.isTerminated();
  }

  /**
   * Cancels every chore currently tracked by this service.
   * @param mayInterruptIfRunning passed through to {@link #cancelChore(ScheduledChore, boolean)}
   */
  private void cancelAllChores(final boolean mayInterruptIfRunning) {
    // Iterate over a snapshot: cancelChore removes entries from scheduledChores, so walking the
    // live keySet while cancelling would modify the map during iteration.
    List<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet());
    for (ScheduledChore chore : choresToCancel) {
      cancelChore(chore, mayInterruptIfRunning);
    }
  }

  /**
   * Prints a summary of important details about the chore. Used for debugging purposes. Does
   * nothing unless trace logging is enabled.
   */
  private void printChoreDetails(final String header, ScheduledChore chore) {
    if (!LOG.isTraceEnabled()) {
      return;
    }
    LinkedHashMap<String, String> output = new LinkedHashMap<>();
    output.put(header, "");
    output.put("Chore name: ", chore.getName());
    output.put("Chore period: ", Integer.toString(chore.getPeriod()));
    output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));

    for (Entry<String, String> entry : output.entrySet()) {
      LOG.trace(entry.getKey() + entry.getValue());
    }
  }

  /**
   * Prints a summary of important details about the service. Used for debugging purposes. Does
   * nothing unless trace logging is enabled.
   */
  private void printChoreServiceDetails(final String header) {
    if (!LOG.isTraceEnabled()) {
      return;
    }
    LinkedHashMap<String, String> output = new LinkedHashMap<>();
    output.put(header, "");
    output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
    output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
    output.put("ChoreService missingStartTimeCount: ",
      Integer.toString(getNumberOfChoresMissingStartTime()));

    for (Entry<String, String> entry : output.entrySet()) {
      LOG.trace(entry.getKey() + entry.getValue());
    }
  }
}