/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Compact regions on request and then run a split if appropriate.
 */
@InterfaceAudience.Private
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

  // Configuration key for the large compaction threads.
  public final static String LARGE_COMPACTION_THREADS =
    "hbase.regionserver.thread.compaction.large";
  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for the small compaction threads.
  public final static String SMALL_COMPACTION_THREADS =
    "hbase.regionserver.thread.compaction.small";
  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for split threads
  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
  public final static int SPLIT_THREADS_DEFAULT = 1;

  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
    "hbase.regionserver.regionSplitLimit";
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
    "hbase.regionserver.compaction.enabled";

  private final HRegionServer server;
  private final Configuration conf;
  private volatile ThreadPoolExecutor longCompactions;
  private volatile ThreadPoolExecutor shortCompactions;
  private volatile ThreadPoolExecutor splits;

  private volatile ThroughputController compactionThroughputController;
  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();

  private volatile boolean compactionsEnabled;
  /**
   * Splitting should not take place if the total number of regions exceeds this. This is not a
   * hard limit on the number of regions but a guideline to stop splitting once the number of
   * online regions is greater than this.
   */
  private int regionSplitLimit;

  CompactSplit(HRegionServer server) {
    this.server = server;
    this.conf = server.getConfiguration();
    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
    createCompactionExecutors();
    createSplitExecutors();

    // compaction throughput controller
    this.compactionThroughputController =
      CompactionThroughputControllerFactory.create(server, conf);
  }

  // Only for tests.
  public CompactSplit(Configuration conf) {
    this.server = null;
    this.conf = conf;
    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
    createCompactionExecutors();
    createSplitExecutors();
  }
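
  // A minimal sketch of how the owning region server drives this class (illustrative only;
  // error handling omitted, and the no-tracker overloads defined further below are used):
  //
  //   CompactSplit compactSplit = ...; // created by HRegionServer via CompactSplit(server)
  //   compactSplit.requestSystemCompaction(region, store, "periodic store check");
  //   compactSplit.requestSplit(region);     // splits only if shouldSplitRegion() allows it
  //   compactSplit.switchCompaction(false);  // interrupts running compactions
  //   compactSplit.switchCompaction(true);   // re-creates the compaction executors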

  private void createSplitExecutors() {
    final String n = Thread.currentThread().getName();
    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
  }

  private void createCompactionExecutors() {
    this.regionSplitLimit =
      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

    int largeThreads =
      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);

    // Sanity check: both compaction pools must be configured with at least one thread.
    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);

    final String n = Thread.currentThread().getName();

    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
    // Since StealJobQueue internally uses a PriorityBlockingQueue, which is an unbounded
    // blocking queue, we removed the RejectedExecutionHandler for the long and short compaction
    // thread pool executors in HBASE-27332.
    // If anyone wants to change StealJobQueue to a bounded queue, please add the rejection
    // handler back.
    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
      stealJobQueue,
      new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
    this.longCompactions.prestartAllCoreThreads();
    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
      stealJobQueue.getStealFromQueue(),
      new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
  }
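
  // Should the StealJobQueue above ever be swapped for a bounded queue, a rejection handler
  // must be supplied again (see the note above and HBASE-27332). A hedged sketch, assuming a
  // caller-runs policy is acceptable; the handler used before HBASE-27332 is not shown here:
  //
  //   this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
  //     TimeUnit.SECONDS, boundedQueue, // boundedQueue: hypothetical bounded BlockingQueue
  //     new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build(),
  //     new ThreadPoolExecutor.CallerRunsPolicy());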

  @Override
  public String toString() {
    return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size()
      + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue="
      + splits.getQueue().size();
  }

  public String dumpQueue() {
    StringBuilder queueLists = new StringBuilder();
    queueLists.append("Compaction/Split Queue dump:\n");
    queueLists.append("  LargeCompaction Queue:\n");
    BlockingQueue<Runnable> lq = longCompactions.getQueue();
    Iterator<Runnable> it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    if (shortCompactions != null) {
      queueLists.append("\n");
      queueLists.append("  SmallCompaction Queue:\n");
      lq = shortCompactions.getQueue();
      it = lq.iterator();
      while (it.hasNext()) {
        queueLists.append("    " + it.next().toString());
        queueLists.append("\n");
      }
    }

    queueLists.append("\n");
    queueLists.append("  Split Queue:\n");
    lq = splits.getQueue();
    it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    return queueLists.toString();
  }

  public synchronized boolean requestSplit(final Region r) {
    // The default behavior is not to split regions that are blocking writes.
    // But in some circumstances, splitting here is needed to prevent the region size, as well as
    // the number of store files, from growing continuously; see HBASE-26242.
    HRegion hr = (HRegion) r;
    try {
      if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) {
        byte[] midKey = hr.checkSplit().orElse(null);
        if (midKey != null) {
          requestSplit(r, midKey);
          return true;
        }
      }
    } catch (IndexOutOfBoundsException e) {
      // We get this sometimes. Not sure why. Catch and return false; no split request.
      LOG.warn("Catching out-of-bounds; region={}, priority={}",
        hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e);
    }
    return false;
  }

  private synchronized void requestSplit(final Region r, byte[] midKey) {
    requestSplit(r, midKey, null);
  }

  /*
   * The User parameter allows the split thread to assume the correct user identity
   */
  private synchronized void requestSplit(final Region r, byte[] midKey, User user) {
    if (midKey == null) {
      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString()
        + " not splittable because midkey=null");
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting " + r + ", " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.info("Could not execute split for " + r, ree);
    }
  }

  private void interrupt() {
    longCompactions.shutdownNow();
    shortCompactions.shutdownNow();
  }

  private void reInitializeCompactionsExecutors() {
    createCompactionExecutors();
  }

  // set protected for test
  protected interface CompactionCompleteTracker {

    default void completed(Store store) {
    }
  }

  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
    new CompactionCompleteTracker() {
    };

  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {

    private final CompactionLifeCycleTracker tracker;

    private final AtomicInteger remaining;

    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
      this.tracker = tracker;
      this.remaining = new AtomicInteger(numberOfStores);
    }

    @Override
    public void completed(Store store) {
      if (remaining.decrementAndGet() == 0) {
        tracker.completed();
      }
    }
  }

  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
    IntSupplier numberOfStores) {
    if (tracker == CompactionLifeCycleTracker.DUMMY) {
      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
      // the life cycle of a compaction.
      return DUMMY_COMPLETE_TRACKER;
    } else {
      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
    }
  }

  @Override
  public synchronized void requestCompaction(HRegion region, String why, int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
  }

  @Override
  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
    CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, store, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> 1), user);
  }

  @Override
  public void switchCompaction(boolean onOrOff) {
    if (onOrOff) {
      // Re-create the executor pools if compactions were previously disabled.
      if (!isCompactionsEnabled()) {
        LOG.info("Re-Initializing compactions because user switched on compactions");
        reInitializeCompactionsExecutors();
      }
      setCompactionsEnabled(onOrOff);
      return;
    }

    setCompactionsEnabled(onOrOff);
    LOG.info("Interrupting running compactions because user switched off compactions");
    interrupt();
  }

  private void requestCompactionInternal(HRegion region, String why, int priority,
    boolean selectNow, CompactionLifeCycleTracker tracker,
    CompactionCompleteTracker completeTracker, User user) throws IOException {
    // request compaction on all stores
    for (HStore store : region.stores.values()) {
      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
        user);
    }
  }

  // set protected for test
  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
    boolean selectNow, CompactionLifeCycleTracker tracker,
    CompactionCompleteTracker completeTracker, User user) throws IOException {
    if (!this.isCompactionsEnabled()) {
      LOG.info("Ignoring compaction request for " + region + ", because compaction is disabled.");
      return;
    }

    if (
      this.server.isStopped() || (region.getTableDescriptor() != null
        && !region.getTableDescriptor().isCompactionEnabled())
    ) {
      return;
    }
    RegionServerSpaceQuotaManager spaceQuotaManager =
      this.server.getRegionServerSpaceQuotaManager();

    if (
      user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())
    ) {
      // Enter here only when:
      // it's a user generated request, the user is not a super user, quotas are enabled, and
      // compactions are disabled by the space quota policy.
      String reason = "Ignoring compaction request for " + region
        + " as an active space quota violation policy disallows compactions.";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
      return;
    }

    CompactionContext compaction;
    if (selectNow) {
      Optional<CompactionContext> c =
        selectCompaction(region, store, priority, tracker, completeTracker, user);
      if (!c.isPresent()) {
        // message logged inside
        return;
      }
      compaction = c.get();
    } else {
      compaction = null;
    }

    ThreadPoolExecutor pool;
    if (selectNow) {
      // compaction is guaranteed non-null here: when selectNow is true we return above if no
      // compaction was selected
      pool = store.throttleCompaction(compaction.getRequest().getSize())
        ? longCompactions
        : shortCompactions;
    } else {
      // We assume that most compactions are small. So, put system compactions into small
      // pool; we will do selection there, and move to large pool if necessary.
      pool = shortCompactions;
    }

    // A simple implementation for under compaction marks.
    // Since this method is always called from synchronized methods, we do not need to use the
    // boolean result to make sure that exactly the entry added here will be the one removed
    // in the next steps.
    underCompactionStores.add(getStoreNameForUnderCompaction(store));
    pool.execute(
      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Add compact mark for store {}, priority={}, current under compaction "
          + "store size is {}",
        getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
    }
    region.incrementCompactionsQueuedCount();
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
        + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
  }

  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
      DUMMY_COMPLETE_TRACKER, null);
  }

  public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException {
    requestSystemCompaction(region, store, why, false);
  }

  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
    boolean giveUpIfRequestedOrCompacting) throws IOException {
    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
      LOG.debug("Region {} store {} is under compaction now, skipping the compaction request",
        region, store.getColumnFamilyName());
      return;
    }
    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
  }

  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
    CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
    throws IOException {
    // don't even select for compaction if disableCompactions is set to true
    if (!isCompactionsEnabled()) {
      LOG.info("User has disabled compactions");
      return Optional.empty();
    }
    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
    if (!compaction.isPresent() && region.getRegionInfo() != null) {
      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString()
        + " because compaction request was cancelled";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
    }
    return compaction;
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    splits.shutdown();
    longCompactions.shutdown();
    shortCompactions.shutdown();
  }

  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        LOG.info("Waiting for " + name + " to finish...");
        if (!done) {
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted waiting for " + name + " to finish...");
        t.shutdownNow();
      }
    }
  }

  void join() {
    waitFor(splits, "Split Thread");
    waitFor(longCompactions, "Large Compaction Thread");
    waitFor(shortCompactions, "Small Compaction Thread");
  }

  /**
   * Returns the combined size of the long and short compaction queues.
   * @return The current number of queued compactions.
   */
  public int getCompactionQueueSize() {
    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
  }

  public int getLargeCompactionQueueSize() {
    return longCompactions.getQueue().size();
  }

  public int getSmallCompactionQueueSize() {
    return shortCompactions.getQueue().size();
  }

  public int getSplitQueueSize() {
    return splits.getQueue().size();
  }

  private boolean shouldSplitRegion() {
    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
        + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
    }
    return (regionSplitLimit > server.getNumberOfOnlineRegions());
  }

  /** Returns the regionSplitLimit */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }

  /**
   * Check if this store is under compaction
   */
  public boolean isUnderCompaction(final HStore s) {
    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
  }

  private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() {

    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
      if (r1 == r2) {
        return 0; // they are the same request
      }
      // less first
      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
      if (cmp != 0) {
        return cmp;
      }
      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
      if (cmp != 0) {
        return cmp;
      }

      // break the tie based on hash code
      return System.identityHashCode(r1) - System.identityHashCode(r2);
    }

    @Override
    public int compare(Runnable r1, Runnable r2) {
      // CompactionRunner first
      if (r1 instanceof CompactionRunner) {
        if (!(r2 instanceof CompactionRunner)) {
          return -1;
        }
      } else {
        if (r2 instanceof CompactionRunner) {
          return 1;
        } else {
          // break the tie based on hash code
          return System.identityHashCode(r1) - System.identityHashCode(r2);
        }
      }
      CompactionRunner o1 = (CompactionRunner) r1;
      CompactionRunner o2 = (CompactionRunner) r2;
      // less first
      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
      if (cmp != 0) {
        return cmp;
      }
      CompactionContext c1 = o1.compaction;
      CompactionContext c2 = o2.compaction;
      if (c1 != null) {
        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
      } else {
        return c2 != null ? 1 : 0;
      }
    }
  };
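
  // Worked example of the ordering above (hypothetical values): given three queued runnables,
  //   a CompactionRunner with queuedPriority=1, a CompactionRunner with queuedPriority=5, and a
  //   plain Runnable,
  // the priority-1 runner is polled first, then the priority-5 runner, and the plain task last.
  // Among equal priorities, the request with the smaller (earlier) selection time wins.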

  private final class CompactionRunner implements Runnable {
    private final HStore store;
    private final HRegion region;
    private final CompactionContext compaction;
    private final CompactionLifeCycleTracker tracker;
    private final CompactionCompleteTracker completeTracker;
    private int queuedPriority;
    private ThreadPoolExecutor parent;
    private User user;
    private long time;

    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
      ThreadPoolExecutor parent, User user) {
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.tracker = tracker;
      this.completeTracker = completeTracker;
      this.queuedPriority =
        compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
      this.parent = parent;
      this.user = user;
      this.time = EnvironmentEdgeManager.currentTime();
    }

    @Override
    public String toString() {
      if (compaction != null) {
        return "Request=" + compaction.getRequest();
      } else {
        return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority="
          + queuedPriority + ", startTime=" + time;
      }
    }

    private void doCompaction(User user) {
      CompactionContext c;
      // Common case - system compaction without a file selection. Select now.
      if (compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        Optional<CompactionContext> selected;
        try {
          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
            completeTracker, user);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          region.decrementCompactionsQueuedCount();
          return;
        }
        if (!selected.isPresent()) {
          region.decrementCompactionsQueuedCount();
          return; // nothing to do
        }
        c = selected.get();
        assert c.hasSelection();
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.

        ThreadPoolExecutor pool =
          store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;

        // The long compaction pool can process small jobs, but the short compaction pool
        // should not process large jobs.
        if (this.parent == shortCompactions && pool == longCompactions) {
          this.store.cancelRequestedCompaction(c);
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      } else {
        c = compaction;
      }
      // Finally we can compact something.
      assert c != null;

      tracker.beforeExecution(store);
      try {
        // Note: please don't put single-compaction logic here;
        // put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTime();
        boolean completed = region.compact(c, store, compactionThroughputController, user);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration="
          + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (
            region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0
          ) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            if (!requestSplit(region) && store.getCompactPriority() <= 0) {
              requestSystemCompaction(region, store, "Recursive enqueue");
            }
          }
        }
      } catch (IOException ex) {
        IOException remoteEx =
          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } finally {
        tracker.afterExecution(store);
        completeTracker.completed(store);
        region.decrementCompactionsQueuedCount();
        LOG.debug("Status {}", CompactSplit.this);
      }
    }

    @Override
    public void run() {
      try {
        Preconditions.checkNotNull(server);
        if (
          server.isStopped() || (region.getTableDescriptor() != null
            && !region.getTableDescriptor().isCompactionEnabled())
        ) {
          region.decrementCompactionsQueuedCount();
          return;
        }
        doCompaction(user);
      } finally {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Remove under compaction mark for store: {}",
            store.getHRegion().getRegionInfo().getEncodedName() + ":"
              + store.getColumnFamilyName());
        }
        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
      }
    }

    private String formatStackTrace(Exception ex) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      ex.printStackTrace(pw);
      pw.flush();
      return sw.toString();
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void onConfigurationChange(Configuration newConf) {
    // Check if number of large / small compaction threads has changed, and then
    // adjust the core pool size of the thread pools, by using the
    // setCorePoolSize() method. According to the javadocs, it is safe to
    // change the core pool size on-the-fly. We need to reset the maximum
    // pool size, as well.
    int largeThreads =
      Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    if (this.longCompactions.getCorePoolSize() != largeThreads) {
      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from "
        + this.longCompactions.getCorePoolSize() + " to " + largeThreads);
      if (this.longCompactions.getCorePoolSize() < largeThreads) {
        this.longCompactions.setMaximumPoolSize(largeThreads);
        this.longCompactions.setCorePoolSize(largeThreads);
      } else {
        this.longCompactions.setCorePoolSize(largeThreads);
        this.longCompactions.setMaximumPoolSize(largeThreads);
      }
    }

    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from "
        + this.shortCompactions.getCorePoolSize() + " to " + smallThreads);
      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
        this.shortCompactions.setMaximumPoolSize(smallThreads);
        this.shortCompactions.setCorePoolSize(smallThreads);
      } else {
        this.shortCompactions.setCorePoolSize(smallThreads);
        this.shortCompactions.setMaximumPoolSize(smallThreads);
      }
    }

    int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
    if (this.splits.getCorePoolSize() != splitThreads) {
      LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize()
        + " to " + splitThreads);
      if (this.splits.getCorePoolSize() < splitThreads) {
        this.splits.setMaximumPoolSize(splitThreads);
        this.splits.setCorePoolSize(splitThreads);
      } else {
        this.splits.setCorePoolSize(splitThreads);
        this.splits.setMaximumPoolSize(splitThreads);
      }
    }

    ThroughputController old = this.compactionThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.compactionThroughputController =
      CompactionThroughputControllerFactory.create(server, newConf);

    // We change this atomically here instead of reloading the config in order that upstream
    // would be the only one with the flexibility to reload the config.
    this.conf.reloadConfiguration();
  }
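
  // A minimal sketch of how the pools above can be resized at runtime (illustrative only; in a
  // running server the ConfigurationManager typically notifies this observer when the
  // configuration is reloaded):
  //
  //   Configuration newConf = new Configuration(server.getConfiguration());
  //   newConf.setInt(LARGE_COMPACTION_THREADS, 4);
  //   newConf.setInt(SMALL_COMPACTION_THREADS, 6);
  //   compactSplit.onConfigurationChange(newConf);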

  protected int getSmallCompactionThreadNum() {
    return this.shortCompactions.getCorePoolSize();
  }

  protected int getLargeCompactionThreadNum() {
    return this.longCompactions.getCorePoolSize();
  }

  protected int getSplitThreadNum() {
    return this.splits.getCorePoolSize();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister.
  }

  public ThroughputController getCompactionThroughputController() {
    return compactionThroughputController;
  }

  /**
   * Shutdown the long compaction thread pool. Should only be used in unit tests to prevent the
   * long compaction thread pool from stealing jobs from the short compaction queue.
   */
  void shutdownLongCompactions() {
    this.longCompactions.shutdown();
  }

  public void clearLongCompactionsQueue() {
    longCompactions.getQueue().clear();
  }

  public void clearShortCompactionsQueue() {
    shortCompactions.getQueue().clear();
  }

  public boolean isCompactionsEnabled() {
    return compactionsEnabled;
  }

  public void setCompactionsEnabled(boolean compactionsEnabled) {
    this.compactionsEnabled = compactionsEnabled;
    this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled);
  }

  /** Returns the longCompactions thread pool executor */
  ThreadPoolExecutor getLongCompactions() {
    return longCompactions;
  }

  /** Returns the shortCompactions thread pool executor */
  ThreadPoolExecutor getShortCompactions() {
    return shortCompactions;
  }

  private String getStoreNameForUnderCompaction(HStore store) {
    return String.format("%s:%s",
      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
      store.getColumnFamilyName());
  }

}