001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 023 024import java.io.IOException; 025import java.io.PrintWriter; 026import java.io.StringWriter; 027import java.util.Comparator; 028import java.util.Iterator; 029import java.util.Optional; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.Executors; 032import java.util.concurrent.RejectedExecutionException; 033import java.util.concurrent.RejectedExecutionHandler; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.function.IntSupplier; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.conf.ConfigurationManager; 040import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 041import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 042import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 046import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 048import org.apache.hadoop.hbase.security.Superusers; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.StealJobQueue; 052import org.apache.hadoop.ipc.RemoteException; 053import org.apache.hadoop.util.StringUtils; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 059import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 060 061/** 062 * Compact region on request and then run split if appropriate 063 */ 064@InterfaceAudience.Private 065public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 066 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 067 068 // Configuration key for the large compaction threads. 069 public final static String LARGE_COMPACTION_THREADS = 070 "hbase.regionserver.thread.compaction.large"; 071 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 072 073 // Configuration key for the small compaction threads. 074 public final static String SMALL_COMPACTION_THREADS = 075 "hbase.regionserver.thread.compaction.small"; 076 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 077 078 // Configuration key for split threads 079 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 080 public final static int SPLIT_THREADS_DEFAULT = 1; 081 082 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 083 "hbase.regionserver.regionSplitLimit"; 084 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; 085 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = 086 "hbase.regionserver.compaction.enabled"; 087 088 private final HRegionServer server; 089 private final Configuration conf; 090 private volatile ThreadPoolExecutor longCompactions; 091 private volatile ThreadPoolExecutor shortCompactions; 092 private volatile ThreadPoolExecutor splits; 093 094 private volatile ThroughputController compactionThroughputController; 095 096 private volatile boolean compactionsEnabled; 097 /** 098 * Splitting should not take place if the total number of regions exceed this. 099 * This is not a hard limit to the number of regions but it is a guideline to 100 * stop splitting after number of online regions is greater than this. 101 */ 102 private int regionSplitLimit; 103 104 CompactSplit(HRegionServer server) { 105 this.server = server; 106 this.conf = server.getConfiguration(); 107 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true); 108 createCompactionExecutors(); 109 createSplitExcecutors(); 110 111 // compaction throughput controller 112 this.compactionThroughputController = 113 CompactionThroughputControllerFactory.create(server, conf); 114 } 115 116 private void createSplitExcecutors() { 117 final String n = Thread.currentThread().getName(); 118 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 119 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 120 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 121 } 122 123 private void createCompactionExecutors() { 124 this.regionSplitLimit = 125 conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 126 127 int largeThreads = 128 Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 129 int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 130 131 // if we have throttle threads, make sure the user also specified size 132 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 133 134 final String n = Thread.currentThread().getName(); 135 136 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 137 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 138 stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d") 139 .setDaemon(true).build()); 140 this.longCompactions.setRejectedExecutionHandler(new Rejection()); 141 this.longCompactions.prestartAllCoreThreads(); 142 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 143 stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder() 144 .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 145 this.shortCompactions.setRejectedExecutionHandler(new Rejection()); 146 } 147 148 @Override 149 public String toString() { 150 return "compactionQueue=(longCompactions=" 151 + longCompactions.getQueue().size() + ":shortCompactions=" 152 + shortCompactions.getQueue().size() + ")" 153 + ", splitQueue=" + splits.getQueue().size(); 154 } 155 156 public String dumpQueue() { 157 StringBuilder queueLists = new StringBuilder(); 158 queueLists.append("Compaction/Split Queue dump:\n"); 159 queueLists.append(" LargeCompation Queue:\n"); 160 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 161 Iterator<Runnable> it = lq.iterator(); 162 while (it.hasNext()) { 163 queueLists.append(" " + it.next().toString()); 164 queueLists.append("\n"); 165 } 166 167 if (shortCompactions != null) { 168 queueLists.append("\n"); 169 queueLists.append(" SmallCompation Queue:\n"); 170 lq = shortCompactions.getQueue(); 171 it = lq.iterator(); 172 while (it.hasNext()) { 173 queueLists.append(" " + it.next().toString()); 174 queueLists.append("\n"); 175 } 176 } 177 178 queueLists.append("\n"); 179 queueLists.append(" Split Queue:\n"); 180 lq = splits.getQueue(); 181 it = lq.iterator(); 182 while (it.hasNext()) { 183 queueLists.append(" " + it.next().toString()); 184 queueLists.append("\n"); 185 } 186 187 return queueLists.toString(); 188 } 189 190 public synchronized boolean requestSplit(final Region r) { 191 // don't split regions that are blocking 192 HRegion hr = (HRegion)r; 193 try { 194 if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) { 195 byte[] midKey = hr.checkSplit().orElse(null); 196 if (midKey != null) { 197 requestSplit(r, midKey); 198 return true; 199 } 200 } 201 } catch (IndexOutOfBoundsException e) { 202 // We get this sometimes. Not sure why. Catch and return false; no split request. 203 LOG.warn("Catching out-of-bounds; region={}, policy={}", hr == null? null: hr.getRegionInfo(), 204 hr == null? "null": hr.getCompactPriority(), e); 205 } 206 return false; 207 } 208 209 public synchronized void requestSplit(final Region r, byte[] midKey) { 210 requestSplit(r, midKey, null); 211 } 212 213 /* 214 * The User parameter allows the split thread to assume the correct user identity 215 */ 216 public synchronized void requestSplit(final Region r, byte[] midKey, User user) { 217 if (midKey == null) { 218 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + 219 " not splittable because midkey=null"); 220 return; 221 } 222 try { 223 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 224 if (LOG.isDebugEnabled()) { 225 LOG.debug("Splitting " + r + ", " + this); 226 } 227 } catch (RejectedExecutionException ree) { 228 LOG.info("Could not execute split for " + r, ree); 229 } 230 } 231 232 private void interrupt() { 233 longCompactions.shutdownNow(); 234 shortCompactions.shutdownNow(); 235 } 236 237 private void reInitializeCompactionsExecutors() { 238 createCompactionExecutors(); 239 } 240 241 private interface CompactionCompleteTracker { 242 243 default void completed(Store store) { 244 } 245 } 246 247 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 248 new CompactionCompleteTracker() {}; 249 250 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 251 252 private final CompactionLifeCycleTracker tracker; 253 254 private final AtomicInteger remaining; 255 256 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 257 this.tracker = tracker; 258 this.remaining = new AtomicInteger(numberOfStores); 259 } 260 261 @Override 262 public void completed(Store store) { 263 if (remaining.decrementAndGet() == 0) { 264 tracker.completed(); 265 } 266 } 267 } 268 269 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 270 IntSupplier numberOfStores) { 271 if (tracker == CompactionLifeCycleTracker.DUMMY) { 272 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 273 // the life cycle of a compaction. 274 return DUMMY_COMPLETE_TRACKER; 275 } else { 276 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 277 } 278 } 279 280 @Override 281 public synchronized void requestCompaction(HRegion region, String why, int priority, 282 CompactionLifeCycleTracker tracker, User user) throws IOException { 283 requestCompactionInternal(region, why, priority, true, tracker, 284 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 285 } 286 287 @Override 288 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 289 CompactionLifeCycleTracker tracker, User user) throws IOException { 290 requestCompactionInternal(region, store, why, priority, true, tracker, 291 getCompleteTracker(tracker, () -> 1), user); 292 } 293 294 @Override 295 public void switchCompaction(boolean onOrOff) { 296 if (onOrOff) { 297 // re-create executor pool if compactions are disabled. 298 if (!isCompactionsEnabled()) { 299 LOG.info("Re-Initializing compactions because user switched on compactions"); 300 reInitializeCompactionsExecutors(); 301 } 302 } else { 303 LOG.info("Interrupting running compactions because user switched off compactions"); 304 interrupt(); 305 } 306 setCompactionsEnabled(onOrOff); 307 } 308 309 private void requestCompactionInternal(HRegion region, String why, int priority, 310 boolean selectNow, CompactionLifeCycleTracker tracker, 311 CompactionCompleteTracker completeTracker, User user) throws IOException { 312 // request compaction on all stores 313 for (HStore store : region.stores.values()) { 314 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 315 user); 316 } 317 } 318 319 private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 320 boolean selectNow, CompactionLifeCycleTracker tracker, 321 CompactionCompleteTracker completeTracker, User user) throws IOException { 322 if (this.server.isStopped() || (region.getTableDescriptor() != null && 323 !region.getTableDescriptor().isCompactionEnabled())) { 324 return; 325 } 326 RegionServerSpaceQuotaManager spaceQuotaManager = 327 this.server.getRegionServerSpaceQuotaManager(); 328 329 if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 330 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { 331 // Enter here only when: 332 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 333 String reason = "Ignoring compaction request for " + region + 334 " as an active space quota violation " + " policy disallows compactions."; 335 tracker.notExecuted(store, reason); 336 completeTracker.completed(store); 337 LOG.debug(reason); 338 return; 339 } 340 341 CompactionContext compaction; 342 if (selectNow) { 343 Optional<CompactionContext> c = 344 selectCompaction(region, store, priority, tracker, completeTracker, user); 345 if (!c.isPresent()) { 346 // message logged inside 347 return; 348 } 349 compaction = c.get(); 350 } else { 351 compaction = null; 352 } 353 354 ThreadPoolExecutor pool; 355 if (selectNow) { 356 // compaction.get is safe as we will just return if selectNow is true but no compaction is 357 // selected 358 pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions 359 : shortCompactions; 360 } else { 361 // We assume that most compactions are small. So, put system compactions into small 362 // pool; we will do selection there, and move to large pool if necessary. 363 pool = shortCompactions; 364 } 365 pool.execute( 366 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 367 region.incrementCompactionsQueuedCount(); 368 if (LOG.isDebugEnabled()) { 369 String type = (pool == shortCompactions) ? "Small " : "Large "; 370 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 371 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 372 } 373 } 374 375 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 376 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 377 DUMMY_COMPLETE_TRACKER, null); 378 } 379 380 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) 381 throws IOException { 382 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 383 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 384 } 385 386 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 387 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 388 throws IOException { 389 // don't even select for compaction if disableCompactions is set to true 390 if (!isCompactionsEnabled()) { 391 LOG.info(String.format("User has disabled compactions")); 392 return Optional.empty(); 393 } 394 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 395 if (!compaction.isPresent() && region.getRegionInfo() != null) { 396 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + 397 " because compaction request was cancelled"; 398 tracker.notExecuted(store, reason); 399 completeTracker.completed(store); 400 LOG.debug(reason); 401 } 402 return compaction; 403 } 404 405 /** 406 * Only interrupt once it's done with a run through the work loop. 407 */ 408 void interruptIfNecessary() { 409 splits.shutdown(); 410 longCompactions.shutdown(); 411 shortCompactions.shutdown(); 412 } 413 414 private void waitFor(ThreadPoolExecutor t, String name) { 415 boolean done = false; 416 while (!done) { 417 try { 418 done = t.awaitTermination(60, TimeUnit.SECONDS); 419 LOG.info("Waiting for " + name + " to finish..."); 420 if (!done) { 421 t.shutdownNow(); 422 } 423 } catch (InterruptedException ie) { 424 LOG.warn("Interrupted waiting for " + name + " to finish..."); 425 t.shutdownNow(); 426 } 427 } 428 } 429 430 void join() { 431 waitFor(splits, "Split Thread"); 432 waitFor(longCompactions, "Large Compaction Thread"); 433 waitFor(shortCompactions, "Small Compaction Thread"); 434 } 435 436 /** 437 * Returns the current size of the queue containing regions that are 438 * processed. 439 * 440 * @return The current size of the regions queue. 441 */ 442 public int getCompactionQueueSize() { 443 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 444 } 445 446 public int getLargeCompactionQueueSize() { 447 return longCompactions.getQueue().size(); 448 } 449 450 451 public int getSmallCompactionQueueSize() { 452 return shortCompactions.getQueue().size(); 453 } 454 455 public int getSplitQueueSize() { 456 return splits.getQueue().size(); 457 } 458 459 private boolean shouldSplitRegion() { 460 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { 461 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 462 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); 463 } 464 return (regionSplitLimit > server.getNumberOfOnlineRegions()); 465 } 466 467 /** 468 * @return the regionSplitLimit 469 */ 470 public int getRegionSplitLimit() { 471 return this.regionSplitLimit; 472 } 473 474 private static final Comparator<Runnable> COMPARATOR = 475 new Comparator<Runnable>() { 476 477 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 478 if (r1 == r2) { 479 return 0; //they are the same request 480 } 481 // less first 482 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 483 if (cmp != 0) { 484 return cmp; 485 } 486 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 487 if (cmp != 0) { 488 return cmp; 489 } 490 491 // break the tie based on hash code 492 return System.identityHashCode(r1) - System.identityHashCode(r2); 493 } 494 495 @Override 496 public int compare(Runnable r1, Runnable r2) { 497 // CompactionRunner first 498 if (r1 instanceof CompactionRunner) { 499 if (!(r2 instanceof CompactionRunner)) { 500 return -1; 501 } 502 } else { 503 if (r2 instanceof CompactionRunner) { 504 return 1; 505 } else { 506 // break the tie based on hash code 507 return System.identityHashCode(r1) - System.identityHashCode(r2); 508 } 509 } 510 CompactionRunner o1 = (CompactionRunner) r1; 511 CompactionRunner o2 = (CompactionRunner) r2; 512 // less first 513 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 514 if (cmp != 0) { 515 return cmp; 516 } 517 CompactionContext c1 = o1.compaction; 518 CompactionContext c2 = o2.compaction; 519 if (c1 != null) { 520 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 521 } else { 522 return c2 != null ? 1 : 0; 523 } 524 } 525 }; 526 527 private final class CompactionRunner implements Runnable { 528 private final HStore store; 529 private final HRegion region; 530 private final CompactionContext compaction; 531 private final CompactionLifeCycleTracker tracker; 532 private final CompactionCompleteTracker completeTracker; 533 private int queuedPriority; 534 private ThreadPoolExecutor parent; 535 private User user; 536 private long time; 537 538 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 539 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 540 ThreadPoolExecutor parent, User user) { 541 this.store = store; 542 this.region = region; 543 this.compaction = compaction; 544 this.tracker = tracker; 545 this.completeTracker = completeTracker; 546 this.queuedPriority = 547 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 548 this.parent = parent; 549 this.user = user; 550 this.time = EnvironmentEdgeManager.currentTime(); 551 } 552 553 @Override 554 public String toString() { 555 if (compaction != null) { 556 return "Request=" + compaction.getRequest(); 557 } else { 558 return "region=" + region.toString() + ", storeName=" + store.toString() + 559 ", priority=" + queuedPriority + ", startTime=" + time; 560 } 561 } 562 563 private void doCompaction(User user) { 564 CompactionContext c; 565 // Common case - system compaction without a file selection. Select now. 566 if (compaction == null) { 567 int oldPriority = this.queuedPriority; 568 this.queuedPriority = this.store.getCompactPriority(); 569 if (this.queuedPriority > oldPriority) { 570 // Store priority decreased while we were in queue (due to some other compaction?), 571 // requeue with new priority to avoid blocking potential higher priorities. 572 this.parent.execute(this); 573 return; 574 } 575 Optional<CompactionContext> selected; 576 try { 577 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 578 completeTracker, user); 579 } catch (IOException ex) { 580 LOG.error("Compaction selection failed " + this, ex); 581 server.checkFileSystem(); 582 region.decrementCompactionsQueuedCount(); 583 return; 584 } 585 if (!selected.isPresent()) { 586 region.decrementCompactionsQueuedCount(); 587 return; // nothing to do 588 } 589 c = selected.get(); 590 assert c.hasSelection(); 591 // Now see if we are in correct pool for the size; if not, go to the correct one. 592 // We might end up waiting for a while, so cancel the selection. 593 594 ThreadPoolExecutor pool = 595 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 596 597 // Long compaction pool can process small job 598 // Short compaction pool should not process large job 599 if (this.parent == shortCompactions && pool == longCompactions) { 600 this.store.cancelRequestedCompaction(c); 601 this.parent = pool; 602 this.parent.execute(this); 603 return; 604 } 605 } else { 606 c = compaction; 607 } 608 // Finally we can compact something. 609 assert c != null; 610 611 tracker.beforeExecution(store); 612 try { 613 // Note: please don't put single-compaction logic here; 614 // put it into region/store/etc. This is CST logic. 615 long start = EnvironmentEdgeManager.currentTime(); 616 boolean completed = 617 region.compact(c, store, compactionThroughputController, user); 618 long now = EnvironmentEdgeManager.currentTime(); 619 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + 620 this + "; duration=" + StringUtils.formatTimeDiff(now, start)); 621 if (completed) { 622 // degenerate case: blocked regions require recursive enqueues 623 if (store.getCompactPriority() <= 0) { 624 requestSystemCompaction(region, store, "Recursive enqueue"); 625 } else { 626 // see if the compaction has caused us to exceed max region size 627 requestSplit(region); 628 } 629 } 630 } catch (IOException ex) { 631 IOException remoteEx = 632 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 633 LOG.error("Compaction failed " + this, remoteEx); 634 if (remoteEx != ex) { 635 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 636 } 637 region.reportCompactionRequestFailure(); 638 server.checkFileSystem(); 639 } catch (Exception ex) { 640 LOG.error("Compaction failed " + this, ex); 641 region.reportCompactionRequestFailure(); 642 server.checkFileSystem(); 643 } finally { 644 tracker.afterExecution(store); 645 completeTracker.completed(store); 646 region.decrementCompactionsQueuedCount(); 647 LOG.debug("Status {}", CompactSplit.this); 648 } 649 } 650 651 @Override 652 public void run() { 653 Preconditions.checkNotNull(server); 654 if (server.isStopped() || (region.getTableDescriptor() != null && 655 !region.getTableDescriptor().isCompactionEnabled())) { 656 region.decrementCompactionsQueuedCount(); 657 return; 658 } 659 doCompaction(user); 660 } 661 662 private String formatStackTrace(Exception ex) { 663 StringWriter sw = new StringWriter(); 664 PrintWriter pw = new PrintWriter(sw); 665 ex.printStackTrace(pw); 666 pw.flush(); 667 return sw.toString(); 668 } 669 } 670 671 /** 672 * Cleanup class to use when rejecting a compaction request from the queue. 673 */ 674 private static class Rejection implements RejectedExecutionHandler { 675 @Override 676 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { 677 if (runnable instanceof CompactionRunner) { 678 CompactionRunner runner = (CompactionRunner) runnable; 679 LOG.debug("Compaction Rejected: " + runner); 680 if (runner.compaction != null) { 681 runner.store.cancelRequestedCompaction(runner.compaction); 682 } 683 } 684 } 685 } 686 687 /** 688 * {@inheritDoc} 689 */ 690 @Override 691 public void onConfigurationChange(Configuration newConf) { 692 // Check if number of large / small compaction threads has changed, and then 693 // adjust the core pool size of the thread pools, by using the 694 // setCorePoolSize() method. According to the javadocs, it is safe to 695 // change the core pool size on-the-fly. We need to reset the maximum 696 // pool size, as well. 697 int largeThreads = Math.max(1, newConf.getInt( 698 LARGE_COMPACTION_THREADS, 699 LARGE_COMPACTION_THREADS_DEFAULT)); 700 if (this.longCompactions.getCorePoolSize() != largeThreads) { 701 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + 702 " from " + this.longCompactions.getCorePoolSize() + " to " + 703 largeThreads); 704 if(this.longCompactions.getCorePoolSize() < largeThreads) { 705 this.longCompactions.setMaximumPoolSize(largeThreads); 706 this.longCompactions.setCorePoolSize(largeThreads); 707 } else { 708 this.longCompactions.setCorePoolSize(largeThreads); 709 this.longCompactions.setMaximumPoolSize(largeThreads); 710 } 711 } 712 713 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, 714 SMALL_COMPACTION_THREADS_DEFAULT); 715 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 716 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + 717 " from " + this.shortCompactions.getCorePoolSize() + " to " + 718 smallThreads); 719 if(this.shortCompactions.getCorePoolSize() < smallThreads) { 720 this.shortCompactions.setMaximumPoolSize(smallThreads); 721 this.shortCompactions.setCorePoolSize(smallThreads); 722 } else { 723 this.shortCompactions.setCorePoolSize(smallThreads); 724 this.shortCompactions.setMaximumPoolSize(smallThreads); 725 } 726 } 727 728 int splitThreads = newConf.getInt(SPLIT_THREADS, 729 SPLIT_THREADS_DEFAULT); 730 if (this.splits.getCorePoolSize() != splitThreads) { 731 LOG.info("Changing the value of " + SPLIT_THREADS + 732 " from " + this.splits.getCorePoolSize() + " to " + 733 splitThreads); 734 if(this.splits.getCorePoolSize() < splitThreads) { 735 this.splits.setMaximumPoolSize(splitThreads); 736 this.splits.setCorePoolSize(splitThreads); 737 } else { 738 this.splits.setCorePoolSize(splitThreads); 739 this.splits.setMaximumPoolSize(splitThreads); 740 } 741 } 742 743 ThroughputController old = this.compactionThroughputController; 744 if (old != null) { 745 old.stop("configuration change"); 746 } 747 this.compactionThroughputController = 748 CompactionThroughputControllerFactory.create(server, newConf); 749 750 // We change this atomically here instead of reloading the config in order that upstream 751 // would be the only one with the flexibility to reload the config. 752 this.conf.reloadConfiguration(); 753 } 754 755 protected int getSmallCompactionThreadNum() { 756 return this.shortCompactions.getCorePoolSize(); 757 } 758 759 protected int getLargeCompactionThreadNum() { 760 return this.longCompactions.getCorePoolSize(); 761 } 762 763 protected int getSplitThreadNum() { 764 return this.splits.getCorePoolSize(); 765 } 766 767 /** 768 * {@inheritDoc} 769 */ 770 @Override 771 public void registerChildren(ConfigurationManager manager) { 772 // No children to register. 773 } 774 775 /** 776 * {@inheritDoc} 777 */ 778 @Override 779 public void deregisterChildren(ConfigurationManager manager) { 780 // No children to register 781 } 782 783 public ThroughputController getCompactionThroughputController() { 784 return compactionThroughputController; 785 } 786 787 /** 788 * Shutdown the long compaction thread pool. 789 * Should only be used in unit test to prevent long compaction thread pool from stealing job 790 * from short compaction queue 791 */ 792 void shutdownLongCompactions(){ 793 this.longCompactions.shutdown(); 794 } 795 796 public void clearLongCompactionsQueue() { 797 longCompactions.getQueue().clear(); 798 } 799 800 public void clearShortCompactionsQueue() { 801 shortCompactions.getQueue().clear(); 802 } 803 804 public boolean isCompactionsEnabled() { 805 return compactionsEnabled; 806 } 807 808 public void setCompactionsEnabled(boolean compactionsEnabled) { 809 this.compactionsEnabled = compactionsEnabled; 810 this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled)); 811 } 812 813 /** 814 * @return the longCompactions thread pool executor 815 */ 816 ThreadPoolExecutor getLongCompactions() { 817 return longCompactions; 818 } 819 820 /** 821 * @return the shortCompactions thread pool executor 822 */ 823 ThreadPoolExecutor getShortCompactions() { 824 return shortCompactions; 825 } 826 827}