/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Compact region on request and then run split if appropriate.
 * <p>
 * Owns three thread pools: one for "long" (large) compactions, one for "short" (small)
 * compactions, and one for splits. The two compaction pools are backed by a shared
 * {@link StealJobQueue}: the long pool is given the queue itself and the short pool is given
 * its steal-from queue.
 */
@InterfaceAudience.Private
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

  // Configuration key for the large compaction threads.
  public static final String LARGE_COMPACTION_THREADS =
    "hbase.regionserver.thread.compaction.large";
  public static final int LARGE_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for the small compaction threads.
  public static final String SMALL_COMPACTION_THREADS =
    "hbase.regionserver.thread.compaction.small";
  public static final int SMALL_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for split threads
  public static final String SPLIT_THREADS = "hbase.regionserver.thread.split";
  public static final int SPLIT_THREADS_DEFAULT = 1;

  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
    "hbase.regionserver.regionSplitLimit";
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
    "hbase.regionserver.compaction.enabled";

  /** Owning region server; {@code null} when built through the test-only constructor. */
  private final HRegionServer server;
  private final Configuration conf;
  // The pools are volatile because switchCompaction() may replace the compaction pools with
  // freshly created executors after they have been shut down.
  private volatile ThreadPoolExecutor longCompactions;
  private volatile ThreadPoolExecutor shortCompactions;
  private volatile ThreadPoolExecutor splits;

  private volatile ThroughputController compactionThroughputController;
  /** Names ("encodedRegionName:family") of stores with a queued or running compaction. */
  private final Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();

  private volatile boolean compactionsEnabled;
  /**
   * Splitting should not take place if the total number of regions exceed this. This is not a hard
   * limit to the number of regions but it is a guideline to stop splitting after number of online
   * regions is greater than this.
   */
  private int regionSplitLimit;

  /**
   * Creates the compaction and split executors and the compaction throughput controller from the
   * given server's configuration.
   * @param server the region server this instance works for
   */
  CompactSplit(HRegionServer server) {
    this.server = server;
    this.conf = server.getConfiguration();
    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
    createCompactionExecutors();
    createSplitExecutors();

    // compaction throughput controller
    this.compactionThroughputController =
      CompactionThroughputControllerFactory.create(server, conf);
  }

  // only for test
  public CompactSplit(Configuration conf) {
    this.server = null;
    this.conf = conf;
    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
    createCompactionExecutors();
    createSplitExecutors();
  }

  /** Creates the fixed-size split pool, sized by {@link #SPLIT_THREADS}. */
  private void createSplitExecutors() {
    final String n = Thread.currentThread().getName();
    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
  }

  /**
   * Reads the region split limit and creates the long and short compaction pools on top of a
   * shared {@link StealJobQueue}.
   */
  private void createCompactionExecutors() {
    this.regionSplitLimit =
      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

    int largeThreads =
      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);

    // if we have throttle threads, make sure the user also specified size
    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);

    final String n = Thread.currentThread().getName();

    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
    // Since the StealJobQueue inner uses the PriorityBlockingQueue,
    // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for
    // the long and short compaction thread pool executors since HBASE-27332.
    // If anyone wants to change the StealJobQueue to a bounded queue,
    // please add the rejection handler back.
    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
      stealJobQueue,
      new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
    this.longCompactions.prestartAllCoreThreads();
    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
      stealJobQueue.getStealFromQueue(),
      new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
  }

  @Override
  public String toString() {
    return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size()
      + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue="
      + splits.getQueue().size();
  }

  /**
   * Returns a human readable listing of everything currently waiting in the large compaction,
   * small compaction and split queues.
   */
  public String dumpQueue() {
    StringBuilder queueLists = new StringBuilder();
    queueLists.append("Compaction/Split Queue dump:\n");
    appendQueue(queueLists, " LargeCompation Queue:\n", longCompactions.getQueue());

    ThreadPoolExecutor small = shortCompactions;
    if (small != null) {
      queueLists.append("\n");
      appendQueue(queueLists, " SmallCompation Queue:\n", small.getQueue());
    }

    queueLists.append("\n");
    appendQueue(queueLists, " Split Queue:\n", splits.getQueue());

    return queueLists.toString();
  }

  /** Appends {@code header} followed by one line per element of {@code queue}. */
  private static void appendQueue(StringBuilder out, String header,
    BlockingQueue<Runnable> queue) {
    out.append(header);
    for (Iterator<Runnable> it = queue.iterator(); it.hasNext();) {
      out.append(" " + it.next().toString());
      out.append("\n");
    }
  }

  /**
   * Requests a split of the given region if the region count is below the split limit, the region
   * is a default replica, its compact priority is at least {@code PRIORITY_USER}, and
   * {@code checkSplit()} yields a split point.
   * @param r the region to split; must be an {@link HRegion}
   * @return {@code true} if a split request was submitted, {@code false} otherwise
   */
  public synchronized boolean requestSplit(final Region r) {
    // Don't split regions that are blocking is the default behavior.
    // But in some circumstances, split here is needed to prevent the region size from
    // continuously growing, as well as the number of store files, see HBASE-26242.
    HRegion hr = (HRegion) r;
    try {
      if (shouldSplitRegion(r.getRegionInfo()) && hr.getCompactPriority() >= PRIORITY_USER) {
        byte[] midKey = hr.checkSplit().orElse(null);
        if (midKey != null) {
          requestSplit(r, midKey);
          return true;
        }
      }
    } catch (IndexOutOfBoundsException e) {
      // We get this sometimes. Not sure why. Catch and return false; no split request.
      LOG.warn("Catching out-of-bounds; region={}, policy={}",
        hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e);
    }
    return false;
  }

  private synchronized void requestSplit(final Region r, byte[] midKey) {
    requestSplit(r, midKey, null);
  }

  /*
   * The User parameter allows the split thread to assume the correct user identity
   */
  private synchronized void requestSplit(final Region r, byte[] midKey, User user) {
    if (midKey == null) {
      LOG.debug("Region {} not splittable because midkey=null",
        r.getRegionInfo().getRegionNameAsString());
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting " + r + ", " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.info("Could not execute split for {}", r, ree);
    }
  }

  /** Calls {@code shutdownNow()} on both compaction pools; the split pool is left alone. */
  private void interrupt() {
    longCompactions.shutdownNow();
    shortCompactions.shutdownNow();
  }

  private void reInitializeCompactionsExecutors() {
    createCompactionExecutors();
  }

  // set protected for test
  protected interface CompactionCompleteTracker {

    /** Called once for each store whose compaction request has finished or been dropped. */
    default void completed(Store store) {
    }
  }

  /** Shared no-op tracker used when the caller passes {@link CompactionLifeCycleTracker#DUMMY}. */
  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
    new CompactionCompleteTracker() {
    };

  /**
   * Counts down once per completed store and calls
   * {@link CompactionLifeCycleTracker#completed()} when the count reaches zero.
   */
  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {

    private final CompactionLifeCycleTracker tracker;

    private final AtomicInteger remaining;

    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
      this.tracker = tracker;
      this.remaining = new AtomicInteger(numberOfStores);
    }

    @Override
    public void completed(Store store) {
      if (remaining.decrementAndGet() == 0) {
        tracker.completed();
      }
    }
  }

  /**
   * Picks the completion tracker for a request.
   * @param tracker        the caller supplied life cycle tracker
   * @param numberOfStores supplies the number of stores the request covers; only evaluated when
   *                       an aggregating tracker is actually created
   */
  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
    IntSupplier numberOfStores) {
    if (tracker == CompactionLifeCycleTracker.DUMMY) {
      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
      // the life cycle of a compaction.
      return DUMMY_COMPLETE_TRACKER;
    } else {
      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
    }
  }

  @Override
  public synchronized void requestCompaction(HRegion region, String why, int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
  }

  @Override
  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, store, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> 1), user);
  }

  /**
   * Turns compactions on or off. Switching on while compactions are disabled re-creates the
   * compaction executors first; switching off records the new state and then shuts the
   * compaction pools down with {@code shutdownNow()}.
   */
  @Override
  public void switchCompaction(boolean onOrOff) {
    if (onOrOff) {
      // re-create executor pool if compactions are disabled.
      if (!isCompactionsEnabled()) {
        LOG.info("Re-Initializing compactions because user switched on compactions");
        reInitializeCompactionsExecutors();
      }
      setCompactionsEnabled(onOrOff);
      return;
    }

    setCompactionsEnabled(onOrOff);
    LOG.info("Interrupting running compactions because user switched off compactions");
    interrupt();
  }

  private void requestCompactionInternal(HRegion region, String why, int priority,
    boolean selectNow, CompactionLifeCycleTracker tracker,
    CompactionCompleteTracker completeTracker, User user) throws IOException {
    // request compaction on all stores
    for (HStore store : region.stores.values()) {
      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
        user);
    }
  }

  /**
   * Validates and queues a compaction request for a single store.
   * @param selectNow when {@code true} the files are selected here and the request is routed to
   *                  the pool chosen by {@code store.throttleCompaction}; when {@code false}
   *                  selection is deferred to the runner and the request goes to the short pool
   */
  // set protected for test
  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
    boolean selectNow, CompactionLifeCycleTracker tracker,
    CompactionCompleteTracker completeTracker, User user) throws IOException {
    if (!this.isCompactionsEnabled()) {
      LOG.info("Ignoring compaction request for {},because compaction is disabled.", region);
      return;
    }

    if (
      this.server.isStopped() || (region.getTableDescriptor() != null
        && !region.getTableDescriptor().isCompactionEnabled())
    ) {
      return;
    }
    RegionServerSpaceQuotaManager spaceQuotaManager =
      this.server.getRegionServerSpaceQuotaManager();

    if (
      user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())
    ) {
      // Enter here only when:
      // It's a user generated req, the user is not a super user, quotas enabled, compactions
      // disabled.
      String reason = "Ignoring compaction request for " + region
        + " as an active space quota violation " + " policy disallows compactions.";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
      return;
    }

    CompactionContext compaction;
    if (selectNow) {
      Optional<CompactionContext> c =
        selectCompaction(region, store, priority, tracker, completeTracker, user);
      if (!c.isPresent()) {
        // message logged inside
        return;
      }
      compaction = c.get();
    } else {
      compaction = null;
    }

    ThreadPoolExecutor pool;
    if (selectNow) {
      // compaction.get is safe as we will just return if selectNow is true but no compaction is
      // selected
      pool = store.throttleCompaction(compaction.getRequest().getSize())
        ? longCompactions
        : shortCompactions;
    } else {
      // We assume that most compactions are small. So, put system compactions into small
      // pool; we will do selection there, and move to large pool if necessary.
      pool = shortCompactions;
    }

    // A simple implementation for under compaction marks.
    // Since this method is always called in the synchronized methods, we do not need to use the
    // boolean result to make sure that exactly the one that added here will be removed
    // in the next steps.
    underCompactionStores.add(getStoreNameForUnderCompaction(store));
    pool.execute(
      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Add compact mark for store {}, priority={}, current under compaction "
          + "store size is {}",
        getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
    }
    region.incrementCompactionsQueuedCount();
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
        + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
  }

  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
      DUMMY_COMPLETE_TRACKER, null);
  }

  public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException {
    requestSystemCompaction(region, store, why, false);
  }

  /**
   * Requests a system compaction (no user, deferred selection) for one store.
   * @param giveUpIfRequestedOrCompacting when {@code true}, do nothing if the store already
   *                                      carries an under-compaction mark
   */
  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
    boolean giveUpIfRequestedOrCompacting) throws IOException {
    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
        store.getColumnFamilyName());
      return;
    }
    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
  }

  /**
   * Asks the store to select files for compaction.
   * @return the selected compaction, or empty when compactions are disabled or the store did not
   *         produce a request
   */
  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
    CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
    throws IOException {
    // don't even select for compaction if disableCompactions is set to true
    if (!isCompactionsEnabled()) {
      LOG.info("User has disabled compactions");
      return Optional.empty();
    }
    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
    if (!compaction.isPresent() && region.getRegionInfo() != null) {
      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString()
        + " because compaction request was cancelled";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
    }
    return compaction;
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    splits.shutdown();
    longCompactions.shutdown();
    shortCompactions.shutdown();
  }

  /**
   * Blocks until the given pool has terminated. Each time a 60 second wait elapses without
   * termination, or the waiting thread is interrupted, the pool is asked to stop via
   * {@code shutdownNow()} and the wait is retried.
   */
  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        if (!done) {
          // Only report that we are still waiting when the pool really has not terminated.
          LOG.info("Waiting for " + name + " to finish...");
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        // NOTE(review): the interrupt status is not restored here; re-asserting it inside this
        // loop would make the next awaitTermination call throw again straight away.
        LOG.warn("Interrupted waiting for " + name + " to finish...");
        t.shutdownNow();
      }
    }
  }

  /** Waits for the split, large compaction and small compaction pools, in that order. */
  void join() {
    waitFor(splits, "Split Thread");
    waitFor(longCompactions, "Large Compaction Thread");
    waitFor(shortCompactions, "Small Compaction Thread");
  }

  /**
   * Returns the current size of the queue containing regions that are processed.
   * @return The current size of the regions queue.
   */
  public int getCompactionQueueSize() {
    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
  }

  public int getLargeCompactionQueueSize() {
    return longCompactions.getQueue().size();
  }

  public int getSmallCompactionQueueSize() {
    return shortCompactions.getQueue().size();
  }

  public int getSplitQueueSize() {
    return splits.getQueue().size();
  }

  /**
   * Returns whether a split may be requested for the given region: the number of online regions
   * must be below the split limit and the region must be the default replica. Logs a warning once
   * the online region count exceeds 90% of the limit.
   */
  private boolean shouldSplitRegion(RegionInfo ri) {
    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
        + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
    }
    return (regionSplitLimit > server.getNumberOfOnlineRegions()
      // Do not attempt to split secondary region replicas, as this is not allowed and our request
      // to do so will be rejected
      && ri.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID);
  }

  /** Returns the regionSplitLimit */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }

  /**
   * Check if this store is under compaction
   */
  public boolean isUnderCompaction(final HStore s) {
    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
  }

  /**
   * Ordering used by the shared compaction queue. {@link CompactionRunner}s sort before any other
   * runnable; among runners, lower queued priority first, then runners that already have a
   * selection before those that do not, then by request priority and selection time. Remaining
   * ties are broken by identity hash code.
   */
  private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() {

    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
      if (r1 == r2) {
        return 0; // they are the same request
      }
      // less first
      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
      if (cmp != 0) {
        return cmp;
      }
      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
      if (cmp != 0) {
        return cmp;
      }

      // break the tie based on hash code; Integer.compare avoids any chance of int overflow
      // that a plain subtraction would have.
      return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
    }

    @Override
    public int compare(Runnable r1, Runnable r2) {
      // CompactionRunner first
      if (r1 instanceof CompactionRunner) {
        if (!(r2 instanceof CompactionRunner)) {
          return -1;
        }
      } else {
        if (r2 instanceof CompactionRunner) {
          return 1;
        } else {
          // break the tie based on hash code
          return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
        }
      }
      CompactionRunner o1 = (CompactionRunner) r1;
      CompactionRunner o2 = (CompactionRunner) r2;
      // less first
      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
      if (cmp != 0) {
        return cmp;
      }
      CompactionContext c1 = o1.compaction;
      CompactionContext c2 = o2.compaction;
      if (c1 != null) {
        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
      } else {
        return c2 != null ? 1 : 0;
      }
    }
  };

  /**
   * A queued compaction for one store. When {@code compaction} is {@code null} the file selection
   * is performed when the runner executes.
   */
  private final class CompactionRunner implements Runnable {
    private final HStore store;
    private final HRegion region;
    private final CompactionContext compaction;
    private final CompactionLifeCycleTracker tracker;
    private final CompactionCompleteTracker completeTracker;
    private int queuedPriority;
    private ThreadPoolExecutor parent;
    private final User user;
    private final long time;

    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
      ThreadPoolExecutor parent, User user) {
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.tracker = tracker;
      this.completeTracker = completeTracker;
      this.queuedPriority =
        compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
      this.parent = parent;
      this.user = user;
      this.time = EnvironmentEdgeManager.currentTime();
    }

    @Override
    public String toString() {
      if (compaction != null) {
        return "Request=" + compaction.getRequest();
      } else {
        return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority="
          + queuedPriority + ", startTime=" + time;
      }
    }

    /**
     * Selects files if that has not happened yet, possibly re-queues this runner (priority
     * changed, or the selection belongs in the long pool), and otherwise runs the compaction and
     * follows up with a split or another system compaction request where appropriate.
     */
    private void doCompaction(User user) {
      CompactionContext c;
      // Common case - system compaction without a file selection. Select now.
      if (compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        Optional<CompactionContext> selected;
        try {
          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
            completeTracker, user);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed {}", this, ex);
          server.checkFileSystem();
          region.decrementCompactionsQueuedCount();
          return;
        }
        if (!selected.isPresent()) {
          region.decrementCompactionsQueuedCount();
          return; // nothing to do
        }
        c = selected.get();
        assert c.hasSelection();
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.

        ThreadPoolExecutor pool =
          store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;

        // Long compaction pool can process small job
        // Short compaction pool should not process large job
        if (this.parent == shortCompactions && pool == longCompactions) {
          this.store.cancelRequestedCompaction(c);
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      } else {
        c = compaction;
      }
      // Finally we can compact something.
      assert c != null;

      tracker.beforeExecution(store);
      try {
        // Note: please don't put single-compaction logic here;
        // put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTime();
        boolean completed = region.compact(c, store, compactionThroughputController, user);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info("{} compaction {}; duration={}", completed ? "Completed" : "Aborted", this,
          StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (
            region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0
          ) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            if (!requestSplit(region) && store.getCompactPriority() <= 0) {
              requestSystemCompaction(region, store, "Recursive enqueue");
            }
          }
        }
      } catch (IOException ex) {
        IOException remoteEx =
          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
        LOG.error("Compaction failed {}", this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed {}", this, ex);
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } finally {
        tracker.afterExecution(store);
        completeTracker.completed(store);
        region.decrementCompactionsQueuedCount();
        LOG.debug("Status {}", CompactSplit.this);
      }
    }

    @Override
    public void run() {
      try {
        Preconditions.checkNotNull(server);
        if (
          server.isStopped() || (region.getTableDescriptor() != null
            && !region.getTableDescriptor().isCompactionEnabled())
        ) {
          region.decrementCompactionsQueuedCount();
          return;
        }
        doCompaction(user);
      } finally {
        // Compute the mark once with the shared, null-tolerant helper so that the debug log can
        // never throw and prevent the mark from being removed.
        String mark = getStoreNameForUnderCompaction(store);
        LOG.debug("Remove under compaction mark for store: {}", mark);
        underCompactionStores.remove(mark);
      }
    }

    /** Renders the full stack trace of {@code ex} into a string. */
    private String formatStackTrace(Exception ex) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      ex.printStackTrace(pw);
      pw.flush();
      return sw.toString();
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void onConfigurationChange(Configuration newConf) {
    // Check if number of large / small compaction threads has changed, and then
    // adjust the core pool size of the thread pools, by using the
    // setCorePoolSize() method. According to the javadocs, it is safe to
    // change the core pool size on-the-fly. We need to reset the maximum
    // pool size, as well.
    int largeThreads =
      Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    resizePool(this.longCompactions, LARGE_COMPACTION_THREADS, largeThreads);

    // NOTE(review): unlike the large pool, this value is not clamped to at least 1 here.
    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
    resizePool(this.shortCompactions, SMALL_COMPACTION_THREADS, smallThreads);

    int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
    resizePool(this.splits, SPLIT_THREADS, splitThreads);

    ThroughputController old = this.compactionThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.compactionThroughputController =
      CompactionThroughputControllerFactory.create(server, newConf);

    // We change this atomically here instead of reloading the config in order that upstream
    // would be the only one with the flexibility to reload the config.
    this.conf.reloadConfiguration();
  }

  /**
   * Sets both the core and the maximum size of {@code pool} to {@code newSize} if it differs from
   * the current core size. When growing, the maximum is raised first; when shrinking, the core
   * size is lowered first, so that core size never exceeds maximum size in between.
   */
  private static void resizePool(ThreadPoolExecutor pool, String confKey, int newSize) {
    int current = pool.getCorePoolSize();
    if (current == newSize) {
      return;
    }
    LOG.info("Changing the value of {} from {} to {}", confKey, current, newSize);
    if (current < newSize) {
      pool.setMaximumPoolSize(newSize);
      pool.setCorePoolSize(newSize);
    } else {
      pool.setCorePoolSize(newSize);
      pool.setMaximumPoolSize(newSize);
    }
  }

  protected int getSmallCompactionThreadNum() {
    return this.shortCompactions.getCorePoolSize();
  }

  protected int getLargeCompactionThreadNum() {
    return this.longCompactions.getCorePoolSize();
  }

  protected int getSplitThreadNum() {
    return this.splits.getCorePoolSize();
  }

  /** Exposed for unit testing */
  long getSubmittedSplitsCount() {
    return this.splits.getTaskCount();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to register
  }

  public ThroughputController getCompactionThroughputController() {
    return compactionThroughputController;
  }

  /**
   * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long
   * compaction thread pool from stealing job from short compaction queue
   */
  void shutdownLongCompactions() {
    this.longCompactions.shutdown();
  }

  public void clearLongCompactionsQueue() {
    longCompactions.getQueue().clear();
  }

  public void clearShortCompactionsQueue() {
    shortCompactions.getQueue().clear();
  }

  public boolean isCompactionsEnabled() {
    return compactionsEnabled;
  }

  /** Records the new state in this instance and mirrors it into the configuration object. */
  public void setCompactionsEnabled(boolean compactionsEnabled) {
    this.compactionsEnabled = compactionsEnabled;
    this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled);
  }

  /** Returns the longCompactions thread pool executor */
  ThreadPoolExecutor getLongCompactions() {
    return longCompactions;
  }

  /** Returns the shortCompactions thread pool executor */
  ThreadPoolExecutor getShortCompactions() {
    return shortCompactions;
  }

  /**
   * Builds the under-compaction mark for a store: {@code "<encodedRegionName>:<family>"}, with an
   * empty region part when the store has no region.
   */
  private String getStoreNameForUnderCompaction(HStore store) {
    return String.format("%s:%s",
      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
      store.getColumnFamilyName());
  }

}