001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.LinkedHashMap; 024import java.util.Map.Entry; 025import java.util.concurrent.ScheduledFuture; 026import java.util.concurrent.ScheduledThreadPoolExecutor; 027import java.util.concurrent.ThreadFactory; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.hbase.trace.TraceUtil; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run 036 * periodically while sharing threads. The ChoreService is backed by a 037 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the 038 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the 039 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads. 040 * <p> 041 * The ChoreService provides the ability to schedule, cancel, and trigger instances of 042 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of 043 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling 044 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled, 045 * there may be a need to increase the number of threads if it is noticed that chores are no longer 046 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is 047 * made to reduce the number of running threads to see if chores can still meet their start times 048 * with a smaller thread pool. 049 * <p> 050 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}. 051 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly. 052 */ 053@InterfaceAudience.Private 054public class ChoreService { 055 private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class); 056 057 /** 058 * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor 059 */ 060 @InterfaceAudience.Private 061 public final static int MIN_CORE_POOL_SIZE = 1; 062 /** 063 * The initial number of threads in the core pool for the {@link ChoreService}. 064 */ 065 public static final String CHORE_SERVICE_INITIAL_POOL_SIZE = 066 "hbase.choreservice.initial.pool.size"; 067 public static final int DEFAULT_CHORE_SERVICE_INITIAL_POOL_SIZE = 1; 068 069 /** 070 * This thread pool is used to schedule all of the Chores 071 */ 072 private final ScheduledThreadPoolExecutor scheduler; 073 074 /** 075 * Maps chores to their futures. Futures are used to control a chore's schedule 076 */ 077 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores; 078 079 /** 080 * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the 081 * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to 082 * increase the core pool size by 1 (otherwise a single long running chore whose execution is 083 * longer than its period would be able to spawn too many threads). 084 */ 085 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime; 086 087 /** 088 * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the 089 * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is 090 * running on. The prefix is useful because it allows us to monitor how the thread pool of a 091 * particular service changes over time VIA thread dumps. 092 */ 093 private final String coreThreadPoolPrefix; 094 095 /** 096 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 097 * spawned by this service 098 */ 099 @InterfaceAudience.Private 100 public ChoreService(final String coreThreadPoolPrefix) { 101 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false); 102 } 103 104 /** 105 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 106 * spawned by this service 107 * @param jitter Should chore service add some jitter for all of the scheduled 108 * chores. When set to true this will add -10% to 10% jitter. 109 */ 110 public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) { 111 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter); 112 } 113 114 /** 115 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads 116 * spawned by this service 117 * @param corePoolSize The initial size to set the core pool of the 118 * ScheduledThreadPoolExecutor to during initialization. The default 119 * size is 1, but specifying a larger size may be beneficial if you 120 * know that 1 thread will not be enough. 121 * @param jitter Should chore service add some jitter for all of the scheduled 122 * chores. When set to true this will add -10% to 10% jitter. 123 */ 124 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) { 125 this.coreThreadPoolPrefix = coreThreadPoolPrefix; 126 if (corePoolSize < MIN_CORE_POOL_SIZE) { 127 corePoolSize = MIN_CORE_POOL_SIZE; 128 } 129 130 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); 131 if (jitter) { 132 scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1); 133 } else { 134 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); 135 } 136 137 scheduler.setRemoveOnCancelPolicy(true); 138 scheduledChores = new HashMap<>(); 139 choresMissingStartTime = new HashMap<>(); 140 } 141 142 /** 143 * Schedule a chore. 144 * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService 145 * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled 146 * with a single ChoreService instance). 147 * @return true when the chore was successfully scheduled. false when the scheduling failed 148 * (typically occurs when a chore is scheduled during shutdown of service) 149 */ 150 public boolean scheduleChore(ScheduledChore chore) { 151 if (chore == null) { 152 return false; 153 } 154 // always lock chore first to prevent dead lock 155 synchronized (chore) { 156 synchronized (this) { 157 try { 158 // Chores should only ever be scheduled with a single ChoreService. If the choreService 159 // is changing, cancel any existing schedules of this chore. 160 if (chore.getChoreService() == this) { 161 LOG.warn("Chore {} has already been scheduled with us", chore); 162 return false; 163 } 164 if (chore.getPeriod() <= 0) { 165 LOG.info("Chore {} is disabled because its period is not positive.", chore); 166 return false; 167 } 168 LOG.info("Chore {} is enabled.", chore); 169 if (chore.getChoreService() != null) { 170 LOG.info("Cancel chore {} from its previous service", chore); 171 chore.getChoreService().cancelChore(chore); 172 } 173 chore.setChoreService(this); 174 ScheduledFuture<?> future = 175 scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()), 176 chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit()); 177 scheduledChores.put(chore, future); 178 return true; 179 } catch (Exception e) { 180 LOG.error("Could not successfully schedule chore: {}", chore.getName(), e); 181 return false; 182 } 183 } 184 } 185 } 186 187 /** 188 * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService 189 * yet then this call is equivalent to a call to scheduleChore. 190 */ 191 private void rescheduleChore(ScheduledChore chore, boolean immediately) { 192 if (scheduledChores.containsKey(chore)) { 193 ScheduledFuture<?> future = scheduledChores.get(chore); 194 future.cancel(false); 195 } 196 // set initial delay to 0 as we want to run it immediately 197 ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, 198 immediately ? 0 : chore.getPeriod(), chore.getPeriod(), chore.getTimeUnit()); 199 scheduledChores.put(chore, future); 200 } 201 202 /** 203 * Cancel any ongoing schedules that this chore has with the implementer of this interface. 204 * <p/> 205 * Call {@link ScheduledChore#cancel()} to cancel a {@link ScheduledChore}, in 206 * {@link ScheduledChore#cancel()} method we will call this method to remove the 207 * {@link ScheduledChore} from this {@link ChoreService}. 208 */ 209 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 210 allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java") 211 synchronized void cancelChore(ScheduledChore chore) { 212 cancelChore(chore, true); 213 } 214 215 /** 216 * Cancel any ongoing schedules that this chore has with the implementer of this interface. 217 * <p/> 218 * Call {@link ScheduledChore#cancel(boolean)} to cancel a {@link ScheduledChore}, in 219 * {@link ScheduledChore#cancel(boolean)} method we will call this method to remove the 220 * {@link ScheduledChore} from this {@link ChoreService}. 221 */ 222 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 223 allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java") 224 synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { 225 if (scheduledChores.containsKey(chore)) { 226 ScheduledFuture<?> future = scheduledChores.get(chore); 227 future.cancel(mayInterruptIfRunning); 228 scheduledChores.remove(chore); 229 230 // Removing a chore that was missing its start time means it may be possible 231 // to reduce the number of threads 232 if (choresMissingStartTime.containsKey(chore)) { 233 choresMissingStartTime.remove(chore); 234 requestCorePoolDecrease(); 235 } 236 } 237 } 238 239 /** Returns true when the chore is scheduled with the implementer of this interface */ 240 @InterfaceAudience.Private 241 public synchronized boolean isChoreScheduled(ScheduledChore chore) { 242 return chore != null && scheduledChores.containsKey(chore) 243 && !scheduledChores.get(chore).isDone(); 244 } 245 246 /** 247 * This method tries to execute the chore immediately. If the chore is executing at the time of 248 * this call, the chore will begin another execution as soon as the current execution finishes 249 */ 250 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 251 allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java") 252 synchronized void triggerNow(ScheduledChore chore) { 253 assert chore.getChoreService() == this; 254 rescheduleChore(chore, true); 255 } 256 257 /** Returns number of chores that this service currently has scheduled */ 258 int getNumberOfScheduledChores() { 259 return scheduledChores.size(); 260 } 261 262 /** 263 * Return number of chores that this service currently has scheduled that are missing their 264 * scheduled start time 265 */ 266 int getNumberOfChoresMissingStartTime() { 267 return choresMissingStartTime.size(); 268 } 269 270 /** Returns number of threads in the core pool of the underlying ScheduledThreadPoolExecutor */ 271 int getCorePoolSize() { 272 return scheduler.getCorePoolSize(); 273 } 274 275 /** 276 * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are 277 * daemon threads, and thus, don't prevent the JVM from shutting down 278 */ 279 static class ChoreServiceThreadFactory implements ThreadFactory { 280 private final String threadPrefix; 281 private final static String THREAD_NAME_SUFFIX = ".Chore."; 282 private AtomicInteger threadNumber = new AtomicInteger(1); 283 284 public ChoreServiceThreadFactory(final String threadPrefix) { 285 this.threadPrefix = threadPrefix; 286 } 287 288 @Override 289 public Thread newThread(Runnable r) { 290 Thread thread = 291 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement()); 292 thread.setDaemon(true); 293 return thread; 294 } 295 } 296 297 /** 298 * Represents a request to increase the number of core pool threads. Typically a request 299 * originates from the fact that the current core pool size is not sufficient to service all of 300 * the currently running Chores 301 * @return true when the request to increase the core pool size succeeds 302 */ 303 private synchronized boolean requestCorePoolIncrease() { 304 // There is no point in creating more threads than scheduledChores.size since scheduled runs 305 // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced 306 // amongst occurrences of the same chore). 307 if (scheduler.getCorePoolSize() < scheduledChores.size()) { 308 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1); 309 printChoreServiceDetails("requestCorePoolIncrease"); 310 return true; 311 } 312 return false; 313 } 314 315 /** 316 * Represents a request to decrease the number of core pool threads. Typically a request 317 * originates from the fact that the current core pool size is more than sufficient to service the 318 * running Chores. 319 */ 320 private synchronized void requestCorePoolDecrease() { 321 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) { 322 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1); 323 printChoreServiceDetails("requestCorePoolDecrease"); 324 } 325 } 326 327 /** 328 * A callback that tells the implementer of this interface that one of the scheduled chores is 329 * missing its start time. The implication of a chore missing its start time is that the service's 330 * current means of scheduling may not be sufficient to handle the number of ongoing chores (the 331 * other explanation is that the chore's execution time is greater than its scheduled period). The 332 * service should try to increase its concurrency when this callback is received. 333 * @param chore The chore that missed its start time 334 */ 335 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "", 336 allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java") 337 synchronized void onChoreMissedStartTime(ScheduledChore chore) { 338 if (!scheduledChores.containsKey(chore)) { 339 return; 340 } 341 342 // If the chore has not caused an increase in the size of the core thread pool then request an 343 // increase. This allows each chore missing its start time to increase the core pool size by 344 // at most 1. 345 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) { 346 choresMissingStartTime.put(chore, requestCorePoolIncrease()); 347 } 348 349 // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If 350 // the chore is NOT rescheduled, future executions of this chore will be delayed more and 351 // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates 352 // idle threads to chores based on how delayed they are. 353 rescheduleChore(chore, false); 354 printChoreDetails("onChoreMissedStartTime", chore); 355 } 356 357 /** 358 * Shut down the service. Any chores that are scheduled for execution will be cancelled. Any 359 * chores in the middle of execution will be interrupted and shutdown. This service will be 360 * unusable after this method has been called (i.e. future scheduling attempts will fail). 361 * <p/> 362 * Notice that, this will only clean the chore from this ChoreService but you could still schedule 363 * the chore with other ChoreService. 364 */ 365 public synchronized void shutdown() { 366 if (isShutdown()) { 367 return; 368 } 369 scheduler.shutdownNow(); 370 LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix, 371 scheduledChores.keySet()); 372 cancelAllChores(true); 373 scheduledChores.clear(); 374 choresMissingStartTime.clear(); 375 } 376 377 /** Returns true when the service is shutdown and thus cannot be used anymore */ 378 public boolean isShutdown() { 379 return scheduler.isShutdown(); 380 } 381 382 /** Returns true when the service is shutdown and all threads have terminated */ 383 public boolean isTerminated() { 384 return scheduler.isTerminated(); 385 } 386 387 private void cancelAllChores(final boolean mayInterruptIfRunning) { 388 // Build list of chores to cancel so we can iterate through a set that won't change 389 // as chores are cancelled. If we tried to cancel each chore while iterating through 390 // keySet the results would be undefined because the keySet would be changing 391 ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet()); 392 393 for (ScheduledChore chore : choresToCancel) { 394 cancelChore(chore, mayInterruptIfRunning); 395 } 396 } 397 398 /** Prints a summary of important details about the chore. Used for debugging purposes */ 399 private void printChoreDetails(final String header, ScheduledChore chore) { 400 if (!LOG.isTraceEnabled()) { 401 return; 402 } 403 LinkedHashMap<String, String> output = new LinkedHashMap<>(); 404 output.put(header, ""); 405 output.put("Chore name: ", chore.getName()); 406 output.put("Chore period: ", Integer.toString(chore.getPeriod())); 407 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns())); 408 409 for (Entry<String, String> entry : output.entrySet()) { 410 LOG.trace(entry.getKey() + entry.getValue()); 411 } 412 } 413 414 /** Prints a summary of important details about the service. Used for debugging purposes */ 415 private void printChoreServiceDetails(final String header) { 416 if (!LOG.isTraceEnabled()) { 417 return; 418 } 419 LinkedHashMap<String, String> output = new LinkedHashMap<>(); 420 output.put(header, ""); 421 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize())); 422 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores())); 423 output.put("ChoreService missingStartTimeCount: ", 424 Integer.toString(getNumberOfChoresMissingStartTime())); 425 426 for (Entry<String, String> entry : output.entrySet()) { 427 LOG.trace(entry.getKey() + entry.getValue()); 428 } 429 } 430}