001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; 021import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Comparator; 027import java.util.Iterator; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.Executors; 033import java.util.concurrent.RejectedExecutionException; 034import java.util.concurrent.RejectedExecutionHandler; 035import java.util.concurrent.ThreadPoolExecutor; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.function.IntSupplier; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.conf.ConfigurationManager; 041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 049import org.apache.hadoop.hbase.security.Superusers; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.StealJobQueue; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.util.StringUtils; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062/** 063 * Compact region on request and then run split if appropriate 064 */ 065@InterfaceAudience.Private 066public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { 067 private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); 068 069 // Configuration key for the large compaction threads. 070 public final static String LARGE_COMPACTION_THREADS = 071 "hbase.regionserver.thread.compaction.large"; 072 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; 073 074 // Configuration key for the small compaction threads. 075 public final static String SMALL_COMPACTION_THREADS = 076 "hbase.regionserver.thread.compaction.small"; 077 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; 078 079 // Configuration key for split threads 080 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; 081 public final static int SPLIT_THREADS_DEFAULT = 1; 082 083 public static final String REGION_SERVER_REGION_SPLIT_LIMIT = 084 "hbase.regionserver.regionSplitLimit"; 085 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000; 086 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = 087 "hbase.regionserver.compaction.enabled"; 088 089 private final HRegionServer server; 090 private final Configuration conf; 091 private volatile ThreadPoolExecutor longCompactions; 092 private volatile ThreadPoolExecutor shortCompactions; 093 private volatile ThreadPoolExecutor splits; 094 095 private volatile ThroughputController compactionThroughputController; 096 private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet(); 097 098 private volatile boolean compactionsEnabled; 099 /** 100 * Splitting should not take place if the total number of regions exceed this. This is not a hard 101 * limit to the number of regions but it is a guideline to stop splitting after number of online 102 * regions is greater than this. 103 */ 104 private int regionSplitLimit; 105 106 CompactSplit(HRegionServer server) { 107 this.server = server; 108 this.conf = server.getConfiguration(); 109 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 110 createCompactionExecutors(); 111 createSplitExcecutors(); 112 113 // compaction throughput controller 114 this.compactionThroughputController = 115 CompactionThroughputControllerFactory.create(server, conf); 116 } 117 118 // only for test 119 public CompactSplit(Configuration conf) { 120 this.server = null; 121 this.conf = conf; 122 this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); 123 createCompactionExecutors(); 124 createSplitExcecutors(); 125 } 126 127 private void createSplitExcecutors() { 128 final String n = Thread.currentThread().getName(); 129 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 130 this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, 131 new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build()); 132 } 133 134 private void createCompactionExecutors() { 135 this.regionSplitLimit = 136 conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); 137 138 int largeThreads = 139 Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 140 int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 141 142 // if we have throttle threads, make sure the user also specified size 143 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); 144 145 final String n = Thread.currentThread().getName(); 146 147 StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); 148 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, 149 stealJobQueue, 150 new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build()); 151 this.longCompactions.setRejectedExecutionHandler(new Rejection()); 152 this.longCompactions.prestartAllCoreThreads(); 153 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, 154 stealJobQueue.getStealFromQueue(), 155 new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build()); 156 this.shortCompactions.setRejectedExecutionHandler(new Rejection()); 157 } 158 159 @Override 160 public String toString() { 161 return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size() 162 + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue=" 163 + splits.getQueue().size(); 164 } 165 166 public String dumpQueue() { 167 StringBuilder queueLists = new StringBuilder(); 168 queueLists.append("Compaction/Split Queue dump:\n"); 169 queueLists.append(" LargeCompation Queue:\n"); 170 BlockingQueue<Runnable> lq = longCompactions.getQueue(); 171 Iterator<Runnable> it = lq.iterator(); 172 while (it.hasNext()) { 173 queueLists.append(" " + it.next().toString()); 174 queueLists.append("\n"); 175 } 176 177 if (shortCompactions != null) { 178 queueLists.append("\n"); 179 queueLists.append(" SmallCompation Queue:\n"); 180 lq = shortCompactions.getQueue(); 181 it = lq.iterator(); 182 while (it.hasNext()) { 183 queueLists.append(" " + it.next().toString()); 184 queueLists.append("\n"); 185 } 186 } 187 188 queueLists.append("\n"); 189 queueLists.append(" Split Queue:\n"); 190 lq = splits.getQueue(); 191 it = lq.iterator(); 192 while (it.hasNext()) { 193 queueLists.append(" " + it.next().toString()); 194 queueLists.append("\n"); 195 } 196 197 return queueLists.toString(); 198 } 199 200 public synchronized boolean requestSplit(final Region r) { 201 // Don't split regions that are blocking is the default behavior. 202 // But in some circumstances, split here is needed to prevent the region size from 203 // continuously growing, as well as the number of store files, see HBASE-26242. 204 HRegion hr = (HRegion) r; 205 try { 206 if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) { 207 byte[] midKey = hr.checkSplit().orElse(null); 208 if (midKey != null) { 209 requestSplit(r, midKey); 210 return true; 211 } 212 } 213 } catch (IndexOutOfBoundsException e) { 214 // We get this sometimes. Not sure why. Catch and return false; no split request. 215 LOG.warn("Catching out-of-bounds; region={}, policy={}", 216 hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e); 217 } 218 return false; 219 } 220 221 private synchronized void requestSplit(final Region r, byte[] midKey) { 222 requestSplit(r, midKey, null); 223 } 224 225 /* 226 * The User parameter allows the split thread to assume the correct user identity 227 */ 228 private synchronized void requestSplit(final Region r, byte[] midKey, User user) { 229 if (midKey == null) { 230 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() 231 + " not splittable because midkey=null"); 232 return; 233 } 234 try { 235 this.splits.execute(new SplitRequest(r, midKey, this.server, user)); 236 if (LOG.isDebugEnabled()) { 237 LOG.debug("Splitting " + r + ", " + this); 238 } 239 } catch (RejectedExecutionException ree) { 240 LOG.info("Could not execute split for " + r, ree); 241 } 242 } 243 244 private void interrupt() { 245 longCompactions.shutdownNow(); 246 shortCompactions.shutdownNow(); 247 } 248 249 private void reInitializeCompactionsExecutors() { 250 createCompactionExecutors(); 251 } 252 253 // set protected for test 254 protected interface CompactionCompleteTracker { 255 256 default void completed(Store store) { 257 } 258 } 259 260 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER = 261 new CompactionCompleteTracker() { 262 }; 263 264 private static final class AggregatingCompleteTracker implements CompactionCompleteTracker { 265 266 private final CompactionLifeCycleTracker tracker; 267 268 private final AtomicInteger remaining; 269 270 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) { 271 this.tracker = tracker; 272 this.remaining = new AtomicInteger(numberOfStores); 273 } 274 275 @Override 276 public void completed(Store store) { 277 if (remaining.decrementAndGet() == 0) { 278 tracker.completed(); 279 } 280 } 281 } 282 283 private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker, 284 IntSupplier numberOfStores) { 285 if (tracker == CompactionLifeCycleTracker.DUMMY) { 286 // a simple optimization to avoid creating unnecessary objects as usually we do not care about 287 // the life cycle of a compaction. 288 return DUMMY_COMPLETE_TRACKER; 289 } else { 290 return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt()); 291 } 292 } 293 294 @Override 295 public synchronized void requestCompaction(HRegion region, String why, int priority, 296 CompactionLifeCycleTracker tracker, User user) throws IOException { 297 requestCompactionInternal(region, why, priority, true, tracker, 298 getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); 299 } 300 301 @Override 302 public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, 303 CompactionLifeCycleTracker tracker, User user) throws IOException { 304 requestCompactionInternal(region, store, why, priority, true, tracker, 305 getCompleteTracker(tracker, () -> 1), user); 306 } 307 308 @Override 309 public void switchCompaction(boolean onOrOff) { 310 if (onOrOff) { 311 // re-create executor pool if compactions are disabled. 312 if (!isCompactionsEnabled()) { 313 LOG.info("Re-Initializing compactions because user switched on compactions"); 314 reInitializeCompactionsExecutors(); 315 } 316 } else { 317 LOG.info("Interrupting running compactions because user switched off compactions"); 318 interrupt(); 319 } 320 setCompactionsEnabled(onOrOff); 321 } 322 323 private void requestCompactionInternal(HRegion region, String why, int priority, 324 boolean selectNow, CompactionLifeCycleTracker tracker, 325 CompactionCompleteTracker completeTracker, User user) throws IOException { 326 // request compaction on all stores 327 for (HStore store : region.stores.values()) { 328 requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker, 329 user); 330 } 331 } 332 333 // set protected for test 334 protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, 335 boolean selectNow, CompactionLifeCycleTracker tracker, 336 CompactionCompleteTracker completeTracker, User user) throws IOException { 337 if ( 338 this.server.isStopped() || (region.getTableDescriptor() != null 339 && !region.getTableDescriptor().isCompactionEnabled()) 340 ) { 341 return; 342 } 343 RegionServerSpaceQuotaManager spaceQuotaManager = 344 this.server.getRegionServerSpaceQuotaManager(); 345 346 if ( 347 user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null 348 && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName()) 349 ) { 350 // Enter here only when: 351 // It's a user generated req, the user is super user, quotas enabled, compactions disabled. 352 String reason = "Ignoring compaction request for " + region 353 + " as an active space quota violation " + " policy disallows compactions."; 354 tracker.notExecuted(store, reason); 355 completeTracker.completed(store); 356 LOG.debug(reason); 357 return; 358 } 359 360 CompactionContext compaction; 361 if (selectNow) { 362 Optional<CompactionContext> c = 363 selectCompaction(region, store, priority, tracker, completeTracker, user); 364 if (!c.isPresent()) { 365 // message logged inside 366 return; 367 } 368 compaction = c.get(); 369 } else { 370 compaction = null; 371 } 372 373 ThreadPoolExecutor pool; 374 if (selectNow) { 375 // compaction.get is safe as we will just return if selectNow is true but no compaction is 376 // selected 377 pool = store.throttleCompaction(compaction.getRequest().getSize()) 378 ? longCompactions 379 : shortCompactions; 380 } else { 381 // We assume that most compactions are small. So, put system compactions into small 382 // pool; we will do selection there, and move to large pool if necessary. 383 pool = shortCompactions; 384 } 385 pool.execute( 386 new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); 387 if (LOG.isDebugEnabled()) { 388 LOG.debug( 389 "Add compact mark for store {}, priority={}, current under compaction " 390 + "store size is {}", 391 getStoreNameForUnderCompaction(store), priority, underCompactionStores.size()); 392 } 393 underCompactionStores.add(getStoreNameForUnderCompaction(store)); 394 region.incrementCompactionsQueuedCount(); 395 if (LOG.isDebugEnabled()) { 396 String type = (pool == shortCompactions) ? "Small " : "Large "; 397 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") 398 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); 399 } 400 } 401 402 public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { 403 requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, 404 DUMMY_COMPLETE_TRACKER, null); 405 } 406 407 public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException { 408 requestSystemCompaction(region, store, why, false); 409 } 410 411 public synchronized void requestSystemCompaction(HRegion region, HStore store, String why, 412 boolean giveUpIfRequestedOrCompacting) throws IOException { 413 if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) { 414 LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region, 415 store.getColumnFamilyName()); 416 return; 417 } 418 requestCompactionInternal(region, store, why, NO_PRIORITY, false, 419 CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); 420 } 421 422 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, 423 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) 424 throws IOException { 425 // don't even select for compaction if disableCompactions is set to true 426 if (!isCompactionsEnabled()) { 427 LOG.info(String.format("User has disabled compactions")); 428 return Optional.empty(); 429 } 430 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); 431 if (!compaction.isPresent() && region.getRegionInfo() != null) { 432 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() 433 + " because compaction request was cancelled"; 434 tracker.notExecuted(store, reason); 435 completeTracker.completed(store); 436 LOG.debug(reason); 437 } 438 return compaction; 439 } 440 441 /** 442 * Only interrupt once it's done with a run through the work loop. 443 */ 444 void interruptIfNecessary() { 445 splits.shutdown(); 446 longCompactions.shutdown(); 447 shortCompactions.shutdown(); 448 } 449 450 private void waitFor(ThreadPoolExecutor t, String name) { 451 boolean done = false; 452 while (!done) { 453 try { 454 done = t.awaitTermination(60, TimeUnit.SECONDS); 455 LOG.info("Waiting for " + name + " to finish..."); 456 if (!done) { 457 t.shutdownNow(); 458 } 459 } catch (InterruptedException ie) { 460 LOG.warn("Interrupted waiting for " + name + " to finish..."); 461 t.shutdownNow(); 462 } 463 } 464 } 465 466 void join() { 467 waitFor(splits, "Split Thread"); 468 waitFor(longCompactions, "Large Compaction Thread"); 469 waitFor(shortCompactions, "Small Compaction Thread"); 470 } 471 472 /** 473 * Returns the current size of the queue containing regions that are processed. 474 * @return The current size of the regions queue. 475 */ 476 public int getCompactionQueueSize() { 477 return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); 478 } 479 480 public int getLargeCompactionQueueSize() { 481 return longCompactions.getQueue().size(); 482 } 483 484 public int getSmallCompactionQueueSize() { 485 return shortCompactions.getQueue().size(); 486 } 487 488 public int getSplitQueueSize() { 489 return splits.getQueue().size(); 490 } 491 492 private boolean shouldSplitRegion() { 493 if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) { 494 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " 495 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); 496 } 497 return (regionSplitLimit > server.getNumberOfOnlineRegions()); 498 } 499 500 /** Returns the regionSplitLimit */ 501 public int getRegionSplitLimit() { 502 return this.regionSplitLimit; 503 } 504 505 /** 506 * Check if this store is under compaction 507 */ 508 public boolean isUnderCompaction(final HStore s) { 509 return underCompactionStores.contains(getStoreNameForUnderCompaction(s)); 510 } 511 512 private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() { 513 514 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) { 515 if (r1 == r2) { 516 return 0; // they are the same request 517 } 518 // less first 519 int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); 520 if (cmp != 0) { 521 return cmp; 522 } 523 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime()); 524 if (cmp != 0) { 525 return cmp; 526 } 527 528 // break the tie based on hash code 529 return System.identityHashCode(r1) - System.identityHashCode(r2); 530 } 531 532 @Override 533 public int compare(Runnable r1, Runnable r2) { 534 // CompactionRunner first 535 if (r1 instanceof CompactionRunner) { 536 if (!(r2 instanceof CompactionRunner)) { 537 return -1; 538 } 539 } else { 540 if (r2 instanceof CompactionRunner) { 541 return 1; 542 } else { 543 // break the tie based on hash code 544 return System.identityHashCode(r1) - System.identityHashCode(r2); 545 } 546 } 547 CompactionRunner o1 = (CompactionRunner) r1; 548 CompactionRunner o2 = (CompactionRunner) r2; 549 // less first 550 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); 551 if (cmp != 0) { 552 return cmp; 553 } 554 CompactionContext c1 = o1.compaction; 555 CompactionContext c2 = o2.compaction; 556 if (c1 != null) { 557 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1; 558 } else { 559 return c2 != null ? 1 : 0; 560 } 561 } 562 }; 563 564 private final class CompactionRunner implements Runnable { 565 private final HStore store; 566 private final HRegion region; 567 private final CompactionContext compaction; 568 private final CompactionLifeCycleTracker tracker; 569 private final CompactionCompleteTracker completeTracker; 570 private int queuedPriority; 571 private ThreadPoolExecutor parent; 572 private User user; 573 private long time; 574 575 public CompactionRunner(HStore store, HRegion region, CompactionContext compaction, 576 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, 577 ThreadPoolExecutor parent, User user) { 578 this.store = store; 579 this.region = region; 580 this.compaction = compaction; 581 this.tracker = tracker; 582 this.completeTracker = completeTracker; 583 this.queuedPriority = 584 compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority(); 585 this.parent = parent; 586 this.user = user; 587 this.time = EnvironmentEdgeManager.currentTime(); 588 } 589 590 @Override 591 public String toString() { 592 if (compaction != null) { 593 return "Request=" + compaction.getRequest(); 594 } else { 595 return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority=" 596 + queuedPriority + ", startTime=" + time; 597 } 598 } 599 600 private void doCompaction(User user) { 601 CompactionContext c; 602 // Common case - system compaction without a file selection. Select now. 603 if (compaction == null) { 604 int oldPriority = this.queuedPriority; 605 this.queuedPriority = this.store.getCompactPriority(); 606 if (this.queuedPriority > oldPriority) { 607 // Store priority decreased while we were in queue (due to some other compaction?), 608 // requeue with new priority to avoid blocking potential higher priorities. 609 this.parent.execute(this); 610 return; 611 } 612 Optional<CompactionContext> selected; 613 try { 614 selected = selectCompaction(this.region, this.store, queuedPriority, tracker, 615 completeTracker, user); 616 } catch (IOException ex) { 617 LOG.error("Compaction selection failed " + this, ex); 618 server.checkFileSystem(); 619 region.decrementCompactionsQueuedCount(); 620 return; 621 } 622 if (!selected.isPresent()) { 623 region.decrementCompactionsQueuedCount(); 624 return; // nothing to do 625 } 626 c = selected.get(); 627 assert c.hasSelection(); 628 // Now see if we are in correct pool for the size; if not, go to the correct one. 629 // We might end up waiting for a while, so cancel the selection. 630 631 ThreadPoolExecutor pool = 632 store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; 633 634 // Long compaction pool can process small job 635 // Short compaction pool should not process large job 636 if (this.parent == shortCompactions && pool == longCompactions) { 637 this.store.cancelRequestedCompaction(c); 638 this.parent = pool; 639 this.parent.execute(this); 640 return; 641 } 642 } else { 643 c = compaction; 644 } 645 // Finally we can compact something. 646 assert c != null; 647 648 tracker.beforeExecution(store); 649 try { 650 // Note: please don't put single-compaction logic here; 651 // put it into region/store/etc. This is CST logic. 652 long start = EnvironmentEdgeManager.currentTime(); 653 boolean completed = region.compact(c, store, compactionThroughputController, user); 654 long now = EnvironmentEdgeManager.currentTime(); 655 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration=" 656 + StringUtils.formatTimeDiff(now, start)); 657 if (completed) { 658 // degenerate case: blocked regions require recursive enqueues 659 if ( 660 region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0 661 ) { 662 requestSystemCompaction(region, store, "Recursive enqueue"); 663 } else { 664 // see if the compaction has caused us to exceed max region size 665 if (!requestSplit(region) && store.getCompactPriority() <= 0) { 666 requestSystemCompaction(region, store, "Recursive enqueue"); 667 } 668 } 669 } 670 } catch (IOException ex) { 671 IOException remoteEx = 672 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 673 LOG.error("Compaction failed " + this, remoteEx); 674 if (remoteEx != ex) { 675 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); 676 } 677 region.reportCompactionRequestFailure(); 678 server.checkFileSystem(); 679 } catch (Exception ex) { 680 LOG.error("Compaction failed " + this, ex); 681 region.reportCompactionRequestFailure(); 682 server.checkFileSystem(); 683 } finally { 684 tracker.afterExecution(store); 685 completeTracker.completed(store); 686 region.decrementCompactionsQueuedCount(); 687 LOG.debug("Status {}", CompactSplit.this); 688 } 689 } 690 691 @Override 692 public void run() { 693 try { 694 Preconditions.checkNotNull(server); 695 if ( 696 server.isStopped() || (region.getTableDescriptor() != null 697 && !region.getTableDescriptor().isCompactionEnabled()) 698 ) { 699 region.decrementCompactionsQueuedCount(); 700 return; 701 } 702 doCompaction(user); 703 } finally { 704 if (LOG.isDebugEnabled()) { 705 LOG.debug("Remove under compaction mark for store: {}", 706 store.getHRegion().getRegionInfo().getEncodedName() + ":" 707 + store.getColumnFamilyName()); 708 } 709 underCompactionStores.remove(getStoreNameForUnderCompaction(store)); 710 } 711 } 712 713 private String formatStackTrace(Exception ex) { 714 StringWriter sw = new StringWriter(); 715 PrintWriter pw = new PrintWriter(sw); 716 ex.printStackTrace(pw); 717 pw.flush(); 718 return sw.toString(); 719 } 720 } 721 722 /** 723 * Cleanup class to use when rejecting a compaction request from the queue. 724 */ 725 private static class Rejection implements RejectedExecutionHandler { 726 @Override 727 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { 728 if (runnable instanceof CompactionRunner) { 729 CompactionRunner runner = (CompactionRunner) runnable; 730 LOG.debug("Compaction Rejected: " + runner); 731 if (runner.compaction != null) { 732 runner.store.cancelRequestedCompaction(runner.compaction); 733 } 734 } 735 } 736 } 737 738 /** 739 * {@inheritDoc} 740 */ 741 @Override 742 public void onConfigurationChange(Configuration newConf) { 743 // Check if number of large / small compaction threads has changed, and then 744 // adjust the core pool size of the thread pools, by using the 745 // setCorePoolSize() method. According to the javadocs, it is safe to 746 // change the core pool size on-the-fly. We need to reset the maximum 747 // pool size, as well. 748 int largeThreads = 749 Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); 750 if (this.longCompactions.getCorePoolSize() != largeThreads) { 751 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from " 752 + this.longCompactions.getCorePoolSize() + " to " + largeThreads); 753 if (this.longCompactions.getCorePoolSize() < largeThreads) { 754 this.longCompactions.setMaximumPoolSize(largeThreads); 755 this.longCompactions.setCorePoolSize(largeThreads); 756 } else { 757 this.longCompactions.setCorePoolSize(largeThreads); 758 this.longCompactions.setMaximumPoolSize(largeThreads); 759 } 760 } 761 762 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); 763 if (this.shortCompactions.getCorePoolSize() != smallThreads) { 764 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from " 765 + this.shortCompactions.getCorePoolSize() + " to " + smallThreads); 766 if (this.shortCompactions.getCorePoolSize() < smallThreads) { 767 this.shortCompactions.setMaximumPoolSize(smallThreads); 768 this.shortCompactions.setCorePoolSize(smallThreads); 769 } else { 770 this.shortCompactions.setCorePoolSize(smallThreads); 771 this.shortCompactions.setMaximumPoolSize(smallThreads); 772 } 773 } 774 775 int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); 776 if (this.splits.getCorePoolSize() != splitThreads) { 777 LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize() 778 + " to " + splitThreads); 779 if (this.splits.getCorePoolSize() < splitThreads) { 780 this.splits.setMaximumPoolSize(splitThreads); 781 this.splits.setCorePoolSize(splitThreads); 782 } else { 783 this.splits.setCorePoolSize(splitThreads); 784 this.splits.setMaximumPoolSize(splitThreads); 785 } 786 } 787 788 ThroughputController old = this.compactionThroughputController; 789 if (old != null) { 790 old.stop("configuration change"); 791 } 792 this.compactionThroughputController = 793 CompactionThroughputControllerFactory.create(server, newConf); 794 795 // We change this atomically here instead of reloading the config in order that upstream 796 // would be the only one with the flexibility to reload the config. 797 this.conf.reloadConfiguration(); 798 } 799 800 protected int getSmallCompactionThreadNum() { 801 return this.shortCompactions.getCorePoolSize(); 802 } 803 804 protected int getLargeCompactionThreadNum() { 805 return this.longCompactions.getCorePoolSize(); 806 } 807 808 protected int getSplitThreadNum() { 809 return this.splits.getCorePoolSize(); 810 } 811 812 /** 813 * {@inheritDoc} 814 */ 815 @Override 816 public void registerChildren(ConfigurationManager manager) { 817 // No children to register. 818 } 819 820 /** 821 * {@inheritDoc} 822 */ 823 @Override 824 public void deregisterChildren(ConfigurationManager manager) { 825 // No children to register 826 } 827 828 public ThroughputController getCompactionThroughputController() { 829 return compactionThroughputController; 830 } 831 832 /** 833 * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long 834 * compaction thread pool from stealing job from short compaction queue 835 */ 836 void shutdownLongCompactions() { 837 this.longCompactions.shutdown(); 838 } 839 840 public void clearLongCompactionsQueue() { 841 longCompactions.getQueue().clear(); 842 } 843 844 public void clearShortCompactionsQueue() { 845 shortCompactions.getQueue().clear(); 846 } 847 848 public boolean isCompactionsEnabled() { 849 return compactionsEnabled; 850 } 851 852 public void setCompactionsEnabled(boolean compactionsEnabled) { 853 this.compactionsEnabled = compactionsEnabled; 854 this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, String.valueOf(compactionsEnabled)); 855 } 856 857 /** Returns the longCompactions thread pool executor */ 858 ThreadPoolExecutor getLongCompactions() { 859 return longCompactions; 860 } 861 862 /** Returns the shortCompactions thread pool executor */ 863 ThreadPoolExecutor getShortCompactions() { 864 return shortCompactions; 865 } 866 867 private String getStoreNameForUnderCompaction(HStore store) { 868 return String.format("%s:%s", 869 store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "", 870 store.getColumnFamilyName()); 871 } 872 873}