001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 023 024import java.io.IOException; 025import java.io.PrintWriter; 026import java.io.StringWriter; 027import java.util.Comparator; 028import java.util.Iterator; 029import java.util.Optional; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.Executors; 032import java.util.concurrent.RejectedExecutionException; 033import java.util.concurrent.RejectedExecutionHandler; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.function.IntSupplier; 038 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.conf.ConfigurationManager; 041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 049import org.apache.hadoop.hbase.security.Superusers; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.StealJobQueue; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.util.StringUtils; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 062 063/** 064 * Compact region on request and then run split if appropriate 065 */ 066@InterfaceAudience.Private 067public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 068 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 069 070 // Configuration key for the large compaction threads. 071 public final static String LARGE_COMPACTION_THREADS = 072 "hbase.regionserver.thread.compaction.large"; 073 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 074 075 // Configuration key for the small compaction threads. 076 public final static String SMALL_COMPACTION_THREADS = 077 "hbase.regionserver.thread.compaction.small"; 078 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 079 080 // Configuration key for split threads 081 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 082 public final static int SPLIT_THREADS_DEFAULT = 1; 083 084 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 085 "hbase.regionserver.regionSplitLimit"; 086 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; 087 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = 088 "hbase.regionserver.compaction.enabled"; 089 090 private final HRegionServer server; 091 private final Configuration conf; 092 private volatile ThreadPoolExecutor longCompactions; 093 private volatile ThreadPoolExecutor shortCompactions; 094 private volatile ThreadPoolExecutor splits; 095 096 private volatile ThroughputController compactionThroughputController; 097 098 private volatile boolean compactionsEnabled; 099 /** 100 * Splitting should not take place if the total number of regions exceed this. 101 * This is not a hard limit to the number of regions but it is a guideline to 102 * stop splitting after number of online regions is greater than this. 103 */ 104 private int regionSplitLimit; 105 106 /** @param server */ 107 CompactSplit(HRegionServer server) { 108 this.server = server; 109 this.conf = server.getConfiguration(); 110 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true); 111 createCompactionExecutors(); 112 createSplitExcecutors(); 113 114 // compaction throughput controller 115 this.compactionThroughputController = 116 CompactionThroughputControllerFactory.create(server, conf); 117 } 118 119 private void createSplitExcecutors() { 120 final String n = Thread.currentThread().getName(); 121 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 122 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 123 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 124 } 125 126 private void createCompactionExecutors() { 127 this.regionSplitLimit = 128 conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 129 130 int largeThreads = 131 Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 132 int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 133 134 // if we have throttle threads, make sure the user also specified size 135 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 136 137 final String n = Thread.currentThread().getName(); 138 139 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 140 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 141 stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d") 142 .setDaemon(true).build()); 143 this.longCompactions.setRejectedExecutionHandler(new Rejection()); 144 this.longCompactions.prestartAllCoreThreads(); 145 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 146 stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder() 147 .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 148 this.shortCompactions.setRejectedExecutionHandler(new Rejection()); 149 } 150 151 @Override 152 public String toString() { 153 return "compactionQueue=(longCompactions=" 154 + longCompactions.getQueue().size() + ":shortCompactions=" 155 + shortCompactions.getQueue().size() + ")" 156 + ", splitQueue=" + splits.getQueue().size(); 157 } 158 159 public String dumpQueue() { 160 StringBuilder queueLists = new StringBuilder(); 161 queueLists.append("Compaction/Split Queue dump:\n"); 162 queueLists.append(" LargeCompation Queue:\n"); 163 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 164 Iterator<Runnable> it = lq.iterator(); 165 while (it.hasNext()) { 166 queueLists.append(" " + it.next().toString()); 167 queueLists.append("\n"); 168 } 169 170 if (shortCompactions != null) { 171 queueLists.append("\n"); 172 queueLists.append(" SmallCompation Queue:\n"); 173 lq = shortCompactions.getQueue(); 174 it = lq.iterator(); 175 while (it.hasNext()) { 176 queueLists.append(" " + it.next().toString()); 177 queueLists.append("\n"); 178 } 179 } 180 181 queueLists.append("\n"); 182 queueLists.append(" Split Queue:\n"); 183 lq = splits.getQueue(); 184 it = lq.iterator(); 185 while (it.hasNext()) { 186 queueLists.append(" " + it.next().toString()); 187 queueLists.append("\n"); 188 } 189 190 return queueLists.toString(); 191 } 192 193 public synchronized boolean requestSplit(final Region r) { 194 // don't split regions that are blocking 195 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) { 196 byte[] midKey = ((HRegion)r).checkSplit(); 197 if (midKey != null) { 198 requestSplit(r, midKey); 199 return true; 200 } 201 } 202 return false; 203 } 204 205 public synchronized void requestSplit(final Region r, byte[] midKey) { 206 requestSplit(r, midKey, null); 207 } 208 209 /* 210 * The User parameter allows the split thread to assume the correct user identity 211 */ 212 public synchronized void requestSplit(final Region r, byte[] midKey, User user) { 213 if (midKey == null) { 214 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + 215 " not splittable because midkey=null"); 216 if (((HRegion)r).shouldForceSplit()) { 217 ((HRegion)r).clearSplit(); 218 } 219 return; 220 } 221 try { 222 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 223 if (LOG.isDebugEnabled()) { 224 LOG.debug("Splitting " + r + ", " + this); 225 } 226 } catch (RejectedExecutionException ree) { 227 LOG.info("Could not execute split for " + r, ree); 228 } 229 } 230 231 private void interrupt() { 232 longCompactions.shutdownNow(); 233 shortCompactions.shutdownNow(); 234 } 235 236 private void reInitializeCompactionsExecutors() { 237 createCompactionExecutors(); 238 } 239 240 private interface CompactionCompleteTracker { 241 242 default void completed(Store store) { 243 } 244 } 245 246 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 247 new CompactionCompleteTracker() { 248 }; 249 250 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 251 252 private final CompactionLifeCycleTracker tracker; 253 254 private final AtomicInteger remaining; 255 256 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 257 this.tracker = tracker; 258 this.remaining = new AtomicInteger(numberOfStores); 259 } 260 261 @Override 262 public void completed(Store store) { 263 if (remaining.decrementAndGet() == 0) { 264 tracker.completed(); 265 } 266 } 267 } 268 269 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 270 IntSupplier numberOfStores) { 271 if (tracker == CompactionLifeCycleTracker.DUMMY) { 272 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 273 // the life cycle of a compaction. 274 return DUMMY_COMPLETE_TRACKER; 275 } else { 276 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 277 } 278 } 279 280 @Override 281 public synchronized void requestCompaction(HRegion region, String why, int priority, 282 CompactionLifeCycleTracker tracker, User user) throws IOException { 283 requestCompactionInternal(region, why, priority, true, tracker, 284 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 285 } 286 287 @Override 288 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 289 CompactionLifeCycleTracker tracker, User user) throws IOException { 290 requestCompactionInternal(region, store, why, priority, true, tracker, 291 getCompleteTracker(tracker, () -> 1), user); 292 } 293 294 @Override 295 public void switchCompaction(boolean onOrOff) { 296 if (onOrOff) { 297 // re-create executor pool if compactions are disabled. 298 if (!isCompactionsEnabled()) { 299 LOG.info("Re-Initializing compactions because user switched on compactions"); 300 reInitializeCompactionsExecutors(); 301 } 302 } else { 303 LOG.info("Interrupting running compactions because user switched off compactions"); 304 interrupt(); 305 } 306 setCompactionsEnabled(onOrOff); 307 } 308 309 private void requestCompactionInternal(HRegion region, String why, int priority, 310 boolean selectNow, CompactionLifeCycleTracker tracker, 311 CompactionCompleteTracker completeTracker, User user) throws IOException { 312 // request compaction on all stores 313 for (HStore store : region.stores.values()) { 314 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 315 user); 316 } 317 } 318 319 private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 320 boolean selectNow, CompactionLifeCycleTracker tracker, 321 CompactionCompleteTracker completeTracker, User user) throws IOException { 322 if (this.server.isStopped() || (region.getTableDescriptor() != null && 323 !region.getTableDescriptor().isCompactionEnabled())) { 324 return; 325 } 326 RegionServerSpaceQuotaManager spaceQuotaManager = 327 this.server.getRegionServerSpaceQuotaManager(); 328 329 if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 330 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { 331 // Enter here only when: 332 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 333 String reason = "Ignoring compaction request for " + region + 334 " as an active space quota violation " + " policy disallows compactions."; 335 tracker.notExecuted(store, reason); 336 completeTracker.completed(store); 337 LOG.debug(reason); 338 return; 339 } 340 341 CompactionContext compaction; 342 if (selectNow) { 343 Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user); 344 if (!c.isPresent()) { 345 // message logged inside 346 return; 347 } 348 compaction = c.get(); 349 } else { 350 compaction = null; 351 } 352 353 ThreadPoolExecutor pool; 354 if (selectNow) { 355 // compaction.get is safe as we will just return if selectNow is true but no compaction is 356 // selected 357 pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions 358 : shortCompactions; 359 } else { 360 // We assume that most compactions are small. So, put system compactions into small 361 // pool; we will do selection there, and move to large pool if necessary. 362 pool = shortCompactions; 363 } 364 pool.execute( 365 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 366 region.incrementCompactionsQueuedCount(); 367 if (LOG.isDebugEnabled()) { 368 String type = (pool == shortCompactions) ? "Small " : "Large "; 369 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 370 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 371 } 372 } 373 374 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 375 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 376 DUMMY_COMPLETE_TRACKER, null); 377 } 378 379 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) 380 throws IOException { 381 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 382 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 383 } 384 385 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 386 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 387 throws IOException { 388 // don't even select for compaction if disableCompactions is set to true 389 if (!isCompactionsEnabled()) { 390 LOG.info(String.format("User has disabled compactions")); 391 return Optional.empty(); 392 } 393 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 394 if (!compaction.isPresent() && region.getRegionInfo() != null) { 395 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + 396 " because compaction request was cancelled"; 397 tracker.notExecuted(store, reason); 398 completeTracker.completed(store); 399 LOG.debug(reason); 400 } 401 return compaction; 402 } 403 404 /** 405 * Only interrupt once it's done with a run through the work loop. 406 */ 407 void interruptIfNecessary() { 408 splits.shutdown(); 409 longCompactions.shutdown(); 410 shortCompactions.shutdown(); 411 } 412 413 private void waitFor(ThreadPoolExecutor t, String name) { 414 boolean done = false; 415 while (!done) { 416 try { 417 done = t.awaitTermination(60, TimeUnit.SECONDS); 418 LOG.info("Waiting for " + name + " to finish..."); 419 if (!done) { 420 t.shutdownNow(); 421 } 422 } catch (InterruptedException ie) { 423 LOG.warn("Interrupted waiting for " + name + " to finish..."); 424 t.shutdownNow(); 425 } 426 } 427 } 428 429 void join() { 430 waitFor(splits, "Split Thread"); 431 waitFor(longCompactions, "Large Compaction Thread"); 432 waitFor(shortCompactions, "Small Compaction Thread"); 433 } 434 435 /** 436 * Returns the current size of the queue containing regions that are 437 * processed. 438 * 439 * @return The current size of the regions queue. 440 */ 441 public int getCompactionQueueSize() { 442 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 443 } 444 445 public int getLargeCompactionQueueSize() { 446 return longCompactions.getQueue().size(); 447 } 448 449 450 public int getSmallCompactionQueueSize() { 451 return shortCompactions.getQueue().size(); 452 } 453 454 public int getSplitQueueSize() { 455 return splits.getQueue().size(); 456 } 457 458 private boolean shouldSplitRegion() { 459 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { 460 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 461 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); 462 } 463 return (regionSplitLimit > server.getNumberOfOnlineRegions()); 464 } 465 466 /** 467 * @return the regionSplitLimit 468 */ 469 public int getRegionSplitLimit() { 470 return this.regionSplitLimit; 471 } 472 473 private static final Comparator<Runnable> COMPARATOR = 474 new Comparator<Runnable>() { 475 476 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 477 if (r1 == r2) { 478 return 0; //they are the same request 479 } 480 // less first 481 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 482 if (cmp != 0) { 483 return cmp; 484 } 485 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 486 if (cmp != 0) { 487 return cmp; 488 } 489 490 // break the tie based on hash code 491 return System.identityHashCode(r1) - System.identityHashCode(r2); 492 } 493 494 @Override 495 public int compare(Runnable r1, Runnable r2) { 496 // CompactionRunner first 497 if (r1 instanceof CompactionRunner) { 498 if (!(r2 instanceof CompactionRunner)) { 499 return -1; 500 } 501 } else { 502 if (r2 instanceof CompactionRunner) { 503 return 1; 504 } else { 505 // break the tie based on hash code 506 return System.identityHashCode(r1) - System.identityHashCode(r2); 507 } 508 } 509 CompactionRunner o1 = (CompactionRunner) r1; 510 CompactionRunner o2 = (CompactionRunner) r2; 511 // less first 512 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 513 if (cmp != 0) { 514 return cmp; 515 } 516 CompactionContext c1 = o1.compaction; 517 CompactionContext c2 = o2.compaction; 518 if (c1 != null) { 519 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 520 } else { 521 return c2 != null ? 1 : 0; 522 } 523 } 524 }; 525 526 private final class CompactionRunner implements Runnable { 527 private final HStore store; 528 private final HRegion region; 529 private final CompactionContext compaction; 530 private final CompactionLifeCycleTracker tracker; 531 private final CompactionCompleteTracker completeTracker; 532 private int queuedPriority; 533 private ThreadPoolExecutor parent; 534 private User user; 535 private long time; 536 537 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 538 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 539 ThreadPoolExecutor parent, User user) { 540 this.store = store; 541 this.region = region; 542 this.compaction = compaction; 543 this.tracker = tracker; 544 this.completeTracker = completeTracker; 545 this.queuedPriority = 546 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 547 this.parent = parent; 548 this.user = user; 549 this.time = EnvironmentEdgeManager.currentTime(); 550 } 551 552 @Override 553 public String toString() { 554 if (compaction != null) { 555 return "Request=" + compaction.getRequest(); 556 } else { 557 return "region=" + region.toString() + ", storeName=" + store.toString() + 558 ", priority=" + queuedPriority + ", startTime=" + time; 559 } 560 } 561 562 private void doCompaction(User user) { 563 CompactionContext c; 564 // Common case - system compaction without a file selection. Select now. 565 if (compaction == null) { 566 int oldPriority = this.queuedPriority; 567 this.queuedPriority = this.store.getCompactPriority(); 568 if (this.queuedPriority > oldPriority) { 569 // Store priority decreased while we were in queue (due to some other compaction?), 570 // requeue with new priority to avoid blocking potential higher priorities. 571 this.parent.execute(this); 572 return; 573 } 574 Optional<CompactionContext> selected; 575 try { 576 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 577 completeTracker, user); 578 } catch (IOException ex) { 579 LOG.error("Compaction selection failed " + this, ex); 580 server.checkFileSystem(); 581 region.decrementCompactionsQueuedCount(); 582 return; 583 } 584 if (!selected.isPresent()) { 585 region.decrementCompactionsQueuedCount(); 586 return; // nothing to do 587 } 588 c = selected.get(); 589 assert c.hasSelection(); 590 // Now see if we are in correct pool for the size; if not, go to the correct one. 591 // We might end up waiting for a while, so cancel the selection. 592 593 ThreadPoolExecutor pool = 594 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 595 596 // Long compaction pool can process small job 597 // Short compaction pool should not process large job 598 if (this.parent == shortCompactions && pool == longCompactions) { 599 this.store.cancelRequestedCompaction(c); 600 this.parent = pool; 601 this.parent.execute(this); 602 return; 603 } 604 } else { 605 c = compaction; 606 } 607 // Finally we can compact something. 608 assert c != null; 609 610 tracker.beforeExecution(store); 611 try { 612 // Note: please don't put single-compaction logic here; 613 // put it into region/store/etc. This is CST logic. 614 long start = EnvironmentEdgeManager.currentTime(); 615 boolean completed = 616 region.compact(c, store, compactionThroughputController, user); 617 long now = EnvironmentEdgeManager.currentTime(); 618 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + 619 this + "; duration=" + StringUtils.formatTimeDiff(now, start)); 620 if (completed) { 621 // degenerate case: blocked regions require recursive enqueues 622 if (store.getCompactPriority() <= 0) { 623 requestSystemCompaction(region, store, "Recursive enqueue"); 624 } else { 625 // see if the compaction has caused us to exceed max region size 626 requestSplit(region); 627 } 628 } 629 } catch (IOException ex) { 630 IOException remoteEx = 631 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 632 LOG.error("Compaction failed " + this, remoteEx); 633 if (remoteEx != ex) { 634 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 635 } 636 region.reportCompactionRequestFailure(); 637 server.checkFileSystem(); 638 } catch (Exception ex) { 639 LOG.error("Compaction failed " + this, ex); 640 region.reportCompactionRequestFailure(); 641 server.checkFileSystem(); 642 } finally { 643 tracker.afterExecution(store); 644 completeTracker.completed(store); 645 region.decrementCompactionsQueuedCount(); 646 LOG.debug("Status {}", CompactSplit.this); 647 } 648 } 649 650 @Override 651 public void run() { 652 Preconditions.checkNotNull(server); 653 if (server.isStopped() 654 || (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) { 655 region.decrementCompactionsQueuedCount(); 656 return; 657 } 658 doCompaction(user); 659 } 660 661 private String formatStackTrace(Exception ex) { 662 StringWriter sw = new StringWriter(); 663 PrintWriter pw = new PrintWriter(sw); 664 ex.printStackTrace(pw); 665 pw.flush(); 666 return sw.toString(); 667 } 668 } 669 670 /** 671 * Cleanup class to use when rejecting a compaction request from the queue. 672 */ 673 private static class Rejection implements RejectedExecutionHandler { 674 @Override 675 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { 676 if (runnable instanceof CompactionRunner) { 677 CompactionRunner runner = (CompactionRunner) runnable; 678 LOG.debug("Compaction Rejected: " + runner); 679 if (runner.compaction != null) { 680 runner.store.cancelRequestedCompaction(runner.compaction); 681 } 682 } 683 } 684 } 685 686 /** 687 * {@inheritDoc} 688 */ 689 @Override 690 public void onConfigurationChange(Configuration newConf) { 691 // Check if number of large / small compaction threads has changed, and then 692 // adjust the core pool size of the thread pools, by using the 693 // setCorePoolSize() method. According to the javadocs, it is safe to 694 // change the core pool size on-the-fly. We need to reset the maximum 695 // pool size, as well. 696 int largeThreads = Math.max(1, newConf.getInt( 697 LARGE_COMPACTION_THREADS, 698 LARGE_COMPACTION_THREADS_DEFAULT)); 699 if (this.longCompactions.getCorePoolSize() != largeThreads) { 700 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + 701 " from " + this.longCompactions.getCorePoolSize() + " to " + 702 largeThreads); 703 if(this.longCompactions.getCorePoolSize() < largeThreads) { 704 this.longCompactions.setMaximumPoolSize(largeThreads); 705 this.longCompactions.setCorePoolSize(largeThreads); 706 } else { 707 this.longCompactions.setCorePoolSize(largeThreads); 708 this.longCompactions.setMaximumPoolSize(largeThreads); 709 } 710 } 711 712 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, 713 SMALL_COMPACTION_THREADS_DEFAULT); 714 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 715 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + 716 " from " + this.shortCompactions.getCorePoolSize() + " to " + 717 smallThreads); 718 if(this.shortCompactions.getCorePoolSize() < smallThreads) { 719 this.shortCompactions.setMaximumPoolSize(smallThreads); 720 this.shortCompactions.setCorePoolSize(smallThreads); 721 } else { 722 this.shortCompactions.setCorePoolSize(smallThreads); 723 this.shortCompactions.setMaximumPoolSize(smallThreads); 724 } 725 } 726 727 int splitThreads = newConf.getInt(SPLIT_THREADS, 728 SPLIT_THREADS_DEFAULT); 729 if (this.splits.getCorePoolSize() != splitThreads) { 730 LOG.info("Changing the value of " + SPLIT_THREADS + 731 " from " + this.splits.getCorePoolSize() + " to " + 732 splitThreads); 733 if(this.splits.getCorePoolSize() < splitThreads) { 734 this.splits.setMaximumPoolSize(splitThreads); 735 this.splits.setCorePoolSize(splitThreads); 736 } else { 737 this.splits.setCorePoolSize(splitThreads); 738 this.splits.setMaximumPoolSize(splitThreads); 739 } 740 } 741 742 ThroughputController old = this.compactionThroughputController; 743 if (old != null) { 744 old.stop("configuration change"); 745 } 746 this.compactionThroughputController = 747 CompactionThroughputControllerFactory.create(server, newConf); 748 749 // We change this atomically here instead of reloading the config in order that upstream 750 // would be the only one with the flexibility to reload the config. 751 this.conf.reloadConfiguration(); 752 } 753 754 protected int getSmallCompactionThreadNum() { 755 return this.shortCompactions.getCorePoolSize(); 756 } 757 758 protected int getLargeCompactionThreadNum() { 759 return this.longCompactions.getCorePoolSize(); 760 } 761 762 protected int getSplitThreadNum() { 763 return this.splits.getCorePoolSize(); 764 } 765 766 /** 767 * {@inheritDoc} 768 */ 769 @Override 770 public void registerChildren(ConfigurationManager manager) { 771 // No children to register. 772 } 773 774 /** 775 * {@inheritDoc} 776 */ 777 @Override 778 public void deregisterChildren(ConfigurationManager manager) { 779 // No children to register 780 } 781 782 @VisibleForTesting 783 public ThroughputController getCompactionThroughputController() { 784 return compactionThroughputController; 785 } 786 787 @VisibleForTesting 788 /** 789 * Shutdown the long compaction thread pool. 790 * Should only be used in unit test to prevent long compaction thread pool from stealing job 791 * from short compaction queue 792 */ 793 void shutdownLongCompactions(){ 794 this.longCompactions.shutdown(); 795 } 796 797 public void clearLongCompactionsQueue() { 798 longCompactions.getQueue().clear(); 799 } 800 801 public void clearShortCompactionsQueue() { 802 shortCompactions.getQueue().clear(); 803 } 804 805 public boolean isCompactionsEnabled() { 806 return compactionsEnabled; 807 } 808 809 public void setCompactionsEnabled(boolean compactionsEnabled) { 810 this.compactionsEnabled = compactionsEnabled; 811 this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled)); 812 } 813 814 /** 815 * @return the longCompactions thread pool executor 816 */ 817 ThreadPoolExecutor getLongCompactions() { 818 return longCompactions; 819 } 820 821 /** 822 * @return the shortCompactions thread pool executor 823 */ 824 ThreadPoolExecutor getShortCompactions() { 825 return shortCompactions; 826 } 827 828}