001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 021import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Comparator; 027import java.util.Iterator; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.Executors; 033import java.util.concurrent.RejectedExecutionException; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.function.IntSupplier; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.conf.ConfigurationManager; 040import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 041import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 042import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 046import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 048import org.apache.hadoop.hbase.security.Superusers; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.StealJobQueue; 052import org.apache.hadoop.ipc.RemoteException; 053import org.apache.hadoop.util.StringUtils; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 059import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 060 061/** 062 * Compact region on request and then run split if appropriate 063 */ 064@InterfaceAudience.Private 065public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 066 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 067 068 // Configuration key for the large compaction threads. 069 public final static String LARGE_COMPACTION_THREADS = 070 "hbase.regionserver.thread.compaction.large"; 071 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 072 073 // Configuration key for the small compaction threads. 074 public final static String SMALL_COMPACTION_THREADS = 075 "hbase.regionserver.thread.compaction.small"; 076 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 077 078 // Configuration key for split threads 079 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 080 public final static int SPLIT_THREADS_DEFAULT = 1; 081 082 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 083 "hbase.regionserver.regionSplitLimit"; 084 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000; 085 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = 086 "hbase.regionserver.compaction.enabled"; 087 088 private final HRegionServer server; 089 private final Configuration conf; 090 private volatile ThreadPoolExecutor longCompactions; 091 private volatile ThreadPoolExecutor shortCompactions; 092 private volatile ThreadPoolExecutor splits; 093 094 private volatile ThroughputController compactionThroughputController; 095 private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet(); 096 097 private volatile boolean compactionsEnabled; 098 /** 099 * Splitting should not take place if the total number of regions exceed this. This is not a hard 100 * limit to the number of regions but it is a guideline to stop splitting after number of online 101 * regions is greater than this. 102 */ 103 private int regionSplitLimit; 104 105 CompactSplit(HRegionServer server) { 106 this.server = server; 107 this.conf = server.getConfiguration(); 108 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 109 createCompactionExecutors(); 110 createSplitExcecutors(); 111 112 // compaction throughput controller 113 this.compactionThroughputController = 114 CompactionThroughputControllerFactory.create(server, conf); 115 } 116 117 // only for test 118 public CompactSplit(Configuration conf) { 119 this.server = null; 120 this.conf = conf; 121 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 122 createCompactionExecutors(); 123 createSplitExcecutors(); 124 } 125 126 private void createSplitExcecutors() { 127 final String n = Thread.currentThread().getName(); 128 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 129 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 130 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 131 } 132 133 private void createCompactionExecutors() { 134 this.regionSplitLimit = 135 conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 136 137 int largeThreads = 138 Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 139 int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 140 141 // if we have throttle threads, make sure the user also specified size 142 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 143 144 final String n = Thread.currentThread().getName(); 145 146 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 147 // Since the StealJobQueue inner uses the PriorityBlockingQueue, 148 // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for 149 // the long and short compaction thread pool executors since HBASE-27332. 150 // If anyone who what to change the StealJobQueue to a bounded queue, 151 // please add the rejection handler back. 152 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 153 stealJobQueue, 154 new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); 155 this.longCompactions.prestartAllCoreThreads(); 156 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 157 stealJobQueue.getStealFromQueue(), 158 new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 159 } 160 161 @Override 162 public String toString() { 163 return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size() 164 + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue=" 165 + splits.getQueue().size(); 166 } 167 168 public String dumpQueue() { 169 StringBuilder queueLists = new StringBuilder(); 170 queueLists.append("Compaction/Split Queue dump:\n"); 171 queueLists.append(" LargeCompation Queue:\n"); 172 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 173 Iterator<Runnable> it = lq.iterator(); 174 while (it.hasNext()) { 175 queueLists.append(" " + it.next().toString()); 176 queueLists.append("\n"); 177 } 178 179 if (shortCompactions != null) { 180 queueLists.append("\n"); 181 queueLists.append(" SmallCompation Queue:\n"); 182 lq = shortCompactions.getQueue(); 183 it = lq.iterator(); 184 while (it.hasNext()) { 185 queueLists.append(" " + it.next().toString()); 186 queueLists.append("\n"); 187 } 188 } 189 190 queueLists.append("\n"); 191 queueLists.append(" Split Queue:\n"); 192 lq = splits.getQueue(); 193 it = lq.iterator(); 194 while (it.hasNext()) { 195 queueLists.append(" " + it.next().toString()); 196 queueLists.append("\n"); 197 } 198 199 return queueLists.toString(); 200 } 201 202 public synchronized boolean requestSplit(final Region r) { 203 // Don't split regions that are blocking is the default behavior. 204 // But in some circumstances, split here is needed to prevent the region size from 205 // continuously growing, as well as the number of store files, see HBASE-26242. 206 HRegion hr = (HRegion) r; 207 try { 208 if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) { 209 byte[] midKey = hr.checkSplit().orElse(null); 210 if (midKey != null) { 211 requestSplit(r, midKey); 212 return true; 213 } 214 } 215 } catch (IndexOutOfBoundsException e) { 216 // We get this sometimes. Not sure why. Catch and return false; no split request. 217 LOG.warn("Catching out-of-bounds; region={}, policy={}", 218 hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e); 219 } 220 return false; 221 } 222 223 private synchronized void requestSplit(final Region r, byte[] midKey) { 224 requestSplit(r, midKey, null); 225 } 226 227 /* 228 * The User parameter allows the split thread to assume the correct user identity 229 */ 230 private synchronized void requestSplit(final Region r, byte[] midKey, User user) { 231 if (midKey == null) { 232 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() 233 + " not splittable because midkey=null"); 234 return; 235 } 236 try { 237 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 238 if (LOG.isDebugEnabled()) { 239 LOG.debug("Splitting " + r + ", " + this); 240 } 241 } catch (RejectedExecutionException ree) { 242 LOG.info("Could not execute split for " + r, ree); 243 } 244 } 245 246 private void interrupt() { 247 longCompactions.shutdownNow(); 248 shortCompactions.shutdownNow(); 249 } 250 251 private void reInitializeCompactionsExecutors() { 252 createCompactionExecutors(); 253 } 254 255 // set protected for test 256 protected interface CompactionCompleteTracker { 257 258 default void completed(Store store) { 259 } 260 } 261 262 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 263 new CompactionCompleteTracker() { 264 }; 265 266 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 267 268 private final CompactionLifeCycleTracker tracker; 269 270 private final AtomicInteger remaining; 271 272 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 273 this.tracker = tracker; 274 this.remaining = new AtomicInteger(numberOfStores); 275 } 276 277 @Override 278 public void completed(Store store) { 279 if (remaining.decrementAndGet() == 0) { 280 tracker.completed(); 281 } 282 } 283 } 284 285 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 286 IntSupplier numberOfStores) { 287 if (tracker == CompactionLifeCycleTracker.DUMMY) { 288 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 289 // the life cycle of a compaction. 290 return DUMMY_COMPLETE_TRACKER; 291 } else { 292 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 293 } 294 } 295 296 @Override 297 public synchronized void requestCompaction(HRegion region, String why, int priority, 298 CompactionLifeCycleTracker tracker, User user) throws IOException { 299 requestCompactionInternal(region, why, priority, true, tracker, 300 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 301 } 302 303 @Override 304 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 305 CompactionLifeCycleTracker tracker, User user) throws IOException { 306 requestCompactionInternal(region, store, why, priority, true, tracker, 307 getCompleteTracker(tracker, () -> 1), user); 308 } 309 310 @Override 311 public void switchCompaction(boolean onOrOff) { 312 if (onOrOff) { 313 // re-create executor pool if compactions are disabled. 314 if (!isCompactionsEnabled()) { 315 LOG.info("Re-Initializing compactions because user switched on compactions"); 316 reInitializeCompactionsExecutors(); 317 } 318 setCompactionsEnabled(onOrOff); 319 return; 320 } 321 322 setCompactionsEnabled(onOrOff); 323 LOG.info("Interrupting running compactions because user switched off compactions"); 324 interrupt(); 325 } 326 327 private void requestCompactionInternal(HRegion region, String why, int priority, 328 boolean selectNow, CompactionLifeCycleTracker tracker, 329 CompactionCompleteTracker completeTracker, User user) throws IOException { 330 // request compaction on all stores 331 for (HStore store : region.stores.values()) { 332 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 333 user); 334 } 335 } 336 337 // set protected for test 338 protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 339 boolean selectNow, CompactionLifeCycleTracker tracker, 340 CompactionCompleteTracker completeTracker, User user) throws IOException { 341 if (!this.isCompactionsEnabled()) { 342 LOG.info("Ignoring compaction request for " + region + ",because compaction is disabled."); 343 return; 344 } 345 346 if ( 347 this.server.isStopped() || (region.getTableDescriptor() != null 348 && !region.getTableDescriptor().isCompactionEnabled()) 349 ) { 350 return; 351 } 352 RegionServerSpaceQuotaManager spaceQuotaManager = 353 this.server.getRegionServerSpaceQuotaManager(); 354 355 if ( 356 user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 357 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName()) 358 ) { 359 // Enter here only when: 360 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 361 String reason = "Ignoring compaction request for " + region 362 + " as an active space quota violation " + " policy disallows compactions."; 363 tracker.notExecuted(store, reason); 364 completeTracker.completed(store); 365 LOG.debug(reason); 366 return; 367 } 368 369 CompactionContext compaction; 370 if (selectNow) { 371 Optional<CompactionContext> c = 372 selectCompaction(region, store, priority, tracker, completeTracker, user); 373 if (!c.isPresent()) { 374 // message logged inside 375 return; 376 } 377 compaction = c.get(); 378 } else { 379 compaction = null; 380 } 381 382 ThreadPoolExecutor pool; 383 if (selectNow) { 384 // compaction.get is safe as we will just return if selectNow is true but no compaction is 385 // selected 386 pool = store.throttleCompaction(compaction.getRequest().getSize()) 387 ? longCompactions 388 : shortCompactions; 389 } else { 390 // We assume that most compactions are small. So, put system compactions into small 391 // pool; we will do selection there, and move to large pool if necessary. 392 pool = shortCompactions; 393 } 394 395 // A simple implementation for under compaction marks. 396 // Since this method is always called in the synchronized methods, we do not need to use the 397 // boolean result to make sure that exactly the one that added here will be removed 398 // in the next steps. 399 underCompactionStores.add(getStoreNameForUnderCompaction(store)); 400 pool.execute( 401 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 402 if (LOG.isDebugEnabled()) { 403 LOG.debug( 404 "Add compact mark for store {}, priority={}, current under compaction " 405 + "store size is {}", 406 getStoreNameForUnderCompaction(store), priority, underCompactionStores.size()); 407 } 408 region.incrementCompactionsQueuedCount(); 409 if (LOG.isDebugEnabled()) { 410 String type = (pool == shortCompactions) ? "Small " : "Large "; 411 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 412 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 413 } 414 } 415 416 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 417 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 418 DUMMY_COMPLETE_TRACKER, null); 419 } 420 421 public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException { 422 requestSystemCompaction(region, store, why, false); 423 } 424 425 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why, 426 boolean giveUpIfRequestedOrCompacting) throws IOException { 427 if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) { 428 LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region, 429 store.getColumnFamilyName()); 430 return; 431 } 432 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 433 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 434 } 435 436 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 437 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 438 throws IOException { 439 // don't even select for compaction if disableCompactions is set to true 440 if (!isCompactionsEnabled()) { 441 LOG.info(String.format("User has disabled compactions")); 442 return Optional.empty(); 443 } 444 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 445 if (!compaction.isPresent() && region.getRegionInfo() != null) { 446 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() 447 + " because compaction request was cancelled"; 448 tracker.notExecuted(store, reason); 449 completeTracker.completed(store); 450 LOG.debug(reason); 451 } 452 return compaction; 453 } 454 455 /** 456 * Only interrupt once it's done with a run through the work loop. 457 */ 458 void interruptIfNecessary() { 459 splits.shutdown(); 460 longCompactions.shutdown(); 461 shortCompactions.shutdown(); 462 } 463 464 private void waitFor(ThreadPoolExecutor t, String name) { 465 boolean done = false; 466 while (!done) { 467 try { 468 done = t.awaitTermination(60, TimeUnit.SECONDS); 469 LOG.info("Waiting for " + name + " to finish..."); 470 if (!done) { 471 t.shutdownNow(); 472 } 473 } catch (InterruptedException ie) { 474 LOG.warn("Interrupted waiting for " + name + " to finish..."); 475 t.shutdownNow(); 476 } 477 } 478 } 479 480 void join() { 481 waitFor(splits, "Split Thread"); 482 waitFor(longCompactions, "Large Compaction Thread"); 483 waitFor(shortCompactions, "Small Compaction Thread"); 484 } 485 486 /** 487 * Returns the current size of the queue containing regions that are processed. 488 * @return The current size of the regions queue. 489 */ 490 public int getCompactionQueueSize() { 491 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 492 } 493 494 public int getLargeCompactionQueueSize() { 495 return longCompactions.getQueue().size(); 496 } 497 498 public int getSmallCompactionQueueSize() { 499 return shortCompactions.getQueue().size(); 500 } 501 502 public int getSplitQueueSize() { 503 return splits.getQueue().size(); 504 } 505 506 private boolean shouldSplitRegion() { 507 if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) { 508 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 509 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); 510 } 511 return (regionSplitLimit > server.getNumberOfOnlineRegions()); 512 } 513 514 /** Returns the regionSplitLimit */ 515 public int getRegionSplitLimit() { 516 return this.regionSplitLimit; 517 } 518 519 /** 520 * Check if this store is under compaction 521 */ 522 public boolean isUnderCompaction(final HStore s) { 523 return underCompactionStores.contains(getStoreNameForUnderCompaction(s)); 524 } 525 526 private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() { 527 528 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 529 if (r1 == r2) { 530 return 0; // they are the same request 531 } 532 // less first 533 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 534 if (cmp != 0) { 535 return cmp; 536 } 537 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 538 if (cmp != 0) { 539 return cmp; 540 } 541 542 // break the tie based on hash code 543 return System.identityHashCode(r1) - System.identityHashCode(r2); 544 } 545 546 @Override 547 public int compare(Runnable r1, Runnable r2) { 548 // CompactionRunner first 549 if (r1 instanceof CompactionRunner) { 550 if (!(r2 instanceof CompactionRunner)) { 551 return -1; 552 } 553 } else { 554 if (r2 instanceof CompactionRunner) { 555 return 1; 556 } else { 557 // break the tie based on hash code 558 return System.identityHashCode(r1) - System.identityHashCode(r2); 559 } 560 } 561 CompactionRunner o1 = (CompactionRunner) r1; 562 CompactionRunner o2 = (CompactionRunner) r2; 563 // less first 564 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 565 if (cmp != 0) { 566 return cmp; 567 } 568 CompactionContext c1 = o1.compaction; 569 CompactionContext c2 = o2.compaction; 570 if (c1 != null) { 571 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 572 } else { 573 return c2 != null ? 1 : 0; 574 } 575 } 576 }; 577 578 private final class CompactionRunner implements Runnable { 579 private final HStore store; 580 private final HRegion region; 581 private final CompactionContext compaction; 582 private final CompactionLifeCycleTracker tracker; 583 private final CompactionCompleteTracker completeTracker; 584 private int queuedPriority; 585 private ThreadPoolExecutor parent; 586 private User user; 587 private long time; 588 589 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 590 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 591 ThreadPoolExecutor parent, User user) { 592 this.store = store; 593 this.region = region; 594 this.compaction = compaction; 595 this.tracker = tracker; 596 this.completeTracker = completeTracker; 597 this.queuedPriority = 598 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 599 this.parent = parent; 600 this.user = user; 601 this.time = EnvironmentEdgeManager.currentTime(); 602 } 603 604 @Override 605 public String toString() { 606 if (compaction != null) { 607 return "Request=" + compaction.getRequest(); 608 } else { 609 return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority=" 610 + queuedPriority + ", startTime=" + time; 611 } 612 } 613 614 private void doCompaction(User user) { 615 CompactionContext c; 616 // Common case - system compaction without a file selection. Select now. 617 if (compaction == null) { 618 int oldPriority = this.queuedPriority; 619 this.queuedPriority = this.store.getCompactPriority(); 620 if (this.queuedPriority > oldPriority) { 621 // Store priority decreased while we were in queue (due to some other compaction?), 622 // requeue with new priority to avoid blocking potential higher priorities. 623 this.parent.execute(this); 624 return; 625 } 626 Optional<CompactionContext> selected; 627 try { 628 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 629 completeTracker, user); 630 } catch (IOException ex) { 631 LOG.error("Compaction selection failed " + this, ex); 632 server.checkFileSystem(); 633 region.decrementCompactionsQueuedCount(); 634 return; 635 } 636 if (!selected.isPresent()) { 637 region.decrementCompactionsQueuedCount(); 638 return; // nothing to do 639 } 640 c = selected.get(); 641 assert c.hasSelection(); 642 // Now see if we are in correct pool for the size; if not, go to the correct one. 643 // We might end up waiting for a while, so cancel the selection. 644 645 ThreadPoolExecutor pool = 646 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 647 648 // Long compaction pool can process small job 649 // Short compaction pool should not process large job 650 if (this.parent == shortCompactions && pool == longCompactions) { 651 this.store.cancelRequestedCompaction(c); 652 this.parent = pool; 653 this.parent.execute(this); 654 return; 655 } 656 } else { 657 c = compaction; 658 } 659 // Finally we can compact something. 660 assert c != null; 661 662 tracker.beforeExecution(store); 663 try { 664 // Note: please don't put single-compaction logic here; 665 // put it into region/store/etc. This is CST logic. 666 long start = EnvironmentEdgeManager.currentTime(); 667 boolean completed = region.compact(c, store, compactionThroughputController, user); 668 long now = EnvironmentEdgeManager.currentTime(); 669 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration=" 670 + StringUtils.formatTimeDiff(now, start)); 671 if (completed) { 672 // degenerate case: blocked regions require recursive enqueues 673 if ( 674 region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0 675 ) { 676 requestSystemCompaction(region, store, "Recursive enqueue"); 677 } else { 678 // see if the compaction has caused us to exceed max region size 679 if (!requestSplit(region) && store.getCompactPriority() <= 0) { 680 requestSystemCompaction(region, store, "Recursive enqueue"); 681 } 682 } 683 } 684 } catch (IOException ex) { 685 IOException remoteEx = 686 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 687 LOG.error("Compaction failed " + this, remoteEx); 688 if (remoteEx != ex) { 689 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 690 } 691 region.reportCompactionRequestFailure(); 692 server.checkFileSystem(); 693 } catch (Exception ex) { 694 LOG.error("Compaction failed " + this, ex); 695 region.reportCompactionRequestFailure(); 696 server.checkFileSystem(); 697 } finally { 698 tracker.afterExecution(store); 699 completeTracker.completed(store); 700 region.decrementCompactionsQueuedCount(); 701 LOG.debug("Status {}", CompactSplit.this); 702 } 703 } 704 705 @Override 706 public void run() { 707 try { 708 Preconditions.checkNotNull(server); 709 if ( 710 server.isStopped() || (region.getTableDescriptor() != null 711 && !region.getTableDescriptor().isCompactionEnabled()) 712 ) { 713 region.decrementCompactionsQueuedCount(); 714 return; 715 } 716 doCompaction(user); 717 } finally { 718 if (LOG.isDebugEnabled()) { 719 LOG.debug("Remove under compaction mark for store: {}", 720 store.getHRegion().getRegionInfo().getEncodedName() + ":" 721 + store.getColumnFamilyName()); 722 } 723 underCompactionStores.remove(getStoreNameForUnderCompaction(store)); 724 } 725 } 726 727 private String formatStackTrace(Exception ex) { 728 StringWriter sw = new StringWriter(); 729 PrintWriter pw = new PrintWriter(sw); 730 ex.printStackTrace(pw); 731 pw.flush(); 732 return sw.toString(); 733 } 734 } 735 736 /** 737 * {@inheritDoc} 738 */ 739 @Override 740 public void onConfigurationChange(Configuration newConf) { 741 // Check if number of large / small compaction threads has changed, and then 742 // adjust the core pool size of the thread pools, by using the 743 // setCorePoolSize() method. According to the javadocs, it is safe to 744 // change the core pool size on-the-fly. We need to reset the maximum 745 // pool size, as well. 746 int largeThreads = 747 Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 748 if (this.longCompactions.getCorePoolSize() != largeThreads) { 749 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from " 750 + this.longCompactions.getCorePoolSize() + " to " + largeThreads); 751 if (this.longCompactions.getCorePoolSize() < largeThreads) { 752 this.longCompactions.setMaximumPoolSize(largeThreads); 753 this.longCompactions.setCorePoolSize(largeThreads); 754 } else { 755 this.longCompactions.setCorePoolSize(largeThreads); 756 this.longCompactions.setMaximumPoolSize(largeThreads); 757 } 758 } 759 760 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 761 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 762 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from " 763 + this.shortCompactions.getCorePoolSize() + " to " + smallThreads); 764 if (this.shortCompactions.getCorePoolSize() < smallThreads) { 765 this.shortCompactions.setMaximumPoolSize(smallThreads); 766 this.shortCompactions.setCorePoolSize(smallThreads); 767 } else { 768 this.shortCompactions.setCorePoolSize(smallThreads); 769 this.shortCompactions.setMaximumPoolSize(smallThreads); 770 } 771 } 772 773 int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 774 if (this.splits.getCorePoolSize() != splitThreads) { 775 LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize() 776 + " to " + splitThreads); 777 if (this.splits.getCorePoolSize() < splitThreads) { 778 this.splits.setMaximumPoolSize(splitThreads); 779 this.splits.setCorePoolSize(splitThreads); 780 } else { 781 this.splits.setCorePoolSize(splitThreads); 782 this.splits.setMaximumPoolSize(splitThreads); 783 } 784 } 785 786 ThroughputController old = this.compactionThroughputController; 787 if (old != null) { 788 old.stop("configuration change"); 789 } 790 this.compactionThroughputController = 791 CompactionThroughputControllerFactory.create(server, newConf); 792 793 // We change this atomically here instead of reloading the config in order that upstream 794 // would be the only one with the flexibility to reload the config. 795 this.conf.reloadConfiguration(); 796 } 797 798 protected int getSmallCompactionThreadNum() { 799 return this.shortCompactions.getCorePoolSize(); 800 } 801 802 protected int getLargeCompactionThreadNum() { 803 return this.longCompactions.getCorePoolSize(); 804 } 805 806 protected int getSplitThreadNum() { 807 return this.splits.getCorePoolSize(); 808 } 809 810 /** 811 * {@inheritDoc} 812 */ 813 @Override 814 public void registerChildren(ConfigurationManager manager) { 815 // No children to register. 816 } 817 818 /** 819 * {@inheritDoc} 820 */ 821 @Override 822 public void deregisterChildren(ConfigurationManager manager) { 823 // No children to register 824 } 825 826 public ThroughputController getCompactionThroughputController() { 827 return compactionThroughputController; 828 } 829 830 /** 831 * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long 832 * compaction thread pool from stealing job from short compaction queue 833 */ 834 void shutdownLongCompactions() { 835 this.longCompactions.shutdown(); 836 } 837 838 public void clearLongCompactionsQueue() { 839 longCompactions.getQueue().clear(); 840 } 841 842 public void clearShortCompactionsQueue() { 843 shortCompactions.getQueue().clear(); 844 } 845 846 public boolean isCompactionsEnabled() { 847 return compactionsEnabled; 848 } 849 850 public void setCompactionsEnabled(boolean compactionsEnabled) { 851 this.compactionsEnabled = compactionsEnabled; 852 this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled); 853 } 854 855 /** Returns the longCompactions thread pool executor */ 856 ThreadPoolExecutor getLongCompactions() { 857 return longCompactions; 858 } 859 860 /** Returns the shortCompactions thread pool executor */ 861 ThreadPoolExecutor getShortCompactions() { 862 return shortCompactions; 863 } 864 865 private String getStoreNameForUnderCompaction(HStore store) { 866 return String.format("%s:%s", 867 store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "", 868 store.getColumnFamilyName()); 869 } 870 871}