001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 021import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Comparator; 027import java.util.Iterator; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.Executors; 033import java.util.concurrent.RejectedExecutionException; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.function.IntSupplier; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.conf.ConfigurationManager; 041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 049import org.apache.hadoop.hbase.security.Superusers; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.util.ConfigurationUtil; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.StealJobQueue; 054import org.apache.hadoop.ipc.RemoteException; 055import org.apache.hadoop.util.StringUtils; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 062 063/** 064 * Compact region on request and then run split if appropriate 065 */ 066@InterfaceAudience.Private 067public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 068 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 069 070 // Configuration key for the large compaction threads. 071 public final static String LARGE_COMPACTION_THREADS = 072 "hbase.regionserver.thread.compaction.large"; 073 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 074 075 // Configuration key for the small compaction threads. 076 public final static String SMALL_COMPACTION_THREADS = 077 "hbase.regionserver.thread.compaction.small"; 078 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 079 080 // Configuration key for split threads 081 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 082 public final static int SPLIT_THREADS_DEFAULT = 1; 083 084 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 085 "hbase.regionserver.regionSplitLimit"; 086 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000; 087 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = 088 "hbase.regionserver.compaction.enabled"; 089 090 private final HRegionServer server; 091 private final Configuration conf; 092 private volatile ThreadPoolExecutor longCompactions; 093 private volatile ThreadPoolExecutor shortCompactions; 094 private volatile ThreadPoolExecutor splits; 095 096 private volatile ThroughputController compactionThroughputController; 097 private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet(); 098 099 private volatile boolean compactionsEnabled; 100 /** 101 * Splitting should not take place if the total number of regions exceed this. This is not a hard 102 * limit to the number of regions but it is a guideline to stop splitting after number of online 103 * regions is greater than this. 104 */ 105 private int regionSplitLimit; 106 107 CompactSplit(HRegionServer server) { 108 this.server = server; 109 this.conf = server.getConfiguration(); 110 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 111 createCompactionExecutors(); 112 createSplitExcecutors(); 113 114 // compaction throughput controller 115 this.compactionThroughputController = 116 CompactionThroughputControllerFactory.create(server, conf); 117 } 118 119 // only for test 120 public CompactSplit(Configuration conf) { 121 this.server = null; 122 this.conf = conf; 123 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 124 createCompactionExecutors(); 125 createSplitExcecutors(); 126 } 127 128 private void createSplitExcecutors() { 129 final String n = Thread.currentThread().getName(); 130 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 131 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 132 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 133 } 134 135 private void createCompactionExecutors() { 136 this.regionSplitLimit = 137 conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 138 139 int largeThreads = 140 Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 141 int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 142 143 // if we have throttle threads, make sure the user also specified size 144 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 145 146 final String n = Thread.currentThread().getName(); 147 148 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 149 // Since the StealJobQueue inner uses the PriorityBlockingQueue, 150 // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for 151 // the long and short compaction thread pool executors since HBASE-27332. 152 // If anyone who what to change the StealJobQueue to a bounded queue, 153 // please add the rejection handler back. 154 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 155 stealJobQueue, 156 new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); 157 this.longCompactions.prestartAllCoreThreads(); 158 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 159 stealJobQueue.getStealFromQueue(), 160 new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 161 } 162 163 @Override 164 public String toString() { 165 return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size() 166 + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue=" 167 + splits.getQueue().size(); 168 } 169 170 public String dumpQueue() { 171 StringBuilder queueLists = new StringBuilder(); 172 queueLists.append("Compaction/Split Queue dump:\n"); 173 queueLists.append(" LargeCompation Queue:\n"); 174 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 175 Iterator<Runnable> it = lq.iterator(); 176 while (it.hasNext()) { 177 queueLists.append(" " + it.next().toString()); 178 queueLists.append("\n"); 179 } 180 181 if (shortCompactions != null) { 182 queueLists.append("\n"); 183 queueLists.append(" SmallCompation Queue:\n"); 184 lq = shortCompactions.getQueue(); 185 it = lq.iterator(); 186 while (it.hasNext()) { 187 queueLists.append(" " + it.next().toString()); 188 queueLists.append("\n"); 189 } 190 } 191 192 queueLists.append("\n"); 193 queueLists.append(" Split Queue:\n"); 194 lq = splits.getQueue(); 195 it = lq.iterator(); 196 while (it.hasNext()) { 197 queueLists.append(" " + it.next().toString()); 198 queueLists.append("\n"); 199 } 200 201 return queueLists.toString(); 202 } 203 204 public synchronized boolean requestSplit(final Region r) { 205 // Don't split regions that are blocking is the default behavior. 206 // But in some circumstances, split here is needed to prevent the region size from 207 // continuously growing, as well as the number of store files, see HBASE-26242. 208 HRegion hr = (HRegion) r; 209 try { 210 if (shouldSplitRegion(r.getRegionInfo()) && hr.getCompactPriority() >= PRIORITY_USER) { 211 byte[] midKey = hr.checkSplit().orElse(null); 212 if (midKey != null) { 213 requestSplit(r, midKey); 214 return true; 215 } 216 } 217 } catch (IndexOutOfBoundsException e) { 218 // We get this sometimes. Not sure why. Catch and return false; no split request. 219 LOG.warn("Catching out-of-bounds; region={}, policy={}", 220 hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e); 221 } 222 return false; 223 } 224 225 private synchronized void requestSplit(final Region r, byte[] midKey) { 226 requestSplit(r, midKey, null); 227 } 228 229 /* 230 * The User parameter allows the split thread to assume the correct user identity 231 */ 232 private synchronized void requestSplit(final Region r, byte[] midKey, User user) { 233 if (midKey == null) { 234 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() 235 + " not splittable because midkey=null"); 236 return; 237 } 238 try { 239 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 240 if (LOG.isDebugEnabled()) { 241 LOG.debug("Splitting " + r + ", " + this); 242 } 243 } catch (RejectedExecutionException ree) { 244 LOG.info("Could not execute split for " + r, ree); 245 } 246 } 247 248 private void interrupt() { 249 longCompactions.shutdownNow(); 250 shortCompactions.shutdownNow(); 251 } 252 253 private void reInitializeCompactionsExecutors() { 254 createCompactionExecutors(); 255 } 256 257 // set protected for test 258 protected interface CompactionCompleteTracker { 259 260 default void completed(Store store) { 261 } 262 } 263 264 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 265 new CompactionCompleteTracker() { 266 }; 267 268 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 269 270 private final CompactionLifeCycleTracker tracker; 271 272 private final AtomicInteger remaining; 273 274 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 275 this.tracker = tracker; 276 this.remaining = new AtomicInteger(numberOfStores); 277 } 278 279 @Override 280 public void completed(Store store) { 281 if (remaining.decrementAndGet() == 0) { 282 tracker.completed(); 283 } 284 } 285 } 286 287 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 288 IntSupplier numberOfStores) { 289 if (tracker == CompactionLifeCycleTracker.DUMMY) { 290 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 291 // the life cycle of a compaction. 292 return DUMMY_COMPLETE_TRACKER; 293 } else { 294 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 295 } 296 } 297 298 @Override 299 public synchronized void requestCompaction(HRegion region, String why, int priority, 300 CompactionLifeCycleTracker tracker, User user) throws IOException { 301 requestCompactionInternal(region, why, priority, true, tracker, 302 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 303 } 304 305 @Override 306 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 307 CompactionLifeCycleTracker tracker, User user) throws IOException { 308 requestCompactionInternal(region, store, why, priority, true, tracker, 309 getCompleteTracker(tracker, () -> 1), user); 310 } 311 312 @Override 313 public void switchCompaction(boolean onOrOff) { 314 if (onOrOff) { 315 // re-create executor pool if compactions are disabled. 316 if (!isCompactionsEnabled()) { 317 LOG.info("Re-Initializing compactions because user switched on compactions"); 318 reInitializeCompactionsExecutors(); 319 } 320 setCompactionsEnabled(onOrOff); 321 return; 322 } 323 324 setCompactionsEnabled(onOrOff); 325 LOG.info("Interrupting running compactions because user switched off compactions"); 326 interrupt(); 327 } 328 329 private void requestCompactionInternal(HRegion region, String why, int priority, 330 boolean selectNow, CompactionLifeCycleTracker tracker, 331 CompactionCompleteTracker completeTracker, User user) throws IOException { 332 // request compaction on all stores 333 for (HStore store : region.stores.values()) { 334 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 335 user); 336 } 337 } 338 339 // set protected for test 340 protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 341 boolean selectNow, CompactionLifeCycleTracker tracker, 342 CompactionCompleteTracker completeTracker, User user) throws IOException { 343 if (!this.isCompactionsEnabled()) { 344 LOG.info("Ignoring compaction request for " + region + ",because compaction is disabled."); 345 return; 346 } 347 348 // Should not allow compaction if cluster is in read-only mode 349 if (ConfigurationUtil.isReadOnlyModeEnabledInConf(conf)) { 350 LOG.info("Ignoring compaction request for " + region + ",because read-only mode is on."); 351 return; 352 } 353 354 if ( 355 this.server.isStopped() || (region.getTableDescriptor() != null 356 && !region.getTableDescriptor().isCompactionEnabled()) 357 ) { 358 return; 359 } 360 RegionServerSpaceQuotaManager spaceQuotaManager = 361 this.server.getRegionServerSpaceQuotaManager(); 362 363 if ( 364 user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 365 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName()) 366 ) { 367 // Enter here only when: 368 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 369 String reason = "Ignoring compaction request for " + region 370 + " as an active space quota violation " + " policy disallows compactions."; 371 tracker.notExecuted(store, reason); 372 completeTracker.completed(store); 373 LOG.debug(reason); 374 return; 375 } 376 377 CompactionContext compaction; 378 if (selectNow) { 379 Optional<CompactionContext> c = 380 selectCompaction(region, store, priority, tracker, completeTracker, user); 381 if (!c.isPresent()) { 382 // message logged inside 383 return; 384 } 385 compaction = c.get(); 386 } else { 387 compaction = null; 388 } 389 390 ThreadPoolExecutor pool; 391 if (selectNow) { 392 // compaction.get is safe as we will just return if selectNow is true but no compaction is 393 // selected 394 pool = store.throttleCompaction(compaction.getRequest().getSize()) 395 ? longCompactions 396 : shortCompactions; 397 } else { 398 // We assume that most compactions are small. So, put system compactions into small 399 // pool; we will do selection there, and move to large pool if necessary. 400 pool = shortCompactions; 401 } 402 403 // A simple implementation for under compaction marks. 404 // Since this method is always called in the synchronized methods, we do not need to use the 405 // boolean result to make sure that exactly the one that added here will be removed 406 // in the next steps. 407 underCompactionStores.add(getStoreNameForUnderCompaction(store)); 408 pool.execute( 409 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 410 if (LOG.isDebugEnabled()) { 411 LOG.debug( 412 "Add compact mark for store {}, priority={}, current under compaction " 413 + "store size is {}", 414 getStoreNameForUnderCompaction(store), priority, underCompactionStores.size()); 415 } 416 region.incrementCompactionsQueuedCount(); 417 if (LOG.isDebugEnabled()) { 418 String type = (pool == shortCompactions) ? "Small " : "Large "; 419 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 420 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 421 } 422 } 423 424 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 425 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 426 DUMMY_COMPLETE_TRACKER, null); 427 } 428 429 public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException { 430 requestSystemCompaction(region, store, why, false); 431 } 432 433 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why, 434 boolean giveUpIfRequestedOrCompacting) throws IOException { 435 if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) { 436 LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region, 437 store.getColumnFamilyName()); 438 return; 439 } 440 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 441 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 442 } 443 444 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 445 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 446 throws IOException { 447 // don't even select for compaction if disableCompactions is set to true 448 if (!isCompactionsEnabled()) { 449 LOG.info(String.format("User has disabled compactions")); 450 return Optional.empty(); 451 } 452 453 // Should not allow compaction if cluster is in read-only mode 454 if (ConfigurationUtil.isReadOnlyModeEnabledInConf(conf)) { 455 LOG.info(String.format("Compaction request skipped as read-only mode is on")); 456 return Optional.empty(); 457 } 458 459 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 460 if (!compaction.isPresent() && region.getRegionInfo() != null) { 461 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() 462 + " because compaction request was cancelled"; 463 tracker.notExecuted(store, reason); 464 completeTracker.completed(store); 465 LOG.debug(reason); 466 } 467 return compaction; 468 } 469 470 /** 471 * Only interrupt once it's done with a run through the work loop. 472 */ 473 void interruptIfNecessary() { 474 splits.shutdown(); 475 longCompactions.shutdown(); 476 shortCompactions.shutdown(); 477 } 478 479 private void waitFor(ThreadPoolExecutor t, String name) { 480 boolean done = false; 481 while (!done) { 482 try { 483 done = t.awaitTermination(60, TimeUnit.SECONDS); 484 LOG.info("Waiting for " + name + " to finish..."); 485 if (!done) { 486 t.shutdownNow(); 487 } 488 } catch (InterruptedException ie) { 489 LOG.warn("Interrupted waiting for " + name + " to finish..."); 490 t.shutdownNow(); 491 } 492 } 493 } 494 495 void join() { 496 waitFor(splits, "Split Thread"); 497 waitFor(longCompactions, "Large Compaction Thread"); 498 waitFor(shortCompactions, "Small Compaction Thread"); 499 } 500 501 /** 502 * Returns the current size of the queue containing regions that are processed. 503 * @return The current size of the regions queue. 504 */ 505 public int getCompactionQueueSize() { 506 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 507 } 508 509 public int getLargeCompactionQueueSize() { 510 return longCompactions.getQueue().size(); 511 } 512 513 public int getSmallCompactionQueueSize() { 514 return shortCompactions.getQueue().size(); 515 } 516 517 public int getSplitQueueSize() { 518 return splits.getQueue().size(); 519 } 520 521 private boolean shouldSplitRegion(RegionInfo ri) { 522 if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) { 523 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 524 + "Please consider taking a look at " 525 + "https://hbase.apache.org/docs/operational-management/region-and-capacity#region-management"); 526 } 527 return (regionSplitLimit > server.getNumberOfOnlineRegions() 528 // Do not attempt to split secondary region replicas, as this is not allowed and our request 529 // to do so will be rejected 530 && ri.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID); 531 } 532 533 /** Returns the regionSplitLimit */ 534 public int getRegionSplitLimit() { 535 return this.regionSplitLimit; 536 } 537 538 /** 539 * Check if this store is under compaction 540 */ 541 public boolean isUnderCompaction(final HStore s) { 542 return underCompactionStores.contains(getStoreNameForUnderCompaction(s)); 543 } 544 545 private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() { 546 547 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 548 if (r1 == r2) { 549 return 0; // they are the same request 550 } 551 // less first 552 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 553 if (cmp != 0) { 554 return cmp; 555 } 556 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 557 if (cmp != 0) { 558 return cmp; 559 } 560 561 // break the tie based on hash code 562 return System.identityHashCode(r1) - System.identityHashCode(r2); 563 } 564 565 @Override 566 public int compare(Runnable r1, Runnable r2) { 567 // CompactionRunner first 568 if (r1 instanceof CompactionRunner) { 569 if (!(r2 instanceof CompactionRunner)) { 570 return -1; 571 } 572 } else { 573 if (r2 instanceof CompactionRunner) { 574 return 1; 575 } else { 576 // break the tie based on hash code 577 return System.identityHashCode(r1) - System.identityHashCode(r2); 578 } 579 } 580 CompactionRunner o1 = (CompactionRunner) r1; 581 CompactionRunner o2 = (CompactionRunner) r2; 582 // less first 583 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 584 if (cmp != 0) { 585 return cmp; 586 } 587 CompactionContext c1 = o1.compaction; 588 CompactionContext c2 = o2.compaction; 589 if (c1 != null) { 590 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 591 } else { 592 return c2 != null ? 1 : 0; 593 } 594 } 595 }; 596 597 private final class CompactionRunner implements Runnable { 598 private final HStore store; 599 private final HRegion region; 600 private final CompactionContext compaction; 601 private final CompactionLifeCycleTracker tracker; 602 private final CompactionCompleteTracker completeTracker; 603 private int queuedPriority; 604 private ThreadPoolExecutor parent; 605 private User user; 606 private long time; 607 608 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 609 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 610 ThreadPoolExecutor parent, User user) { 611 this.store = store; 612 this.region = region; 613 this.compaction = compaction; 614 this.tracker = tracker; 615 this.completeTracker = completeTracker; 616 this.queuedPriority = 617 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 618 this.parent = parent; 619 this.user = user; 620 this.time = EnvironmentEdgeManager.currentTime(); 621 } 622 623 @Override 624 public String toString() { 625 if (compaction != null) { 626 return "Request=" + compaction.getRequest(); 627 } else { 628 return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority=" 629 + queuedPriority + ", startTime=" + time; 630 } 631 } 632 633 private void doCompaction(User user) { 634 CompactionContext c; 635 // Common case - system compaction without a file selection. Select now. 636 if (compaction == null) { 637 int oldPriority = this.queuedPriority; 638 this.queuedPriority = this.store.getCompactPriority(); 639 if (this.queuedPriority > oldPriority) { 640 // Store priority decreased while we were in queue (due to some other compaction?), 641 // requeue with new priority to avoid blocking potential higher priorities. 642 this.parent.execute(this); 643 return; 644 } 645 Optional<CompactionContext> selected; 646 try { 647 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 648 completeTracker, user); 649 } catch (IOException ex) { 650 LOG.error("Compaction selection failed " + this, ex); 651 server.checkFileSystem(); 652 region.decrementCompactionsQueuedCount(); 653 return; 654 } 655 if (!selected.isPresent()) { 656 region.decrementCompactionsQueuedCount(); 657 return; // nothing to do 658 } 659 c = selected.get(); 660 assert c.hasSelection(); 661 // Now see if we are in correct pool for the size; if not, go to the correct one. 662 // We might end up waiting for a while, so cancel the selection. 663 664 ThreadPoolExecutor pool = 665 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 666 667 // Long compaction pool can process small job 668 // Short compaction pool should not process large job 669 if (this.parent == shortCompactions && pool == longCompactions) { 670 this.store.cancelRequestedCompaction(c); 671 this.parent = pool; 672 this.parent.execute(this); 673 return; 674 } 675 } else { 676 c = compaction; 677 } 678 // Finally we can compact something. 679 assert c != null; 680 681 tracker.beforeExecution(store); 682 try { 683 // Note: please don't put single-compaction logic here; 684 // put it into region/store/etc. This is CST logic. 685 long start = EnvironmentEdgeManager.currentTime(); 686 boolean completed = region.compact(c, store, compactionThroughputController, user); 687 long now = EnvironmentEdgeManager.currentTime(); 688 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration=" 689 + StringUtils.formatTimeDiff(now, start)); 690 if (completed) { 691 // degenerate case: blocked regions require recursive enqueues 692 if ( 693 region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0 694 ) { 695 requestSystemCompaction(region, store, "Recursive enqueue"); 696 } else { 697 // see if the compaction has caused us to exceed max region size 698 if (!requestSplit(region) && store.getCompactPriority() <= 0) { 699 requestSystemCompaction(region, store, "Recursive enqueue"); 700 } 701 } 702 } 703 } catch (IOException ex) { 704 IOException remoteEx = 705 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 706 LOG.error("Compaction failed " + this, remoteEx); 707 if (remoteEx != ex) { 708 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 709 } 710 region.reportCompactionRequestFailure(); 711 server.checkFileSystem(); 712 } catch (Exception ex) { 713 LOG.error("Compaction failed " + this, ex); 714 region.reportCompactionRequestFailure(); 715 server.checkFileSystem(); 716 } finally { 717 tracker.afterExecution(store); 718 completeTracker.completed(store); 719 region.decrementCompactionsQueuedCount(); 720 LOG.debug("Status {}", CompactSplit.this); 721 } 722 } 723 724 @Override 725 public void run() { 726 try { 727 Preconditions.checkNotNull(server); 728 if ( 729 server.isStopped() || (region.getTableDescriptor() != null 730 && !region.getTableDescriptor().isCompactionEnabled()) 731 ) { 732 region.decrementCompactionsQueuedCount(); 733 return; 734 } 735 doCompaction(user); 736 } finally { 737 if (LOG.isDebugEnabled()) { 738 LOG.debug("Remove under compaction mark for store: {}", 739 store.getHRegion().getRegionInfo().getEncodedName() + ":" 740 + store.getColumnFamilyName()); 741 } 742 underCompactionStores.remove(getStoreNameForUnderCompaction(store)); 743 } 744 } 745 746 private String formatStackTrace(Exception ex) { 747 StringWriter sw = new StringWriter(); 748 PrintWriter pw = new PrintWriter(sw); 749 ex.printStackTrace(pw); 750 pw.flush(); 751 return sw.toString(); 752 } 753 } 754 755 /** 756 * {@inheritDoc} 757 */ 758 @Override 759 public void onConfigurationChange(Configuration newConf) { 760 // Check if number of large / small compaction threads has changed, and then 761 // adjust the core pool size of the thread pools, by using the 762 // setCorePoolSize() method. According to the javadocs, it is safe to 763 // change the core pool size on-the-fly. We need to reset the maximum 764 // pool size, as well. 765 int largeThreads = 766 Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 767 if (this.longCompactions.getCorePoolSize() != largeThreads) { 768 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from " 769 + this.longCompactions.getCorePoolSize() + " to " + largeThreads); 770 if (this.longCompactions.getCorePoolSize() < largeThreads) { 771 this.longCompactions.setMaximumPoolSize(largeThreads); 772 this.longCompactions.setCorePoolSize(largeThreads); 773 } else { 774 this.longCompactions.setCorePoolSize(largeThreads); 775 this.longCompactions.setMaximumPoolSize(largeThreads); 776 } 777 } 778 779 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 780 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 781 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from " 782 + this.shortCompactions.getCorePoolSize() + " to " + smallThreads); 783 if (this.shortCompactions.getCorePoolSize() < smallThreads) { 784 this.shortCompactions.setMaximumPoolSize(smallThreads); 785 this.shortCompactions.setCorePoolSize(smallThreads); 786 } else { 787 this.shortCompactions.setCorePoolSize(smallThreads); 788 this.shortCompactions.setMaximumPoolSize(smallThreads); 789 } 790 } 791 792 int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 793 if (this.splits.getCorePoolSize() != splitThreads) { 794 LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize() 795 + " to " + splitThreads); 796 if (this.splits.getCorePoolSize() < splitThreads) { 797 this.splits.setMaximumPoolSize(splitThreads); 798 this.splits.setCorePoolSize(splitThreads); 799 } else { 800 this.splits.setCorePoolSize(splitThreads); 801 this.splits.setMaximumPoolSize(splitThreads); 802 } 803 } 804 805 ThroughputController old = this.compactionThroughputController; 806 if (old != null) { 807 old.stop("configuration change"); 808 } 809 this.compactionThroughputController = 810 CompactionThroughputControllerFactory.create(server, newConf); 811 812 // We change this atomically here instead of reloading the config in order that upstream 813 // would be the only one with the flexibility to reload the config. 814 this.conf.reloadConfiguration(); 815 } 816 817 protected int getSmallCompactionThreadNum() { 818 return this.shortCompactions.getCorePoolSize(); 819 } 820 821 protected int getLargeCompactionThreadNum() { 822 return this.longCompactions.getCorePoolSize(); 823 } 824 825 protected int getSplitThreadNum() { 826 return this.splits.getCorePoolSize(); 827 } 828 829 /** Exposed for unit testing */ 830 long getSubmittedSplitsCount() { 831 return this.splits.getTaskCount(); 832 } 833 834 /** 835 * {@inheritDoc} 836 */ 837 @Override 838 public void registerChildren(ConfigurationManager manager) { 839 // No children to register. 840 } 841 842 /** 843 * {@inheritDoc} 844 */ 845 @Override 846 public void deregisterChildren(ConfigurationManager manager) { 847 // No children to register 848 } 849 850 public ThroughputController getCompactionThroughputController() { 851 return compactionThroughputController; 852 } 853 854 /** 855 * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long 856 * compaction thread pool from stealing job from short compaction queue 857 */ 858 void shutdownLongCompactions() { 859 this.longCompactions.shutdown(); 860 } 861 862 public void clearLongCompactionsQueue() { 863 longCompactions.getQueue().clear(); 864 } 865 866 public void clearShortCompactionsQueue() { 867 shortCompactions.getQueue().clear(); 868 } 869 870 public boolean isCompactionsEnabled() { 871 return compactionsEnabled; 872 } 873 874 public void setCompactionsEnabled(boolean compactionsEnabled) { 875 this.compactionsEnabled = compactionsEnabled; 876 this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled); 877 } 878 879 /** Returns the longCompactions thread pool executor */ 880 ThreadPoolExecutor getLongCompactions() { 881 return longCompactions; 882 } 883 884 /** Returns the shortCompactions thread pool executor */ 885 ThreadPoolExecutor getShortCompactions() { 886 return shortCompactions; 887 } 888 889 private String getStoreNameForUnderCompaction(HStore store) { 890 return String.format("%s:%s", 891 store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "", 892 store.getColumnFamilyName()); 893 } 894 895}