001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 021import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Comparator; 027import java.util.Iterator; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.Executors; 033import java.util.concurrent.RejectedExecutionException; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.function.IntSupplier; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.conf.ConfigurationManager; 041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 049import org.apache.hadoop.hbase.security.Superusers; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.StealJobQueue; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.util.StringUtils; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062/** 063 * Compact region on request and then run split if appropriate 064 */ 065@InterfaceAudience.Private 066public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 067 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 068 069 // Configuration key for the large compaction threads. 070 public final static String LARGE_COMPACTION_THREADS = 071 "hbase.regionserver.thread.compaction.large"; 072 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 073 074 // Configuration key for the small compaction threads. 075 public final static String SMALL_COMPACTION_THREADS = 076 "hbase.regionserver.thread.compaction.small"; 077 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 078 079 // Configuration key for split threads 080 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 081 public final static int SPLIT_THREADS_DEFAULT = 1; 082 083 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 084 "hbase.regionserver.regionSplitLimit"; 085 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000; 086 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = 087 "hbase.regionserver.compaction.enabled"; 088 089 private final HRegionServer server; 090 private final Configuration conf; 091 private volatile ThreadPoolExecutor longCompactions; 092 private volatile ThreadPoolExecutor shortCompactions; 093 private volatile ThreadPoolExecutor splits; 094 095 private volatile ThroughputController compactionThroughputController; 096 private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet(); 097 098 private volatile boolean compactionsEnabled; 099 /** 100 * Splitting should not take place if the total number of regions exceed this. This is not a hard 101 * limit to the number of regions but it is a guideline to stop splitting after number of online 102 * regions is greater than this. 103 */ 104 private int regionSplitLimit; 105 106 CompactSplit(HRegionServer server) { 107 this.server = server; 108 this.conf = server.getConfiguration(); 109 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 110 createCompactionExecutors(); 111 createSplitExcecutors(); 112 113 // compaction throughput controller 114 this.compactionThroughputController = 115 CompactionThroughputControllerFactory.create(server, conf); 116 } 117 118 // only for test 119 public CompactSplit(Configuration conf) { 120 this.server = null; 121 this.conf = conf; 122 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 123 createCompactionExecutors(); 124 createSplitExcecutors(); 125 } 126 127 private void createSplitExcecutors() { 128 final String n = Thread.currentThread().getName(); 129 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 130 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 131 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 132 } 133 134 private void createCompactionExecutors() { 135 this.regionSplitLimit = 136 conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 137 138 int largeThreads = 139 Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 140 int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 141 142 // if we have throttle threads, make sure the user also specified size 143 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 144 145 final String n = Thread.currentThread().getName(); 146 147 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 148 // Since the StealJobQueue inner uses the PriorityBlockingQueue, 149 // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for 150 // the long and short compaction thread pool executors since HBASE-27332. 151 // If anyone who what to change the StealJobQueue to a bounded queue, 152 // please add the rejection handler back. 153 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 154 stealJobQueue, 155 new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); 156 this.longCompactions.prestartAllCoreThreads(); 157 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 158 stealJobQueue.getStealFromQueue(), 159 new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 160 } 161 162 @Override 163 public String toString() { 164 return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size() 165 + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue=" 166 + splits.getQueue().size(); 167 } 168 169 public String dumpQueue() { 170 StringBuilder queueLists = new StringBuilder(); 171 queueLists.append("Compaction/Split Queue dump:\n"); 172 queueLists.append(" LargeCompation Queue:\n"); 173 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 174 Iterator<Runnable> it = lq.iterator(); 175 while (it.hasNext()) { 176 queueLists.append(" " + it.next().toString()); 177 queueLists.append("\n"); 178 } 179 180 if (shortCompactions != null) { 181 queueLists.append("\n"); 182 queueLists.append(" SmallCompation Queue:\n"); 183 lq = shortCompactions.getQueue(); 184 it = lq.iterator(); 185 while (it.hasNext()) { 186 queueLists.append(" " + it.next().toString()); 187 queueLists.append("\n"); 188 } 189 } 190 191 queueLists.append("\n"); 192 queueLists.append(" Split Queue:\n"); 193 lq = splits.getQueue(); 194 it = lq.iterator(); 195 while (it.hasNext()) { 196 queueLists.append(" " + it.next().toString()); 197 queueLists.append("\n"); 198 } 199 200 return queueLists.toString(); 201 } 202 203 public synchronized boolean requestSplit(final Region r) { 204 // Don't split regions that are blocking is the default behavior. 205 // But in some circumstances, split here is needed to prevent the region size from 206 // continuously growing, as well as the number of store files, see HBASE-26242. 207 HRegion hr = (HRegion) r; 208 try { 209 if (shouldSplitRegion(r.getRegionInfo()) && hr.getCompactPriority() >= PRIORITY_USER) { 210 byte[] midKey = hr.checkSplit().orElse(null); 211 if (midKey != null) { 212 requestSplit(r, midKey); 213 return true; 214 } 215 } 216 } catch (IndexOutOfBoundsException e) { 217 // We get this sometimes. Not sure why. Catch and return false; no split request. 218 LOG.warn("Catching out-of-bounds; region={}, policy={}", 219 hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e); 220 } 221 return false; 222 } 223 224 private synchronized void requestSplit(final Region r, byte[] midKey) { 225 requestSplit(r, midKey, null); 226 } 227 228 /* 229 * The User parameter allows the split thread to assume the correct user identity 230 */ 231 private synchronized void requestSplit(final Region r, byte[] midKey, User user) { 232 if (midKey == null) { 233 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() 234 + " not splittable because midkey=null"); 235 return; 236 } 237 try { 238 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 239 if (LOG.isDebugEnabled()) { 240 LOG.debug("Splitting " + r + ", " + this); 241 } 242 } catch (RejectedExecutionException ree) { 243 LOG.info("Could not execute split for " + r, ree); 244 } 245 } 246 247 private void interrupt() { 248 longCompactions.shutdownNow(); 249 shortCompactions.shutdownNow(); 250 } 251 252 private void reInitializeCompactionsExecutors() { 253 createCompactionExecutors(); 254 } 255 256 // set protected for test 257 protected interface CompactionCompleteTracker { 258 259 default void completed(Store store) { 260 } 261 } 262 263 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 264 new CompactionCompleteTracker() { 265 }; 266 267 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 268 269 private final CompactionLifeCycleTracker tracker; 270 271 private final AtomicInteger remaining; 272 273 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 274 this.tracker = tracker; 275 this.remaining = new AtomicInteger(numberOfStores); 276 } 277 278 @Override 279 public void completed(Store store) { 280 if (remaining.decrementAndGet() == 0) { 281 tracker.completed(); 282 } 283 } 284 } 285 286 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 287 IntSupplier numberOfStores) { 288 if (tracker == CompactionLifeCycleTracker.DUMMY) { 289 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 290 // the life cycle of a compaction. 291 return DUMMY_COMPLETE_TRACKER; 292 } else { 293 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 294 } 295 } 296 297 @Override 298 public synchronized void requestCompaction(HRegion region, String why, int priority, 299 CompactionLifeCycleTracker tracker, User user) throws IOException { 300 requestCompactionInternal(region, why, priority, true, tracker, 301 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 302 } 303 304 @Override 305 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 306 CompactionLifeCycleTracker tracker, User user) throws IOException { 307 requestCompactionInternal(region, store, why, priority, true, tracker, 308 getCompleteTracker(tracker, () -> 1), user); 309 } 310 311 @Override 312 public void switchCompaction(boolean onOrOff) { 313 if (onOrOff) { 314 // re-create executor pool if compactions are disabled. 315 if (!isCompactionsEnabled()) { 316 LOG.info("Re-Initializing compactions because user switched on compactions"); 317 reInitializeCompactionsExecutors(); 318 } 319 setCompactionsEnabled(onOrOff); 320 return; 321 } 322 323 setCompactionsEnabled(onOrOff); 324 LOG.info("Interrupting running compactions because user switched off compactions"); 325 interrupt(); 326 } 327 328 private void requestCompactionInternal(HRegion region, String why, int priority, 329 boolean selectNow, CompactionLifeCycleTracker tracker, 330 CompactionCompleteTracker completeTracker, User user) throws IOException { 331 // request compaction on all stores 332 for (HStore store : region.stores.values()) { 333 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 334 user); 335 } 336 } 337 338 // set protected for test 339 protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 340 boolean selectNow, CompactionLifeCycleTracker tracker, 341 CompactionCompleteTracker completeTracker, User user) throws IOException { 342 if (!this.isCompactionsEnabled()) { 343 LOG.info("Ignoring compaction request for " + region + ",because compaction is disabled."); 344 return; 345 } 346 347 if ( 348 this.server.isStopped() || (region.getTableDescriptor() != null 349 && !region.getTableDescriptor().isCompactionEnabled()) 350 ) { 351 return; 352 } 353 RegionServerSpaceQuotaManager spaceQuotaManager = 354 this.server.getRegionServerSpaceQuotaManager(); 355 356 if ( 357 user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 358 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName()) 359 ) { 360 // Enter here only when: 361 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 362 String reason = "Ignoring compaction request for " + region 363 + " as an active space quota violation " + " policy disallows compactions."; 364 tracker.notExecuted(store, reason); 365 completeTracker.completed(store); 366 LOG.debug(reason); 367 return; 368 } 369 370 CompactionContext compaction; 371 if (selectNow) { 372 Optional<CompactionContext> c = 373 selectCompaction(region, store, priority, tracker, completeTracker, user); 374 if (!c.isPresent()) { 375 // message logged inside 376 return; 377 } 378 compaction = c.get(); 379 } else { 380 compaction = null; 381 } 382 383 ThreadPoolExecutor pool; 384 if (selectNow) { 385 // compaction.get is safe as we will just return if selectNow is true but no compaction is 386 // selected 387 pool = store.throttleCompaction(compaction.getRequest().getSize()) 388 ? longCompactions 389 : shortCompactions; 390 } else { 391 // We assume that most compactions are small. So, put system compactions into small 392 // pool; we will do selection there, and move to large pool if necessary. 393 pool = shortCompactions; 394 } 395 396 // A simple implementation for under compaction marks. 397 // Since this method is always called in the synchronized methods, we do not need to use the 398 // boolean result to make sure that exactly the one that added here will be removed 399 // in the next steps. 400 underCompactionStores.add(getStoreNameForUnderCompaction(store)); 401 pool.execute( 402 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 403 if (LOG.isDebugEnabled()) { 404 LOG.debug( 405 "Add compact mark for store {}, priority={}, current under compaction " 406 + "store size is {}", 407 getStoreNameForUnderCompaction(store), priority, underCompactionStores.size()); 408 } 409 region.incrementCompactionsQueuedCount(); 410 if (LOG.isDebugEnabled()) { 411 String type = (pool == shortCompactions) ? "Small " : "Large "; 412 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 413 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 414 } 415 } 416 417 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 418 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 419 DUMMY_COMPLETE_TRACKER, null); 420 } 421 422 public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException { 423 requestSystemCompaction(region, store, why, false); 424 } 425 426 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why, 427 boolean giveUpIfRequestedOrCompacting) throws IOException { 428 if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) { 429 LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region, 430 store.getColumnFamilyName()); 431 return; 432 } 433 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 434 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 435 } 436 437 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 438 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 439 throws IOException { 440 // don't even select for compaction if disableCompactions is set to true 441 if (!isCompactionsEnabled()) { 442 LOG.info(String.format("User has disabled compactions")); 443 return Optional.empty(); 444 } 445 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 446 if (!compaction.isPresent() && region.getRegionInfo() != null) { 447 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() 448 + " because compaction request was cancelled"; 449 tracker.notExecuted(store, reason); 450 completeTracker.completed(store); 451 LOG.debug(reason); 452 } 453 return compaction; 454 } 455 456 /** 457 * Only interrupt once it's done with a run through the work loop. 458 */ 459 void interruptIfNecessary() { 460 splits.shutdown(); 461 longCompactions.shutdown(); 462 shortCompactions.shutdown(); 463 } 464 465 private void waitFor(ThreadPoolExecutor t, String name) { 466 boolean done = false; 467 while (!done) { 468 try { 469 done = t.awaitTermination(60, TimeUnit.SECONDS); 470 LOG.info("Waiting for " + name + " to finish..."); 471 if (!done) { 472 t.shutdownNow(); 473 } 474 } catch (InterruptedException ie) { 475 LOG.warn("Interrupted waiting for " + name + " to finish..."); 476 t.shutdownNow(); 477 } 478 } 479 } 480 481 void join() { 482 waitFor(splits, "Split Thread"); 483 waitFor(longCompactions, "Large Compaction Thread"); 484 waitFor(shortCompactions, "Small Compaction Thread"); 485 } 486 487 /** 488 * Returns the current size of the queue containing regions that are processed. 489 * @return The current size of the regions queue. 490 */ 491 public int getCompactionQueueSize() { 492 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 493 } 494 495 public int getLargeCompactionQueueSize() { 496 return longCompactions.getQueue().size(); 497 } 498 499 public int getSmallCompactionQueueSize() { 500 return shortCompactions.getQueue().size(); 501 } 502 503 public int getSplitQueueSize() { 504 return splits.getQueue().size(); 505 } 506 507 private boolean shouldSplitRegion(RegionInfo ri) { 508 if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) { 509 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 510 + "Please consider taking a look at " 511 + "https://hbase.apache.org/docs/operational-management/region-and-capacity#region-management"); 512 } 513 return (regionSplitLimit > server.getNumberOfOnlineRegions() 514 // Do not attempt to split secondary region replicas, as this is not allowed and our request 515 // to do so will be rejected 516 && ri.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID); 517 } 518 519 /** Returns the regionSplitLimit */ 520 public int getRegionSplitLimit() { 521 return this.regionSplitLimit; 522 } 523 524 /** 525 * Check if this store is under compaction 526 */ 527 public boolean isUnderCompaction(final HStore s) { 528 return underCompactionStores.contains(getStoreNameForUnderCompaction(s)); 529 } 530 531 private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() { 532 533 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 534 if (r1 == r2) { 535 return 0; // they are the same request 536 } 537 // less first 538 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 539 if (cmp != 0) { 540 return cmp; 541 } 542 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 543 if (cmp != 0) { 544 return cmp; 545 } 546 547 // break the tie based on hash code 548 return System.identityHashCode(r1) - System.identityHashCode(r2); 549 } 550 551 @Override 552 public int compare(Runnable r1, Runnable r2) { 553 // CompactionRunner first 554 if (r1 instanceof CompactionRunner) { 555 if (!(r2 instanceof CompactionRunner)) { 556 return -1; 557 } 558 } else { 559 if (r2 instanceof CompactionRunner) { 560 return 1; 561 } else { 562 // break the tie based on hash code 563 return System.identityHashCode(r1) - System.identityHashCode(r2); 564 } 565 } 566 CompactionRunner o1 = (CompactionRunner) r1; 567 CompactionRunner o2 = (CompactionRunner) r2; 568 // less first 569 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 570 if (cmp != 0) { 571 return cmp; 572 } 573 CompactionContext c1 = o1.compaction; 574 CompactionContext c2 = o2.compaction; 575 if (c1 != null) { 576 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 577 } else { 578 return c2 != null ? 1 : 0; 579 } 580 } 581 }; 582 583 private final class CompactionRunner implements Runnable { 584 private final HStore store; 585 private final HRegion region; 586 private final CompactionContext compaction; 587 private final CompactionLifeCycleTracker tracker; 588 private final CompactionCompleteTracker completeTracker; 589 private int queuedPriority; 590 private ThreadPoolExecutor parent; 591 private User user; 592 private long time; 593 594 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 595 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 596 ThreadPoolExecutor parent, User user) { 597 this.store = store; 598 this.region = region; 599 this.compaction = compaction; 600 this.tracker = tracker; 601 this.completeTracker = completeTracker; 602 this.queuedPriority = 603 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 604 this.parent = parent; 605 this.user = user; 606 this.time = EnvironmentEdgeManager.currentTime(); 607 } 608 609 @Override 610 public String toString() { 611 if (compaction != null) { 612 return "Request=" + compaction.getRequest(); 613 } else { 614 return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority=" 615 + queuedPriority + ", startTime=" + time; 616 } 617 } 618 619 private void doCompaction(User user) { 620 CompactionContext c; 621 // Common case - system compaction without a file selection. Select now. 622 if (compaction == null) { 623 int oldPriority = this.queuedPriority; 624 this.queuedPriority = this.store.getCompactPriority(); 625 if (this.queuedPriority > oldPriority) { 626 // Store priority decreased while we were in queue (due to some other compaction?), 627 // requeue with new priority to avoid blocking potential higher priorities. 628 this.parent.execute(this); 629 return; 630 } 631 Optional<CompactionContext> selected; 632 try { 633 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 634 completeTracker, user); 635 } catch (IOException ex) { 636 LOG.error("Compaction selection failed " + this, ex); 637 server.checkFileSystem(); 638 region.decrementCompactionsQueuedCount(); 639 return; 640 } 641 if (!selected.isPresent()) { 642 region.decrementCompactionsQueuedCount(); 643 return; // nothing to do 644 } 645 c = selected.get(); 646 assert c.hasSelection(); 647 // Now see if we are in correct pool for the size; if not, go to the correct one. 648 // We might end up waiting for a while, so cancel the selection. 649 650 ThreadPoolExecutor pool = 651 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 652 653 // Long compaction pool can process small job 654 // Short compaction pool should not process large job 655 if (this.parent == shortCompactions && pool == longCompactions) { 656 this.store.cancelRequestedCompaction(c); 657 this.parent = pool; 658 this.parent.execute(this); 659 return; 660 } 661 } else { 662 c = compaction; 663 } 664 // Finally we can compact something. 665 assert c != null; 666 667 tracker.beforeExecution(store); 668 try { 669 // Note: please don't put single-compaction logic here; 670 // put it into region/store/etc. This is CST logic. 671 long start = EnvironmentEdgeManager.currentTime(); 672 boolean completed = region.compact(c, store, compactionThroughputController, user); 673 long now = EnvironmentEdgeManager.currentTime(); 674 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration=" 675 + StringUtils.formatTimeDiff(now, start)); 676 if (completed) { 677 // degenerate case: blocked regions require recursive enqueues 678 if ( 679 region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0 680 ) { 681 requestSystemCompaction(region, store, "Recursive enqueue"); 682 } else { 683 // see if the compaction has caused us to exceed max region size 684 if (!requestSplit(region) && store.getCompactPriority() <= 0) { 685 requestSystemCompaction(region, store, "Recursive enqueue"); 686 } 687 } 688 } 689 } catch (IOException ex) { 690 IOException remoteEx = 691 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 692 LOG.error("Compaction failed " + this, remoteEx); 693 if (remoteEx != ex) { 694 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 695 } 696 region.reportCompactionRequestFailure(); 697 server.checkFileSystem(); 698 } catch (Exception ex) { 699 LOG.error("Compaction failed " + this, ex); 700 region.reportCompactionRequestFailure(); 701 server.checkFileSystem(); 702 } finally { 703 tracker.afterExecution(store); 704 completeTracker.completed(store); 705 region.decrementCompactionsQueuedCount(); 706 LOG.debug("Status {}", CompactSplit.this); 707 } 708 } 709 710 @Override 711 public void run() { 712 try { 713 Preconditions.checkNotNull(server); 714 if ( 715 server.isStopped() || (region.getTableDescriptor() != null 716 && !region.getTableDescriptor().isCompactionEnabled()) 717 ) { 718 region.decrementCompactionsQueuedCount(); 719 return; 720 } 721 doCompaction(user); 722 } finally { 723 if (LOG.isDebugEnabled()) { 724 LOG.debug("Remove under compaction mark for store: {}", 725 store.getHRegion().getRegionInfo().getEncodedName() + ":" 726 + store.getColumnFamilyName()); 727 } 728 underCompactionStores.remove(getStoreNameForUnderCompaction(store)); 729 } 730 } 731 732 private String formatStackTrace(Exception ex) { 733 StringWriter sw = new StringWriter(); 734 PrintWriter pw = new PrintWriter(sw); 735 ex.printStackTrace(pw); 736 pw.flush(); 737 return sw.toString(); 738 } 739 } 740 741 /** 742 * {@inheritDoc} 743 */ 744 @Override 745 public void onConfigurationChange(Configuration newConf) { 746 // Check if number of large / small compaction threads has changed, and then 747 // adjust the core pool size of the thread pools, by using the 748 // setCorePoolSize() method. According to the javadocs, it is safe to 749 // change the core pool size on-the-fly. We need to reset the maximum 750 // pool size, as well. 751 int largeThreads = 752 Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 753 if (this.longCompactions.getCorePoolSize() != largeThreads) { 754 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from " 755 + this.longCompactions.getCorePoolSize() + " to " + largeThreads); 756 if (this.longCompactions.getCorePoolSize() < largeThreads) { 757 this.longCompactions.setMaximumPoolSize(largeThreads); 758 this.longCompactions.setCorePoolSize(largeThreads); 759 } else { 760 this.longCompactions.setCorePoolSize(largeThreads); 761 this.longCompactions.setMaximumPoolSize(largeThreads); 762 } 763 } 764 765 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 766 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 767 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from " 768 + this.shortCompactions.getCorePoolSize() + " to " + smallThreads); 769 if (this.shortCompactions.getCorePoolSize() < smallThreads) { 770 this.shortCompactions.setMaximumPoolSize(smallThreads); 771 this.shortCompactions.setCorePoolSize(smallThreads); 772 } else { 773 this.shortCompactions.setCorePoolSize(smallThreads); 774 this.shortCompactions.setMaximumPoolSize(smallThreads); 775 } 776 } 777 778 int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 779 if (this.splits.getCorePoolSize() != splitThreads) { 780 LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize() 781 + " to " + splitThreads); 782 if (this.splits.getCorePoolSize() < splitThreads) { 783 this.splits.setMaximumPoolSize(splitThreads); 784 this.splits.setCorePoolSize(splitThreads); 785 } else { 786 this.splits.setCorePoolSize(splitThreads); 787 this.splits.setMaximumPoolSize(splitThreads); 788 } 789 } 790 791 ThroughputController old = this.compactionThroughputController; 792 if (old != null) { 793 old.stop("configuration change"); 794 } 795 this.compactionThroughputController = 796 CompactionThroughputControllerFactory.create(server, newConf); 797 798 // We change this atomically here instead of reloading the config in order that upstream 799 // would be the only one with the flexibility to reload the config. 800 this.conf.reloadConfiguration(); 801 } 802 803 protected int getSmallCompactionThreadNum() { 804 return this.shortCompactions.getCorePoolSize(); 805 } 806 807 protected int getLargeCompactionThreadNum() { 808 return this.longCompactions.getCorePoolSize(); 809 } 810 811 protected int getSplitThreadNum() { 812 return this.splits.getCorePoolSize(); 813 } 814 815 /** Exposed for unit testing */ 816 long getSubmittedSplitsCount() { 817 return this.splits.getTaskCount(); 818 } 819 820 /** 821 * {@inheritDoc} 822 */ 823 @Override 824 public void registerChildren(ConfigurationManager manager) { 825 // No children to register. 826 } 827 828 /** 829 * {@inheritDoc} 830 */ 831 @Override 832 public void deregisterChildren(ConfigurationManager manager) { 833 // No children to register 834 } 835 836 public ThroughputController getCompactionThroughputController() { 837 return compactionThroughputController; 838 } 839 840 /** 841 * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long 842 * compaction thread pool from stealing job from short compaction queue 843 */ 844 void shutdownLongCompactions() { 845 this.longCompactions.shutdown(); 846 } 847 848 public void clearLongCompactionsQueue() { 849 longCompactions.getQueue().clear(); 850 } 851 852 public void clearShortCompactionsQueue() { 853 shortCompactions.getQueue().clear(); 854 } 855 856 public boolean isCompactionsEnabled() { 857 return compactionsEnabled; 858 } 859 860 public void setCompactionsEnabled(boolean compactionsEnabled) { 861 this.compactionsEnabled = compactionsEnabled; 862 this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled); 863 } 864 865 /** Returns the longCompactions thread pool executor */ 866 ThreadPoolExecutor getLongCompactions() { 867 return longCompactions; 868 } 869 870 /** Returns the shortCompactions thread pool executor */ 871 ThreadPoolExecutor getShortCompactions() { 872 return shortCompactions; 873 } 874 875 private String getStoreNameForUnderCompaction(HStore store) { 876 return String.format("%s:%s", 877 store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "", 878 store.getColumnFamilyName()); 879 } 880 881}