001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 023 024import java.io.IOException; 025import java.io.PrintWriter; 026import java.io.StringWriter; 027import java.util.Comparator; 028import java.util.Iterator; 029import java.util.Optional; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.Executors; 032import java.util.concurrent.RejectedExecutionException; 033import java.util.concurrent.RejectedExecutionHandler; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.ThreadPoolExecutor; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.function.IntSupplier; 039 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.conf.ConfigurationManager; 042import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 043import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 047import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 048import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 049import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.StealJobQueue; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.util.StringUtils; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060 061/** 062 * Compact region on request and then run split if appropriate 063 */ 064@InterfaceAudience.Private 065public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 066 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 067 068 // Configuration key for the large compaction threads. 069 public final static String LARGE_COMPACTION_THREADS = 070 "hbase.regionserver.thread.compaction.large"; 071 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 072 073 // Configuration key for the small compaction threads. 074 public final static String SMALL_COMPACTION_THREADS = 075 "hbase.regionserver.thread.compaction.small"; 076 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 077 078 // Configuration key for split threads 079 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 080 public final static int SPLIT_THREADS_DEFAULT = 1; 081 082 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 083 "hbase.regionserver.regionSplitLimit"; 084 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; 085 086 private final HRegionServer server; 087 private final Configuration conf; 088 089 private final ThreadPoolExecutor longCompactions; 090 private final ThreadPoolExecutor shortCompactions; 091 private final ThreadPoolExecutor splits; 092 093 private volatile ThroughputController compactionThroughputController; 094 095 /** 096 * Splitting should not take place if the total number of regions exceed this. 097 * This is not a hard limit to the number of regions but it is a guideline to 098 * stop splitting after number of online regions is greater than this. 099 */ 100 private int regionSplitLimit; 101 102 /** @param server */ 103 CompactSplit(HRegionServer server) { 104 this.server = server; 105 this.conf = server.getConfiguration(); 106 this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, 107 DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 108 109 int largeThreads = Math.max(1, conf.getInt( 110 LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 111 int smallThreads = conf.getInt( 112 SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 113 114 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 115 116 // if we have throttle threads, make sure the user also specified size 117 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 118 119 final String n = Thread.currentThread().getName(); 120 121 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 122 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 123 60, TimeUnit.SECONDS, stealJobQueue, 124 new ThreadFactory() { 125 @Override 126 public Thread newThread(Runnable r) { 127 String name = n + "-longCompactions-" + System.currentTimeMillis(); 128 return new Thread(r, name); 129 } 130 }); 131 this.longCompactions.setRejectedExecutionHandler(new Rejection()); 132 this.longCompactions.prestartAllCoreThreads(); 133 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 134 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), 135 new ThreadFactory() { 136 @Override 137 public Thread newThread(Runnable r) { 138 String name = n + "-shortCompactions-" + System.currentTimeMillis(); 139 return new Thread(r, name); 140 } 141 }); 142 this.shortCompactions 143 .setRejectedExecutionHandler(new Rejection()); 144 this.splits = (ThreadPoolExecutor) 145 Executors.newFixedThreadPool(splitThreads, 146 new ThreadFactory() { 147 @Override 148 public Thread newThread(Runnable r) { 149 String name = n + "-splits-" + System.currentTimeMillis(); 150 return new Thread(r, name); 151 } 152 }); 153 154 // compaction throughput controller 155 this.compactionThroughputController = 156 CompactionThroughputControllerFactory.create(server, conf); 157 } 158 159 @Override 160 public String toString() { 161 return "compactionQueue=(longCompactions=" 162 + longCompactions.getQueue().size() + ":shortCompactions=" 163 + shortCompactions.getQueue().size() + ")" 164 + ", splitQueue=" + splits.getQueue().size(); 165 } 166 167 public String dumpQueue() { 168 StringBuilder queueLists = new StringBuilder(); 169 queueLists.append("Compaction/Split Queue dump:\n"); 170 queueLists.append(" LargeCompation Queue:\n"); 171 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 172 Iterator<Runnable> it = lq.iterator(); 173 while (it.hasNext()) { 174 queueLists.append(" " + it.next().toString()); 175 queueLists.append("\n"); 176 } 177 178 if (shortCompactions != null) { 179 queueLists.append("\n"); 180 queueLists.append(" SmallCompation Queue:\n"); 181 lq = shortCompactions.getQueue(); 182 it = lq.iterator(); 183 while (it.hasNext()) { 184 queueLists.append(" " + it.next().toString()); 185 queueLists.append("\n"); 186 } 187 } 188 189 queueLists.append("\n"); 190 queueLists.append(" Split Queue:\n"); 191 lq = splits.getQueue(); 192 it = lq.iterator(); 193 while (it.hasNext()) { 194 queueLists.append(" " + it.next().toString()); 195 queueLists.append("\n"); 196 } 197 198 return queueLists.toString(); 199 } 200 201 public synchronized boolean requestSplit(final Region r) { 202 // don't split regions that are blocking 203 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) { 204 byte[] midKey = ((HRegion)r).checkSplit(); 205 if (midKey != null) { 206 requestSplit(r, midKey); 207 return true; 208 } 209 } 210 return false; 211 } 212 213 public synchronized void requestSplit(final Region r, byte[] midKey) { 214 requestSplit(r, midKey, null); 215 } 216 217 /* 218 * The User parameter allows the split thread to assume the correct user identity 219 */ 220 public synchronized void requestSplit(final Region r, byte[] midKey, User user) { 221 if (midKey == null) { 222 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + 223 " not splittable because midkey=null"); 224 if (((HRegion)r).shouldForceSplit()) { 225 ((HRegion)r).clearSplit(); 226 } 227 return; 228 } 229 try { 230 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 231 if (LOG.isDebugEnabled()) { 232 LOG.debug("Splitting " + r + ", " + this); 233 } 234 } catch (RejectedExecutionException ree) { 235 LOG.info("Could not execute split for " + r, ree); 236 } 237 } 238 239 private interface CompactionCompleteTracker { 240 241 default void completed(Store store) { 242 } 243 } 244 245 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 246 new CompactionCompleteTracker() { 247 }; 248 249 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 250 251 private final CompactionLifeCycleTracker tracker; 252 253 private final AtomicInteger remaining; 254 255 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 256 this.tracker = tracker; 257 this.remaining = new AtomicInteger(numberOfStores); 258 } 259 260 @Override 261 public void completed(Store store) { 262 if (remaining.decrementAndGet() == 0) { 263 tracker.completed(); 264 } 265 } 266 } 267 268 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 269 IntSupplier numberOfStores) { 270 if (tracker == CompactionLifeCycleTracker.DUMMY) { 271 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 272 // the life cycle of a compaction. 273 return DUMMY_COMPLETE_TRACKER; 274 } else { 275 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 276 } 277 } 278 279 @Override 280 public synchronized void requestCompaction(HRegion region, String why, int priority, 281 CompactionLifeCycleTracker tracker, User user) throws IOException { 282 requestCompactionInternal(region, why, priority, true, tracker, 283 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 284 } 285 286 @Override 287 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 288 CompactionLifeCycleTracker tracker, User user) throws IOException { 289 requestCompactionInternal(region, store, why, priority, true, tracker, 290 getCompleteTracker(tracker, () -> 1), user); 291 } 292 293 private void requestCompactionInternal(HRegion region, String why, int priority, 294 boolean selectNow, CompactionLifeCycleTracker tracker, 295 CompactionCompleteTracker completeTracker, User user) throws IOException { 296 // request compaction on all stores 297 for (HStore store : region.stores.values()) { 298 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 299 user); 300 } 301 } 302 303 private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 304 boolean selectNow, CompactionLifeCycleTracker tracker, 305 CompactionCompleteTracker completeTracker, User user) throws IOException { 306 if (this.server.isStopped() || (region.getTableDescriptor() != null && 307 !region.getTableDescriptor().isCompactionEnabled())) { 308 return; 309 } 310 RegionServerSpaceQuotaManager spaceQuotaManager = 311 this.server.getRegionServerSpaceQuotaManager(); 312 if (spaceQuotaManager != null && 313 spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { 314 String reason = "Ignoring compaction request for " + region + 315 " as an active space quota violation " + " policy disallows compactions."; 316 tracker.notExecuted(store, reason); 317 completeTracker.completed(store); 318 LOG.debug(reason); 319 return; 320 } 321 322 CompactionContext compaction; 323 if (selectNow) { 324 Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user); 325 if (!c.isPresent()) { 326 // message logged inside 327 return; 328 } 329 compaction = c.get(); 330 } else { 331 compaction = null; 332 } 333 334 ThreadPoolExecutor pool; 335 if (selectNow) { 336 // compaction.get is safe as we will just return if selectNow is true but no compaction is 337 // selected 338 pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions 339 : shortCompactions; 340 } else { 341 // We assume that most compactions are small. So, put system compactions into small 342 // pool; we will do selection there, and move to large pool if necessary. 343 pool = shortCompactions; 344 } 345 pool.execute( 346 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 347 region.incrementCompactionsQueuedCount(); 348 if (LOG.isDebugEnabled()) { 349 String type = (pool == shortCompactions) ? "Small " : "Large "; 350 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 351 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 352 } 353 } 354 355 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 356 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 357 DUMMY_COMPLETE_TRACKER, null); 358 } 359 360 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) 361 throws IOException { 362 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 363 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 364 } 365 366 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 367 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 368 throws IOException { 369 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 370 if (!compaction.isPresent() && region.getRegionInfo() != null) { 371 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + 372 " because compaction request was cancelled"; 373 tracker.notExecuted(store, reason); 374 completeTracker.completed(store); 375 LOG.debug(reason); 376 } 377 return compaction; 378 } 379 380 /** 381 * Only interrupt once it's done with a run through the work loop. 382 */ 383 void interruptIfNecessary() { 384 splits.shutdown(); 385 longCompactions.shutdown(); 386 shortCompactions.shutdown(); 387 } 388 389 private void waitFor(ThreadPoolExecutor t, String name) { 390 boolean done = false; 391 while (!done) { 392 try { 393 done = t.awaitTermination(60, TimeUnit.SECONDS); 394 LOG.info("Waiting for " + name + " to finish..."); 395 if (!done) { 396 t.shutdownNow(); 397 } 398 } catch (InterruptedException ie) { 399 LOG.warn("Interrupted waiting for " + name + " to finish..."); 400 t.shutdownNow(); 401 } 402 } 403 } 404 405 void join() { 406 waitFor(splits, "Split Thread"); 407 waitFor(longCompactions, "Large Compaction Thread"); 408 waitFor(shortCompactions, "Small Compaction Thread"); 409 } 410 411 /** 412 * Returns the current size of the queue containing regions that are 413 * processed. 414 * 415 * @return The current size of the regions queue. 416 */ 417 public int getCompactionQueueSize() { 418 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 419 } 420 421 public int getLargeCompactionQueueSize() { 422 return longCompactions.getQueue().size(); 423 } 424 425 426 public int getSmallCompactionQueueSize() { 427 return shortCompactions.getQueue().size(); 428 } 429 430 public int getSplitQueueSize() { 431 return splits.getQueue().size(); 432 } 433 434 private boolean shouldSplitRegion() { 435 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { 436 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 437 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); 438 } 439 return (regionSplitLimit > server.getNumberOfOnlineRegions()); 440 } 441 442 /** 443 * @return the regionSplitLimit 444 */ 445 public int getRegionSplitLimit() { 446 return this.regionSplitLimit; 447 } 448 449 private static final Comparator<Runnable> COMPARATOR = 450 new Comparator<Runnable>() { 451 452 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 453 if (r1 == r2) { 454 return 0; //they are the same request 455 } 456 // less first 457 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 458 if (cmp != 0) { 459 return cmp; 460 } 461 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 462 if (cmp != 0) { 463 return cmp; 464 } 465 466 // break the tie based on hash code 467 return System.identityHashCode(r1) - System.identityHashCode(r2); 468 } 469 470 @Override 471 public int compare(Runnable r1, Runnable r2) { 472 // CompactionRunner first 473 if (r1 instanceof CompactionRunner) { 474 if (!(r2 instanceof CompactionRunner)) { 475 return -1; 476 } 477 } else { 478 if (r2 instanceof CompactionRunner) { 479 return 1; 480 } else { 481 // break the tie based on hash code 482 return System.identityHashCode(r1) - System.identityHashCode(r2); 483 } 484 } 485 CompactionRunner o1 = (CompactionRunner) r1; 486 CompactionRunner o2 = (CompactionRunner) r2; 487 // less first 488 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 489 if (cmp != 0) { 490 return cmp; 491 } 492 CompactionContext c1 = o1.compaction; 493 CompactionContext c2 = o2.compaction; 494 if (c1 != null) { 495 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 496 } else { 497 return c2 != null ? 1 : 0; 498 } 499 } 500 }; 501 502 private final class CompactionRunner implements Runnable { 503 private final HStore store; 504 private final HRegion region; 505 private final CompactionContext compaction; 506 private final CompactionLifeCycleTracker tracker; 507 private final CompactionCompleteTracker completeTracker; 508 private int queuedPriority; 509 private ThreadPoolExecutor parent; 510 private User user; 511 private long time; 512 513 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 514 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 515 ThreadPoolExecutor parent, User user) { 516 this.store = store; 517 this.region = region; 518 this.compaction = compaction; 519 this.tracker = tracker; 520 this.completeTracker = completeTracker; 521 this.queuedPriority = 522 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 523 this.parent = parent; 524 this.user = user; 525 this.time = EnvironmentEdgeManager.currentTime(); 526 } 527 528 @Override 529 public String toString() { 530 if (compaction != null) { 531 return "Request=" + compaction.getRequest(); 532 } else { 533 return "region=" + region.toString() + ", storeName=" + store.toString() + 534 ", priority=" + queuedPriority + ", startTime=" + time; 535 } 536 } 537 538 private void doCompaction(User user) { 539 CompactionContext c; 540 // Common case - system compaction without a file selection. Select now. 541 if (compaction == null) { 542 int oldPriority = this.queuedPriority; 543 this.queuedPriority = this.store.getCompactPriority(); 544 if (this.queuedPriority > oldPriority) { 545 // Store priority decreased while we were in queue (due to some other compaction?), 546 // requeue with new priority to avoid blocking potential higher priorities. 547 this.parent.execute(this); 548 return; 549 } 550 Optional<CompactionContext> selected; 551 try { 552 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 553 completeTracker, user); 554 } catch (IOException ex) { 555 LOG.error("Compaction selection failed " + this, ex); 556 server.checkFileSystem(); 557 region.decrementCompactionsQueuedCount(); 558 return; 559 } 560 if (!selected.isPresent()) { 561 region.decrementCompactionsQueuedCount(); 562 return; // nothing to do 563 } 564 c = selected.get(); 565 assert c.hasSelection(); 566 // Now see if we are in correct pool for the size; if not, go to the correct one. 567 // We might end up waiting for a while, so cancel the selection. 568 569 ThreadPoolExecutor pool = 570 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 571 572 // Long compaction pool can process small job 573 // Short compaction pool should not process large job 574 if (this.parent == shortCompactions && pool == longCompactions) { 575 this.store.cancelRequestedCompaction(c); 576 this.parent = pool; 577 this.parent.execute(this); 578 return; 579 } 580 } else { 581 c = compaction; 582 } 583 // Finally we can compact something. 584 assert c != null; 585 586 tracker.beforeExecution(store); 587 try { 588 // Note: please don't put single-compaction logic here; 589 // put it into region/store/etc. This is CST logic. 590 long start = EnvironmentEdgeManager.currentTime(); 591 boolean completed = 592 region.compact(c, store, compactionThroughputController, user); 593 long now = EnvironmentEdgeManager.currentTime(); 594 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + 595 this + "; duration=" + StringUtils.formatTimeDiff(now, start)); 596 if (completed) { 597 // degenerate case: blocked regions require recursive enqueues 598 if (store.getCompactPriority() <= 0) { 599 requestSystemCompaction(region, store, "Recursive enqueue"); 600 } else { 601 // see if the compaction has caused us to exceed max region size 602 requestSplit(region); 603 } 604 } 605 } catch (IOException ex) { 606 IOException remoteEx = 607 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 608 LOG.error("Compaction failed " + this, remoteEx); 609 if (remoteEx != ex) { 610 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 611 } 612 region.reportCompactionRequestFailure(); 613 server.checkFileSystem(); 614 } catch (Exception ex) { 615 LOG.error("Compaction failed " + this, ex); 616 region.reportCompactionRequestFailure(); 617 server.checkFileSystem(); 618 } finally { 619 tracker.afterExecution(store); 620 completeTracker.completed(store); 621 region.decrementCompactionsQueuedCount(); 622 LOG.debug("Status {}", CompactSplit.this); 623 } 624 } 625 626 @Override 627 public void run() { 628 Preconditions.checkNotNull(server); 629 if (server.isStopped() 630 || (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) { 631 region.decrementCompactionsQueuedCount(); 632 return; 633 } 634 doCompaction(user); 635 } 636 637 private String formatStackTrace(Exception ex) { 638 StringWriter sw = new StringWriter(); 639 PrintWriter pw = new PrintWriter(sw); 640 ex.printStackTrace(pw); 641 pw.flush(); 642 return sw.toString(); 643 } 644 } 645 646 /** 647 * Cleanup class to use when rejecting a compaction request from the queue. 648 */ 649 private static class Rejection implements RejectedExecutionHandler { 650 @Override 651 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { 652 if (runnable instanceof CompactionRunner) { 653 CompactionRunner runner = (CompactionRunner) runnable; 654 LOG.debug("Compaction Rejected: " + runner); 655 if (runner.compaction != null) { 656 runner.store.cancelRequestedCompaction(runner.compaction); 657 } 658 } 659 } 660 } 661 662 /** 663 * {@inheritDoc} 664 */ 665 @Override 666 public void onConfigurationChange(Configuration newConf) { 667 // Check if number of large / small compaction threads has changed, and then 668 // adjust the core pool size of the thread pools, by using the 669 // setCorePoolSize() method. According to the javadocs, it is safe to 670 // change the core pool size on-the-fly. We need to reset the maximum 671 // pool size, as well. 672 int largeThreads = Math.max(1, newConf.getInt( 673 LARGE_COMPACTION_THREADS, 674 LARGE_COMPACTION_THREADS_DEFAULT)); 675 if (this.longCompactions.getCorePoolSize() != largeThreads) { 676 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + 677 " from " + this.longCompactions.getCorePoolSize() + " to " + 678 largeThreads); 679 if(this.longCompactions.getCorePoolSize() < largeThreads) { 680 this.longCompactions.setMaximumPoolSize(largeThreads); 681 this.longCompactions.setCorePoolSize(largeThreads); 682 } else { 683 this.longCompactions.setCorePoolSize(largeThreads); 684 this.longCompactions.setMaximumPoolSize(largeThreads); 685 } 686 } 687 688 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, 689 SMALL_COMPACTION_THREADS_DEFAULT); 690 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 691 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + 692 " from " + this.shortCompactions.getCorePoolSize() + " to " + 693 smallThreads); 694 if(this.shortCompactions.getCorePoolSize() < smallThreads) { 695 this.shortCompactions.setMaximumPoolSize(smallThreads); 696 this.shortCompactions.setCorePoolSize(smallThreads); 697 } else { 698 this.shortCompactions.setCorePoolSize(smallThreads); 699 this.shortCompactions.setMaximumPoolSize(smallThreads); 700 } 701 } 702 703 int splitThreads = newConf.getInt(SPLIT_THREADS, 704 SPLIT_THREADS_DEFAULT); 705 if (this.splits.getCorePoolSize() != splitThreads) { 706 LOG.info("Changing the value of " + SPLIT_THREADS + 707 " from " + this.splits.getCorePoolSize() + " to " + 708 splitThreads); 709 if(this.splits.getCorePoolSize() < splitThreads) { 710 this.splits.setMaximumPoolSize(splitThreads); 711 this.splits.setCorePoolSize(splitThreads); 712 } else { 713 this.splits.setCorePoolSize(splitThreads); 714 this.splits.setMaximumPoolSize(splitThreads); 715 } 716 } 717 718 ThroughputController old = this.compactionThroughputController; 719 if (old != null) { 720 old.stop("configuration change"); 721 } 722 this.compactionThroughputController = 723 CompactionThroughputControllerFactory.create(server, newConf); 724 725 // We change this atomically here instead of reloading the config in order that upstream 726 // would be the only one with the flexibility to reload the config. 727 this.conf.reloadConfiguration(); 728 } 729 730 protected int getSmallCompactionThreadNum() { 731 return this.shortCompactions.getCorePoolSize(); 732 } 733 734 protected int getLargeCompactionThreadNum() { 735 return this.longCompactions.getCorePoolSize(); 736 } 737 738 protected int getSplitThreadNum() { 739 return this.splits.getCorePoolSize(); 740 } 741 742 /** 743 * {@inheritDoc} 744 */ 745 @Override 746 public void registerChildren(ConfigurationManager manager) { 747 // No children to register. 748 } 749 750 /** 751 * {@inheritDoc} 752 */ 753 @Override 754 public void deregisterChildren(ConfigurationManager manager) { 755 // No children to register 756 } 757 758 @VisibleForTesting 759 public ThroughputController getCompactionThroughputController() { 760 return compactionThroughputController; 761 } 762 763 @VisibleForTesting 764 /** 765 * Shutdown the long compaction thread pool. 766 * Should only be used in unit test to prevent long compaction thread pool from stealing job 767 * from short compaction queue 768 */ 769 void shutdownLongCompactions(){ 770 this.longCompactions.shutdown(); 771 } 772 773 public void clearLongCompactionsQueue() { 774 longCompactions.getQueue().clear(); 775 } 776 777 public void clearShortCompactionsQueue() { 778 shortCompactions.getQueue().clear(); 779 } 780}