001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 023 024import java.io.IOException; 025import java.io.PrintWriter; 026import java.io.StringWriter; 027import java.util.Comparator; 028import java.util.Iterator; 029import java.util.Optional; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.Executors; 032import java.util.concurrent.RejectedExecutionException; 033import java.util.concurrent.RejectedExecutionHandler; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.function.IntSupplier; 038 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.conf.ConfigurationManager; 041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 049import org.apache.hadoop.hbase.security.Superusers; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.StealJobQueue; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.util.StringUtils; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 062 063/** 064 * Compact region on request and then run split if appropriate 065 */ 066@InterfaceAudience.Private 067public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 068 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 069 070 // Configuration key for the large compaction threads. 071 public final static String LARGE_COMPACTION_THREADS = 072 "hbase.regionserver.thread.compaction.large"; 073 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 074 075 // Configuration key for the small compaction threads. 076 public final static String SMALL_COMPACTION_THREADS = 077 "hbase.regionserver.thread.compaction.small"; 078 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 079 080 // Configuration key for split threads 081 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 082 public final static int SPLIT_THREADS_DEFAULT = 1; 083 084 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 085 "hbase.regionserver.regionSplitLimit"; 086 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; 087 088 private final HRegionServer server; 089 private final Configuration conf; 090 091 private final ThreadPoolExecutor longCompactions; 092 private final ThreadPoolExecutor shortCompactions; 093 private final ThreadPoolExecutor splits; 094 095 private volatile ThroughputController compactionThroughputController; 096 097 /** 098 * Splitting should not take place if the total number of regions exceed this. 099 * This is not a hard limit to the number of regions but it is a guideline to 100 * stop splitting after number of online regions is greater than this. 101 */ 102 private int regionSplitLimit; 103 104 /** @param server */ 105 CompactSplit(HRegionServer server) { 106 this.server = server; 107 this.conf = server.getConfiguration(); 108 this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, 109 DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 110 111 int largeThreads = Math.max(1, conf.getInt( 112 LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 113 int smallThreads = conf.getInt( 114 SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 115 116 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 117 118 // if we have throttle threads, make sure the user also specified size 119 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 120 121 final String n = Thread.currentThread().getName(); 122 123 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 124 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 125 stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d") 126 .setDaemon(true).build()); 127 this.longCompactions.setRejectedExecutionHandler(new Rejection()); 128 this.longCompactions.prestartAllCoreThreads(); 129 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 130 stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder() 131 .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 132 this.shortCompactions.setRejectedExecutionHandler(new Rejection()); 133 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 134 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 135 136 // compaction throughput controller 137 this.compactionThroughputController = 138 CompactionThroughputControllerFactory.create(server, conf); 139 } 140 141 @Override 142 public String toString() { 143 return "compactionQueue=(longCompactions=" 144 + longCompactions.getQueue().size() + ":shortCompactions=" 145 + shortCompactions.getQueue().size() + ")" 146 + ", splitQueue=" + splits.getQueue().size(); 147 } 148 149 public String dumpQueue() { 150 StringBuilder queueLists = new StringBuilder(); 151 queueLists.append("Compaction/Split Queue dump:\n"); 152 queueLists.append(" LargeCompation Queue:\n"); 153 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 154 Iterator<Runnable> it = lq.iterator(); 155 while (it.hasNext()) { 156 queueLists.append(" " + it.next().toString()); 157 queueLists.append("\n"); 158 } 159 160 if (shortCompactions != null) { 161 queueLists.append("\n"); 162 queueLists.append(" SmallCompation Queue:\n"); 163 lq = shortCompactions.getQueue(); 164 it = lq.iterator(); 165 while (it.hasNext()) { 166 queueLists.append(" " + it.next().toString()); 167 queueLists.append("\n"); 168 } 169 } 170 171 queueLists.append("\n"); 172 queueLists.append(" Split Queue:\n"); 173 lq = splits.getQueue(); 174 it = lq.iterator(); 175 while (it.hasNext()) { 176 queueLists.append(" " + it.next().toString()); 177 queueLists.append("\n"); 178 } 179 180 return queueLists.toString(); 181 } 182 183 public synchronized boolean requestSplit(final Region r) { 184 // don't split regions that are blocking 185 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) { 186 byte[] midKey = ((HRegion)r).checkSplit(); 187 if (midKey != null) { 188 requestSplit(r, midKey); 189 return true; 190 } 191 } 192 return false; 193 } 194 195 public synchronized void requestSplit(final Region r, byte[] midKey) { 196 requestSplit(r, midKey, null); 197 } 198 199 /* 200 * The User parameter allows the split thread to assume the correct user identity 201 */ 202 public synchronized void requestSplit(final Region r, byte[] midKey, User user) { 203 if (midKey == null) { 204 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + 205 " not splittable because midkey=null"); 206 if (((HRegion)r).shouldForceSplit()) { 207 ((HRegion)r).clearSplit(); 208 } 209 return; 210 } 211 try { 212 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 213 if (LOG.isDebugEnabled()) { 214 LOG.debug("Splitting " + r + ", " + this); 215 } 216 } catch (RejectedExecutionException ree) { 217 LOG.info("Could not execute split for " + r, ree); 218 } 219 } 220 221 private interface CompactionCompleteTracker { 222 223 default void completed(Store store) { 224 } 225 } 226 227 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 228 new CompactionCompleteTracker() { 229 }; 230 231 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 232 233 private final CompactionLifeCycleTracker tracker; 234 235 private final AtomicInteger remaining; 236 237 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 238 this.tracker = tracker; 239 this.remaining = new AtomicInteger(numberOfStores); 240 } 241 242 @Override 243 public void completed(Store store) { 244 if (remaining.decrementAndGet() == 0) { 245 tracker.completed(); 246 } 247 } 248 } 249 250 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 251 IntSupplier numberOfStores) { 252 if (tracker == CompactionLifeCycleTracker.DUMMY) { 253 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 254 // the life cycle of a compaction. 255 return DUMMY_COMPLETE_TRACKER; 256 } else { 257 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 258 } 259 } 260 261 @Override 262 public synchronized void requestCompaction(HRegion region, String why, int priority, 263 CompactionLifeCycleTracker tracker, User user) throws IOException { 264 requestCompactionInternal(region, why, priority, true, tracker, 265 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 266 } 267 268 @Override 269 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 270 CompactionLifeCycleTracker tracker, User user) throws IOException { 271 requestCompactionInternal(region, store, why, priority, true, tracker, 272 getCompleteTracker(tracker, () -> 1), user); 273 } 274 275 private void requestCompactionInternal(HRegion region, String why, int priority, 276 boolean selectNow, CompactionLifeCycleTracker tracker, 277 CompactionCompleteTracker completeTracker, User user) throws IOException { 278 // request compaction on all stores 279 for (HStore store : region.stores.values()) { 280 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 281 user); 282 } 283 } 284 285 private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 286 boolean selectNow, CompactionLifeCycleTracker tracker, 287 CompactionCompleteTracker completeTracker, User user) throws IOException { 288 if (this.server.isStopped() || (region.getTableDescriptor() != null && 289 !region.getTableDescriptor().isCompactionEnabled())) { 290 return; 291 } 292 RegionServerSpaceQuotaManager spaceQuotaManager = 293 this.server.getRegionServerSpaceQuotaManager(); 294 295 if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 296 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { 297 // Enter here only when: 298 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 299 String reason = "Ignoring compaction request for " + region + 300 " as an active space quota violation " + " policy disallows compactions."; 301 tracker.notExecuted(store, reason); 302 completeTracker.completed(store); 303 LOG.debug(reason); 304 return; 305 } 306 307 CompactionContext compaction; 308 if (selectNow) { 309 Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user); 310 if (!c.isPresent()) { 311 // message logged inside 312 return; 313 } 314 compaction = c.get(); 315 } else { 316 compaction = null; 317 } 318 319 ThreadPoolExecutor pool; 320 if (selectNow) { 321 // compaction.get is safe as we will just return if selectNow is true but no compaction is 322 // selected 323 pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions 324 : shortCompactions; 325 } else { 326 // We assume that most compactions are small. So, put system compactions into small 327 // pool; we will do selection there, and move to large pool if necessary. 328 pool = shortCompactions; 329 } 330 pool.execute( 331 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 332 region.incrementCompactionsQueuedCount(); 333 if (LOG.isDebugEnabled()) { 334 String type = (pool == shortCompactions) ? "Small " : "Large "; 335 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 336 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 337 } 338 } 339 340 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 341 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 342 DUMMY_COMPLETE_TRACKER, null); 343 } 344 345 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) 346 throws IOException { 347 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 348 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 349 } 350 351 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 352 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 353 throws IOException { 354 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 355 if (!compaction.isPresent() && region.getRegionInfo() != null) { 356 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + 357 " because compaction request was cancelled"; 358 tracker.notExecuted(store, reason); 359 completeTracker.completed(store); 360 LOG.debug(reason); 361 } 362 return compaction; 363 } 364 365 /** 366 * Only interrupt once it's done with a run through the work loop. 367 */ 368 void interruptIfNecessary() { 369 splits.shutdown(); 370 longCompactions.shutdown(); 371 shortCompactions.shutdown(); 372 } 373 374 private void waitFor(ThreadPoolExecutor t, String name) { 375 boolean done = false; 376 while (!done) { 377 try { 378 done = t.awaitTermination(60, TimeUnit.SECONDS); 379 LOG.info("Waiting for " + name + " to finish..."); 380 if (!done) { 381 t.shutdownNow(); 382 } 383 } catch (InterruptedException ie) { 384 LOG.warn("Interrupted waiting for " + name + " to finish..."); 385 t.shutdownNow(); 386 } 387 } 388 } 389 390 void join() { 391 waitFor(splits, "Split Thread"); 392 waitFor(longCompactions, "Large Compaction Thread"); 393 waitFor(shortCompactions, "Small Compaction Thread"); 394 } 395 396 /** 397 * Returns the current size of the queue containing regions that are 398 * processed. 399 * 400 * @return The current size of the regions queue. 401 */ 402 public int getCompactionQueueSize() { 403 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 404 } 405 406 public int getLargeCompactionQueueSize() { 407 return longCompactions.getQueue().size(); 408 } 409 410 411 public int getSmallCompactionQueueSize() { 412 return shortCompactions.getQueue().size(); 413 } 414 415 public int getSplitQueueSize() { 416 return splits.getQueue().size(); 417 } 418 419 private boolean shouldSplitRegion() { 420 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { 421 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 422 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); 423 } 424 return (regionSplitLimit > server.getNumberOfOnlineRegions()); 425 } 426 427 /** 428 * @return the regionSplitLimit 429 */ 430 public int getRegionSplitLimit() { 431 return this.regionSplitLimit; 432 } 433 434 private static final Comparator<Runnable> COMPARATOR = 435 new Comparator<Runnable>() { 436 437 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 438 if (r1 == r2) { 439 return 0; //they are the same request 440 } 441 // less first 442 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 443 if (cmp != 0) { 444 return cmp; 445 } 446 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 447 if (cmp != 0) { 448 return cmp; 449 } 450 451 // break the tie based on hash code 452 return System.identityHashCode(r1) - System.identityHashCode(r2); 453 } 454 455 @Override 456 public int compare(Runnable r1, Runnable r2) { 457 // CompactionRunner first 458 if (r1 instanceof CompactionRunner) { 459 if (!(r2 instanceof CompactionRunner)) { 460 return -1; 461 } 462 } else { 463 if (r2 instanceof CompactionRunner) { 464 return 1; 465 } else { 466 // break the tie based on hash code 467 return System.identityHashCode(r1) - System.identityHashCode(r2); 468 } 469 } 470 CompactionRunner o1 = (CompactionRunner) r1; 471 CompactionRunner o2 = (CompactionRunner) r2; 472 // less first 473 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 474 if (cmp != 0) { 475 return cmp; 476 } 477 CompactionContext c1 = o1.compaction; 478 CompactionContext c2 = o2.compaction; 479 if (c1 != null) { 480 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 481 } else { 482 return c2 != null ? 1 : 0; 483 } 484 } 485 }; 486 487 private final class CompactionRunner implements Runnable { 488 private final HStore store; 489 private final HRegion region; 490 private final CompactionContext compaction; 491 private final CompactionLifeCycleTracker tracker; 492 private final CompactionCompleteTracker completeTracker; 493 private int queuedPriority; 494 private ThreadPoolExecutor parent; 495 private User user; 496 private long time; 497 498 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 499 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 500 ThreadPoolExecutor parent, User user) { 501 this.store = store; 502 this.region = region; 503 this.compaction = compaction; 504 this.tracker = tracker; 505 this.completeTracker = completeTracker; 506 this.queuedPriority = 507 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 508 this.parent = parent; 509 this.user = user; 510 this.time = EnvironmentEdgeManager.currentTime(); 511 } 512 513 @Override 514 public String toString() { 515 if (compaction != null) { 516 return "Request=" + compaction.getRequest(); 517 } else { 518 return "region=" + region.toString() + ", storeName=" + store.toString() + 519 ", priority=" + queuedPriority + ", startTime=" + time; 520 } 521 } 522 523 private void doCompaction(User user) { 524 CompactionContext c; 525 // Common case - system compaction without a file selection. Select now. 526 if (compaction == null) { 527 int oldPriority = this.queuedPriority; 528 this.queuedPriority = this.store.getCompactPriority(); 529 if (this.queuedPriority > oldPriority) { 530 // Store priority decreased while we were in queue (due to some other compaction?), 531 // requeue with new priority to avoid blocking potential higher priorities. 532 this.parent.execute(this); 533 return; 534 } 535 Optional<CompactionContext> selected; 536 try { 537 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 538 completeTracker, user); 539 } catch (IOException ex) { 540 LOG.error("Compaction selection failed " + this, ex); 541 server.checkFileSystem(); 542 region.decrementCompactionsQueuedCount(); 543 return; 544 } 545 if (!selected.isPresent()) { 546 region.decrementCompactionsQueuedCount(); 547 return; // nothing to do 548 } 549 c = selected.get(); 550 assert c.hasSelection(); 551 // Now see if we are in correct pool for the size; if not, go to the correct one. 552 // We might end up waiting for a while, so cancel the selection. 553 554 ThreadPoolExecutor pool = 555 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 556 557 // Long compaction pool can process small job 558 // Short compaction pool should not process large job 559 if (this.parent == shortCompactions && pool == longCompactions) { 560 this.store.cancelRequestedCompaction(c); 561 this.parent = pool; 562 this.parent.execute(this); 563 return; 564 } 565 } else { 566 c = compaction; 567 } 568 // Finally we can compact something. 569 assert c != null; 570 571 tracker.beforeExecution(store); 572 try { 573 // Note: please don't put single-compaction logic here; 574 // put it into region/store/etc. This is CST logic. 575 long start = EnvironmentEdgeManager.currentTime(); 576 boolean completed = 577 region.compact(c, store, compactionThroughputController, user); 578 long now = EnvironmentEdgeManager.currentTime(); 579 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + 580 this + "; duration=" + StringUtils.formatTimeDiff(now, start)); 581 if (completed) { 582 // degenerate case: blocked regions require recursive enqueues 583 if (store.getCompactPriority() <= 0) { 584 requestSystemCompaction(region, store, "Recursive enqueue"); 585 } else { 586 // see if the compaction has caused us to exceed max region size 587 requestSplit(region); 588 } 589 } 590 } catch (IOException ex) { 591 IOException remoteEx = 592 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 593 LOG.error("Compaction failed " + this, remoteEx); 594 if (remoteEx != ex) { 595 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 596 } 597 region.reportCompactionRequestFailure(); 598 server.checkFileSystem(); 599 } catch (Exception ex) { 600 LOG.error("Compaction failed " + this, ex); 601 region.reportCompactionRequestFailure(); 602 server.checkFileSystem(); 603 } finally { 604 tracker.afterExecution(store); 605 completeTracker.completed(store); 606 region.decrementCompactionsQueuedCount(); 607 LOG.debug("Status {}", CompactSplit.this); 608 } 609 } 610 611 @Override 612 public void run() { 613 Preconditions.checkNotNull(server); 614 if (server.isStopped() 615 || (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) { 616 region.decrementCompactionsQueuedCount(); 617 return; 618 } 619 doCompaction(user); 620 } 621 622 private String formatStackTrace(Exception ex) { 623 StringWriter sw = new StringWriter(); 624 PrintWriter pw = new PrintWriter(sw); 625 ex.printStackTrace(pw); 626 pw.flush(); 627 return sw.toString(); 628 } 629 } 630 631 /** 632 * Cleanup class to use when rejecting a compaction request from the queue. 633 */ 634 private static class Rejection implements RejectedExecutionHandler { 635 @Override 636 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { 637 if (runnable instanceof CompactionRunner) { 638 CompactionRunner runner = (CompactionRunner) runnable; 639 LOG.debug("Compaction Rejected: " + runner); 640 if (runner.compaction != null) { 641 runner.store.cancelRequestedCompaction(runner.compaction); 642 } 643 } 644 } 645 } 646 647 /** 648 * {@inheritDoc} 649 */ 650 @Override 651 public void onConfigurationChange(Configuration newConf) { 652 // Check if number of large / small compaction threads has changed, and then 653 // adjust the core pool size of the thread pools, by using the 654 // setCorePoolSize() method. According to the javadocs, it is safe to 655 // change the core pool size on-the-fly. We need to reset the maximum 656 // pool size, as well. 657 int largeThreads = Math.max(1, newConf.getInt( 658 LARGE_COMPACTION_THREADS, 659 LARGE_COMPACTION_THREADS_DEFAULT)); 660 if (this.longCompactions.getCorePoolSize() != largeThreads) { 661 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + 662 " from " + this.longCompactions.getCorePoolSize() + " to " + 663 largeThreads); 664 if(this.longCompactions.getCorePoolSize() < largeThreads) { 665 this.longCompactions.setMaximumPoolSize(largeThreads); 666 this.longCompactions.setCorePoolSize(largeThreads); 667 } else { 668 this.longCompactions.setCorePoolSize(largeThreads); 669 this.longCompactions.setMaximumPoolSize(largeThreads); 670 } 671 } 672 673 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, 674 SMALL_COMPACTION_THREADS_DEFAULT); 675 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 676 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + 677 " from " + this.shortCompactions.getCorePoolSize() + " to " + 678 smallThreads); 679 if(this.shortCompactions.getCorePoolSize() < smallThreads) { 680 this.shortCompactions.setMaximumPoolSize(smallThreads); 681 this.shortCompactions.setCorePoolSize(smallThreads); 682 } else { 683 this.shortCompactions.setCorePoolSize(smallThreads); 684 this.shortCompactions.setMaximumPoolSize(smallThreads); 685 } 686 } 687 688 int splitThreads = newConf.getInt(SPLIT_THREADS, 689 SPLIT_THREADS_DEFAULT); 690 if (this.splits.getCorePoolSize() != splitThreads) { 691 LOG.info("Changing the value of " + SPLIT_THREADS + 692 " from " + this.splits.getCorePoolSize() + " to " + 693 splitThreads); 694 if(this.splits.getCorePoolSize() < splitThreads) { 695 this.splits.setMaximumPoolSize(splitThreads); 696 this.splits.setCorePoolSize(splitThreads); 697 } else { 698 this.splits.setCorePoolSize(splitThreads); 699 this.splits.setMaximumPoolSize(splitThreads); 700 } 701 } 702 703 ThroughputController old = this.compactionThroughputController; 704 if (old != null) { 705 old.stop("configuration change"); 706 } 707 this.compactionThroughputController = 708 CompactionThroughputControllerFactory.create(server, newConf); 709 710 // We change this atomically here instead of reloading the config in order that upstream 711 // would be the only one with the flexibility to reload the config. 712 this.conf.reloadConfiguration(); 713 } 714 715 protected int getSmallCompactionThreadNum() { 716 return this.shortCompactions.getCorePoolSize(); 717 } 718 719 protected int getLargeCompactionThreadNum() { 720 return this.longCompactions.getCorePoolSize(); 721 } 722 723 protected int getSplitThreadNum() { 724 return this.splits.getCorePoolSize(); 725 } 726 727 /** 728 * {@inheritDoc} 729 */ 730 @Override 731 public void registerChildren(ConfigurationManager manager) { 732 // No children to register. 733 } 734 735 /** 736 * {@inheritDoc} 737 */ 738 @Override 739 public void deregisterChildren(ConfigurationManager manager) { 740 // No children to register 741 } 742 743 @VisibleForTesting 744 public ThroughputController getCompactionThroughputController() { 745 return compactionThroughputController; 746 } 747 748 @VisibleForTesting 749 /** 750 * Shutdown the long compaction thread pool. 751 * Should only be used in unit test to prevent long compaction thread pool from stealing job 752 * from short compaction queue 753 */ 754 void shutdownLongCompactions(){ 755 this.longCompactions.shutdown(); 756 } 757 758 public void clearLongCompactionsQueue() { 759 longCompactions.getQueue().clear(); 760 } 761 762 public void clearShortCompactionsQueue() { 763 shortCompactions.getQueue().clear(); 764 } 765}