001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Comparator; 027import java.util.Iterator; 028import java.util.Optional; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.Executors; 031import java.util.concurrent.RejectedExecutionException; 032import java.util.concurrent.RejectedExecutionHandler; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.function.IntSupplier; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.conf.ConfigurationManager; 039import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 040import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 041import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 042import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 045import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 047import org.apache.hadoop.hbase.security.Superusers; 048import org.apache.hadoop.hbase.security.User; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.hbase.util.StealJobQueue; 051import org.apache.hadoop.ipc.RemoteException; 052import org.apache.hadoop.util.StringUtils; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 059 060/** 061 * Compact region on request and then run split if appropriate 062 */ 063@InterfaceAudience.Private 064public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 065 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 066 067 // Configuration key for the large compaction threads. 068 public final static String LARGE_COMPACTION_THREADS = 069 "hbase.regionserver.thread.compaction.large"; 070 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 071 072 // Configuration key for the small compaction threads. 073 public final static String SMALL_COMPACTION_THREADS = 074 "hbase.regionserver.thread.compaction.small"; 075 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 076 077 // Configuration key for split threads 078 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 079 public final static int SPLIT_THREADS_DEFAULT = 1; 080 081 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 082 "hbase.regionserver.regionSplitLimit"; 083 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; 084 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = 085 "hbase.regionserver.compaction.enabled"; 086 087 private final HRegionServer server; 088 private final Configuration conf; 089 private volatile ThreadPoolExecutor longCompactions; 090 private volatile ThreadPoolExecutor shortCompactions; 091 private volatile ThreadPoolExecutor splits; 092 093 private volatile ThroughputController compactionThroughputController; 094 095 private volatile boolean compactionsEnabled; 096 /** 097 * Splitting should not take place if the total number of regions exceed this. 098 * This is not a hard limit to the number of regions but it is a guideline to 099 * stop splitting after number of online regions is greater than this. 100 */ 101 private int regionSplitLimit; 102 103 CompactSplit(HRegionServer server) { 104 this.server = server; 105 this.conf = server.getConfiguration(); 106 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true); 107 createCompactionExecutors(); 108 createSplitExcecutors(); 109 110 // compaction throughput controller 111 this.compactionThroughputController = 112 CompactionThroughputControllerFactory.create(server, conf); 113 } 114 115 private void createSplitExcecutors() { 116 final String n = Thread.currentThread().getName(); 117 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 118 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 119 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 120 } 121 122 private void createCompactionExecutors() { 123 this.regionSplitLimit = 124 conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 125 126 int largeThreads = 127 Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 128 int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 129 130 // if we have throttle threads, make sure the user also specified size 131 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 132 133 final String n = Thread.currentThread().getName(); 134 135 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 136 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 137 stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d") 138 .setDaemon(true).build()); 139 this.longCompactions.setRejectedExecutionHandler(new Rejection()); 140 this.longCompactions.prestartAllCoreThreads(); 141 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 142 stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder() 143 .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 144 this.shortCompactions.setRejectedExecutionHandler(new Rejection()); 145 } 146 147 @Override 148 public String toString() { 149 return "compactionQueue=(longCompactions=" 150 + longCompactions.getQueue().size() + ":shortCompactions=" 151 + shortCompactions.getQueue().size() + ")" 152 + ", splitQueue=" + splits.getQueue().size(); 153 } 154 155 public String dumpQueue() { 156 StringBuilder queueLists = new StringBuilder(); 157 queueLists.append("Compaction/Split Queue dump:\n"); 158 queueLists.append(" LargeCompation Queue:\n"); 159 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 160 Iterator<Runnable> it = lq.iterator(); 161 while (it.hasNext()) { 162 queueLists.append(" " + it.next().toString()); 163 queueLists.append("\n"); 164 } 165 166 if (shortCompactions != null) { 167 queueLists.append("\n"); 168 queueLists.append(" SmallCompation Queue:\n"); 169 lq = shortCompactions.getQueue(); 170 it = lq.iterator(); 171 while (it.hasNext()) { 172 queueLists.append(" " + it.next().toString()); 173 queueLists.append("\n"); 174 } 175 } 176 177 queueLists.append("\n"); 178 queueLists.append(" Split Queue:\n"); 179 lq = splits.getQueue(); 180 it = lq.iterator(); 181 while (it.hasNext()) { 182 queueLists.append(" " + it.next().toString()); 183 queueLists.append("\n"); 184 } 185 186 return queueLists.toString(); 187 } 188 189 public synchronized boolean requestSplit(final Region r) { 190 // don't split regions that are blocking 191 HRegion hr = (HRegion)r; 192 try { 193 if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) { 194 byte[] midKey = hr.checkSplit(); 195 if (midKey != null) { 196 requestSplit(r, midKey); 197 return true; 198 } 199 } 200 } catch (IndexOutOfBoundsException e) { 201 // We get this sometimes. Not sure why. Catch and return false; no split request. 202 LOG.warn("Catching out-of-bounds; region={}, policy={}", hr == null? null: hr.getRegionInfo(), 203 hr == null? "null": hr.getCompactPriority(), e); 204 } 205 return false; 206 } 207 208 public synchronized void requestSplit(final Region r, byte[] midKey) { 209 requestSplit(r, midKey, null); 210 } 211 212 /* 213 * The User parameter allows the split thread to assume the correct user identity 214 */ 215 public synchronized void requestSplit(final Region r, byte[] midKey, User user) { 216 if (midKey == null) { 217 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + 218 " not splittable because midkey=null"); 219 if (((HRegion)r).shouldForceSplit()) { 220 ((HRegion)r).clearSplit(); 221 } 222 return; 223 } 224 try { 225 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 226 if (LOG.isDebugEnabled()) { 227 LOG.debug("Splitting " + r + ", " + this); 228 } 229 } catch (RejectedExecutionException ree) { 230 LOG.info("Could not execute split for " + r, ree); 231 } 232 } 233 234 private void interrupt() { 235 longCompactions.shutdownNow(); 236 shortCompactions.shutdownNow(); 237 } 238 239 private void reInitializeCompactionsExecutors() { 240 createCompactionExecutors(); 241 } 242 243 private interface CompactionCompleteTracker { 244 245 default void completed(Store store) { 246 } 247 } 248 249 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 250 new CompactionCompleteTracker() {}; 251 252 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 253 254 private final CompactionLifeCycleTracker tracker; 255 256 private final AtomicInteger remaining; 257 258 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 259 this.tracker = tracker; 260 this.remaining = new AtomicInteger(numberOfStores); 261 } 262 263 @Override 264 public void completed(Store store) { 265 if (remaining.decrementAndGet() == 0) { 266 tracker.completed(); 267 } 268 } 269 } 270 271 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 272 IntSupplier numberOfStores) { 273 if (tracker == CompactionLifeCycleTracker.DUMMY) { 274 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 275 // the life cycle of a compaction. 276 return DUMMY_COMPLETE_TRACKER; 277 } else { 278 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 279 } 280 } 281 282 @Override 283 public synchronized void requestCompaction(HRegion region, String why, int priority, 284 CompactionLifeCycleTracker tracker, User user) throws IOException { 285 requestCompactionInternal(region, why, priority, true, tracker, 286 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 287 } 288 289 @Override 290 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 291 CompactionLifeCycleTracker tracker, User user) throws IOException { 292 requestCompactionInternal(region, store, why, priority, true, tracker, 293 getCompleteTracker(tracker, () -> 1), user); 294 } 295 296 @Override 297 public void switchCompaction(boolean onOrOff) { 298 if (onOrOff) { 299 // re-create executor pool if compactions are disabled. 300 if (!isCompactionsEnabled()) { 301 LOG.info("Re-Initializing compactions because user switched on compactions"); 302 reInitializeCompactionsExecutors(); 303 } 304 } else { 305 LOG.info("Interrupting running compactions because user switched off compactions"); 306 interrupt(); 307 } 308 setCompactionsEnabled(onOrOff); 309 } 310 311 private void requestCompactionInternal(HRegion region, String why, int priority, 312 boolean selectNow, CompactionLifeCycleTracker tracker, 313 CompactionCompleteTracker completeTracker, User user) throws IOException { 314 // request compaction on all stores 315 for (HStore store : region.stores.values()) { 316 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 317 user); 318 } 319 } 320 321 private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 322 boolean selectNow, CompactionLifeCycleTracker tracker, 323 CompactionCompleteTracker completeTracker, User user) throws IOException { 324 if (this.server.isStopped() || (region.getTableDescriptor() != null && 325 !region.getTableDescriptor().isCompactionEnabled())) { 326 return; 327 } 328 RegionServerSpaceQuotaManager spaceQuotaManager = 329 this.server.getRegionServerSpaceQuotaManager(); 330 331 if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 332 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { 333 // Enter here only when: 334 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 335 String reason = "Ignoring compaction request for " + region + 336 " as an active space quota violation " + " policy disallows compactions."; 337 tracker.notExecuted(store, reason); 338 completeTracker.completed(store); 339 LOG.debug(reason); 340 return; 341 } 342 343 CompactionContext compaction; 344 if (selectNow) { 345 Optional<CompactionContext> c = 346 selectCompaction(region, store, priority, tracker, completeTracker, user); 347 if (!c.isPresent()) { 348 // message logged inside 349 return; 350 } 351 compaction = c.get(); 352 } else { 353 compaction = null; 354 } 355 356 ThreadPoolExecutor pool; 357 if (selectNow) { 358 // compaction.get is safe as we will just return if selectNow is true but no compaction is 359 // selected 360 pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions 361 : shortCompactions; 362 } else { 363 // We assume that most compactions are small. So, put system compactions into small 364 // pool; we will do selection there, and move to large pool if necessary. 365 pool = shortCompactions; 366 } 367 pool.execute( 368 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 369 region.incrementCompactionsQueuedCount(); 370 if (LOG.isDebugEnabled()) { 371 String type = (pool == shortCompactions) ? "Small " : "Large "; 372 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 373 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 374 } 375 } 376 377 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 378 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 379 DUMMY_COMPLETE_TRACKER, null); 380 } 381 382 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) 383 throws IOException { 384 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 385 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 386 } 387 388 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 389 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 390 throws IOException { 391 // don't even select for compaction if disableCompactions is set to true 392 if (!isCompactionsEnabled()) { 393 LOG.info(String.format("User has disabled compactions")); 394 return Optional.empty(); 395 } 396 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 397 if (!compaction.isPresent() && region.getRegionInfo() != null) { 398 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + 399 " because compaction request was cancelled"; 400 tracker.notExecuted(store, reason); 401 completeTracker.completed(store); 402 LOG.debug(reason); 403 } 404 return compaction; 405 } 406 407 /** 408 * Only interrupt once it's done with a run through the work loop. 409 */ 410 void interruptIfNecessary() { 411 splits.shutdown(); 412 longCompactions.shutdown(); 413 shortCompactions.shutdown(); 414 } 415 416 private void waitFor(ThreadPoolExecutor t, String name) { 417 boolean done = false; 418 while (!done) { 419 try { 420 done = t.awaitTermination(60, TimeUnit.SECONDS); 421 LOG.info("Waiting for " + name + " to finish..."); 422 if (!done) { 423 t.shutdownNow(); 424 } 425 } catch (InterruptedException ie) { 426 LOG.warn("Interrupted waiting for " + name + " to finish..."); 427 t.shutdownNow(); 428 } 429 } 430 } 431 432 void join() { 433 waitFor(splits, "Split Thread"); 434 waitFor(longCompactions, "Large Compaction Thread"); 435 waitFor(shortCompactions, "Small Compaction Thread"); 436 } 437 438 /** 439 * Returns the current size of the queue containing regions that are 440 * processed. 441 * 442 * @return The current size of the regions queue. 443 */ 444 public int getCompactionQueueSize() { 445 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 446 } 447 448 public int getLargeCompactionQueueSize() { 449 return longCompactions.getQueue().size(); 450 } 451 452 453 public int getSmallCompactionQueueSize() { 454 return shortCompactions.getQueue().size(); 455 } 456 457 public int getSplitQueueSize() { 458 return splits.getQueue().size(); 459 } 460 461 private boolean shouldSplitRegion() { 462 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { 463 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 464 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); 465 } 466 return (regionSplitLimit > server.getNumberOfOnlineRegions()); 467 } 468 469 /** 470 * @return the regionSplitLimit 471 */ 472 public int getRegionSplitLimit() { 473 return this.regionSplitLimit; 474 } 475 476 private static final Comparator<Runnable> COMPARATOR = 477 new Comparator<Runnable>() { 478 479 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 480 if (r1 == r2) { 481 return 0; //they are the same request 482 } 483 // less first 484 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 485 if (cmp != 0) { 486 return cmp; 487 } 488 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 489 if (cmp != 0) { 490 return cmp; 491 } 492 493 // break the tie based on hash code 494 return System.identityHashCode(r1) - System.identityHashCode(r2); 495 } 496 497 @Override 498 public int compare(Runnable r1, Runnable r2) { 499 // CompactionRunner first 500 if (r1 instanceof CompactionRunner) { 501 if (!(r2 instanceof CompactionRunner)) { 502 return -1; 503 } 504 } else { 505 if (r2 instanceof CompactionRunner) { 506 return 1; 507 } else { 508 // break the tie based on hash code 509 return System.identityHashCode(r1) - System.identityHashCode(r2); 510 } 511 } 512 CompactionRunner o1 = (CompactionRunner) r1; 513 CompactionRunner o2 = (CompactionRunner) r2; 514 // less first 515 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 516 if (cmp != 0) { 517 return cmp; 518 } 519 CompactionContext c1 = o1.compaction; 520 CompactionContext c2 = o2.compaction; 521 if (c1 != null) { 522 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 523 } else { 524 return c2 != null ? 1 : 0; 525 } 526 } 527 }; 528 529 private final class CompactionRunner implements Runnable { 530 private final HStore store; 531 private final HRegion region; 532 private final CompactionContext compaction; 533 private final CompactionLifeCycleTracker tracker; 534 private final CompactionCompleteTracker completeTracker; 535 private int queuedPriority; 536 private ThreadPoolExecutor parent; 537 private User user; 538 private long time; 539 540 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 541 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 542 ThreadPoolExecutor parent, User user) { 543 this.store = store; 544 this.region = region; 545 this.compaction = compaction; 546 this.tracker = tracker; 547 this.completeTracker = completeTracker; 548 this.queuedPriority = 549 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 550 this.parent = parent; 551 this.user = user; 552 this.time = EnvironmentEdgeManager.currentTime(); 553 } 554 555 @Override 556 public String toString() { 557 if (compaction != null) { 558 return "Request=" + compaction.getRequest(); 559 } else { 560 return "region=" + region.toString() + ", storeName=" + store.toString() + 561 ", priority=" + queuedPriority + ", startTime=" + time; 562 } 563 } 564 565 private void doCompaction(User user) { 566 CompactionContext c; 567 // Common case - system compaction without a file selection. Select now. 568 if (compaction == null) { 569 int oldPriority = this.queuedPriority; 570 this.queuedPriority = this.store.getCompactPriority(); 571 if (this.queuedPriority > oldPriority) { 572 // Store priority decreased while we were in queue (due to some other compaction?), 573 // requeue with new priority to avoid blocking potential higher priorities. 574 this.parent.execute(this); 575 return; 576 } 577 Optional<CompactionContext> selected; 578 try { 579 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 580 completeTracker, user); 581 } catch (IOException ex) { 582 LOG.error("Compaction selection failed " + this, ex); 583 server.checkFileSystem(); 584 region.decrementCompactionsQueuedCount(); 585 return; 586 } 587 if (!selected.isPresent()) { 588 region.decrementCompactionsQueuedCount(); 589 return; // nothing to do 590 } 591 c = selected.get(); 592 assert c.hasSelection(); 593 // Now see if we are in correct pool for the size; if not, go to the correct one. 594 // We might end up waiting for a while, so cancel the selection. 595 596 ThreadPoolExecutor pool = 597 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 598 599 // Long compaction pool can process small job 600 // Short compaction pool should not process large job 601 if (this.parent == shortCompactions && pool == longCompactions) { 602 this.store.cancelRequestedCompaction(c); 603 this.parent = pool; 604 this.parent.execute(this); 605 return; 606 } 607 } else { 608 c = compaction; 609 } 610 // Finally we can compact something. 611 assert c != null; 612 613 tracker.beforeExecution(store); 614 try { 615 // Note: please don't put single-compaction logic here; 616 // put it into region/store/etc. This is CST logic. 617 long start = EnvironmentEdgeManager.currentTime(); 618 boolean completed = 619 region.compact(c, store, compactionThroughputController, user); 620 long now = EnvironmentEdgeManager.currentTime(); 621 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + 622 this + "; duration=" + StringUtils.formatTimeDiff(now, start)); 623 if (completed) { 624 // degenerate case: blocked regions require recursive enqueues 625 if (store.getCompactPriority() <= 0) { 626 requestSystemCompaction(region, store, "Recursive enqueue"); 627 } else { 628 // see if the compaction has caused us to exceed max region size 629 requestSplit(region); 630 } 631 } 632 } catch (IOException ex) { 633 IOException remoteEx = 634 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 635 LOG.error("Compaction failed " + this, remoteEx); 636 if (remoteEx != ex) { 637 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 638 } 639 region.reportCompactionRequestFailure(); 640 server.checkFileSystem(); 641 } catch (Exception ex) { 642 LOG.error("Compaction failed " + this, ex); 643 region.reportCompactionRequestFailure(); 644 server.checkFileSystem(); 645 } finally { 646 tracker.afterExecution(store); 647 completeTracker.completed(store); 648 region.decrementCompactionsQueuedCount(); 649 LOG.debug("Status {}", CompactSplit.this); 650 } 651 } 652 653 @Override 654 public void run() { 655 Preconditions.checkNotNull(server); 656 if (server.isStopped() || (region.getTableDescriptor() != null && 657 !region.getTableDescriptor().isCompactionEnabled())) { 658 region.decrementCompactionsQueuedCount(); 659 return; 660 } 661 doCompaction(user); 662 } 663 664 private String formatStackTrace(Exception ex) { 665 StringWriter sw = new StringWriter(); 666 PrintWriter pw = new PrintWriter(sw); 667 ex.printStackTrace(pw); 668 pw.flush(); 669 return sw.toString(); 670 } 671 } 672 673 /** 674 * Cleanup class to use when rejecting a compaction request from the queue. 675 */ 676 private static class Rejection implements RejectedExecutionHandler { 677 @Override 678 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { 679 if (runnable instanceof CompactionRunner) { 680 CompactionRunner runner = (CompactionRunner) runnable; 681 LOG.debug("Compaction Rejected: " + runner); 682 if (runner.compaction != null) { 683 runner.store.cancelRequestedCompaction(runner.compaction); 684 } 685 } 686 } 687 } 688 689 /** 690 * {@inheritDoc} 691 */ 692 @Override 693 public void onConfigurationChange(Configuration newConf) { 694 // Check if number of large / small compaction threads has changed, and then 695 // adjust the core pool size of the thread pools, by using the 696 // setCorePoolSize() method. According to the javadocs, it is safe to 697 // change the core pool size on-the-fly. We need to reset the maximum 698 // pool size, as well. 699 int largeThreads = Math.max(1, newConf.getInt( 700 LARGE_COMPACTION_THREADS, 701 LARGE_COMPACTION_THREADS_DEFAULT)); 702 if (this.longCompactions.getCorePoolSize() != largeThreads) { 703 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + 704 " from " + this.longCompactions.getCorePoolSize() + " to " + 705 largeThreads); 706 if(this.longCompactions.getCorePoolSize() < largeThreads) { 707 this.longCompactions.setMaximumPoolSize(largeThreads); 708 this.longCompactions.setCorePoolSize(largeThreads); 709 } else { 710 this.longCompactions.setCorePoolSize(largeThreads); 711 this.longCompactions.setMaximumPoolSize(largeThreads); 712 } 713 } 714 715 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, 716 SMALL_COMPACTION_THREADS_DEFAULT); 717 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 718 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + 719 " from " + this.shortCompactions.getCorePoolSize() + " to " + 720 smallThreads); 721 if(this.shortCompactions.getCorePoolSize() < smallThreads) { 722 this.shortCompactions.setMaximumPoolSize(smallThreads); 723 this.shortCompactions.setCorePoolSize(smallThreads); 724 } else { 725 this.shortCompactions.setCorePoolSize(smallThreads); 726 this.shortCompactions.setMaximumPoolSize(smallThreads); 727 } 728 } 729 730 int splitThreads = newConf.getInt(SPLIT_THREADS, 731 SPLIT_THREADS_DEFAULT); 732 if (this.splits.getCorePoolSize() != splitThreads) { 733 LOG.info("Changing the value of " + SPLIT_THREADS + 734 " from " + this.splits.getCorePoolSize() + " to " + 735 splitThreads); 736 if(this.splits.getCorePoolSize() < splitThreads) { 737 this.splits.setMaximumPoolSize(splitThreads); 738 this.splits.setCorePoolSize(splitThreads); 739 } else { 740 this.splits.setCorePoolSize(splitThreads); 741 this.splits.setMaximumPoolSize(splitThreads); 742 } 743 } 744 745 ThroughputController old = this.compactionThroughputController; 746 if (old != null) { 747 old.stop("configuration change"); 748 } 749 this.compactionThroughputController = 750 CompactionThroughputControllerFactory.create(server, newConf); 751 752 // We change this atomically here instead of reloading the config in order that upstream 753 // would be the only one with the flexibility to reload the config. 754 this.conf.reloadConfiguration(); 755 } 756 757 protected int getSmallCompactionThreadNum() { 758 return this.shortCompactions.getCorePoolSize(); 759 } 760 761 protected int getLargeCompactionThreadNum() { 762 return this.longCompactions.getCorePoolSize(); 763 } 764 765 protected int getSplitThreadNum() { 766 return this.splits.getCorePoolSize(); 767 } 768 769 /** 770 * {@inheritDoc} 771 */ 772 @Override 773 public void registerChildren(ConfigurationManager manager) { 774 // No children to register. 775 } 776 777 /** 778 * {@inheritDoc} 779 */ 780 @Override 781 public void deregisterChildren(ConfigurationManager manager) { 782 // No children to register 783 } 784 785 @VisibleForTesting 786 public ThroughputController getCompactionThroughputController() { 787 return compactionThroughputController; 788 } 789 790 @VisibleForTesting 791 /** 792 * Shutdown the long compaction thread pool. 793 * Should only be used in unit test to prevent long compaction thread pool from stealing job 794 * from short compaction queue 795 */ 796 void shutdownLongCompactions(){ 797 this.longCompactions.shutdown(); 798 } 799 800 public void clearLongCompactionsQueue() { 801 longCompactions.getQueue().clear(); 802 } 803 804 public void clearShortCompactionsQueue() { 805 shortCompactions.getQueue().clear(); 806 } 807 808 public boolean isCompactionsEnabled() { 809 return compactionsEnabled; 810 } 811 812 public void setCompactionsEnabled(boolean compactionsEnabled) { 813 this.compactionsEnabled = compactionsEnabled; 814 this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled)); 815 } 816 817 /** 818 * @return the longCompactions thread pool executor 819 */ 820 ThreadPoolExecutor getLongCompactions() { 821 return longCompactions; 822 } 823 824 /** 825 * @return the shortCompactions thread pool executor 826 */ 827 ThreadPoolExecutor getShortCompactions() { 828 return shortCompactions; 829 } 830 831}