001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
021import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
022
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.io.StringWriter;
026import java.util.Comparator;
027import java.util.Iterator;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.Executors;
033import java.util.concurrent.RejectedExecutionException;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.function.IntSupplier;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.conf.ConfigurationManager;
041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
049import org.apache.hadoop.hbase.security.Superusers;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.util.ConfigurationUtil;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.util.StealJobQueue;
054import org.apache.hadoop.ipc.RemoteException;
055import org.apache.hadoop.util.StringUtils;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
062
063/**
064 * Compact region on request and then run split if appropriate
065 */
066@InterfaceAudience.Private
067public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
068  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);
069
070  // Configuration key for the large compaction threads.
071  public final static String LARGE_COMPACTION_THREADS =
072    "hbase.regionserver.thread.compaction.large";
073  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
074
075  // Configuration key for the small compaction threads.
076  public final static String SMALL_COMPACTION_THREADS =
077    "hbase.regionserver.thread.compaction.small";
078  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
079
080  // Configuration key for split threads
081  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
082  public final static int SPLIT_THREADS_DEFAULT = 1;
083
084  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
085    "hbase.regionserver.regionSplitLimit";
086  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
087  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
088    "hbase.regionserver.compaction.enabled";
089
090  private final HRegionServer server;
091  private final Configuration conf;
092  private volatile ThreadPoolExecutor longCompactions;
093  private volatile ThreadPoolExecutor shortCompactions;
094  private volatile ThreadPoolExecutor splits;
095
096  private volatile ThroughputController compactionThroughputController;
097  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
098
099  private volatile boolean compactionsEnabled;
100  /**
101   * Splitting should not take place if the total number of regions exceed this. This is not a hard
102   * limit to the number of regions but it is a guideline to stop splitting after number of online
103   * regions is greater than this.
104   */
105  private int regionSplitLimit;
106
107  CompactSplit(HRegionServer server) {
108    this.server = server;
109    this.conf = server.getConfiguration();
110    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
111    createCompactionExecutors();
112    createSplitExcecutors();
113
114    // compaction throughput controller
115    this.compactionThroughputController =
116      CompactionThroughputControllerFactory.create(server, conf);
117  }
118
119  // only for test
120  public CompactSplit(Configuration conf) {
121    this.server = null;
122    this.conf = conf;
123    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
124    createCompactionExecutors();
125    createSplitExcecutors();
126  }
127
128  private void createSplitExcecutors() {
129    final String n = Thread.currentThread().getName();
130    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
131    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
132      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
133  }
134
135  private void createCompactionExecutors() {
136    this.regionSplitLimit =
137      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
138
139    int largeThreads =
140      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
141    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
142
143    // if we have throttle threads, make sure the user also specified size
144    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
145
146    final String n = Thread.currentThread().getName();
147
148    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
149    // Since the StealJobQueue inner uses the PriorityBlockingQueue,
150    // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for
151    // the long and short compaction thread pool executors since HBASE-27332.
152    // If anyone who what to change the StealJobQueue to a bounded queue,
153    // please add the rejection handler back.
154    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
155      stealJobQueue,
156      new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
157    this.longCompactions.prestartAllCoreThreads();
158    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
159      stealJobQueue.getStealFromQueue(),
160      new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
161  }
162
163  @Override
164  public String toString() {
165    return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size()
166      + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue="
167      + splits.getQueue().size();
168  }
169
170  public String dumpQueue() {
171    StringBuilder queueLists = new StringBuilder();
172    queueLists.append("Compaction/Split Queue dump:\n");
173    queueLists.append("  LargeCompation Queue:\n");
174    BlockingQueue<Runnable> lq = longCompactions.getQueue();
175    Iterator<Runnable> it = lq.iterator();
176    while (it.hasNext()) {
177      queueLists.append("    " + it.next().toString());
178      queueLists.append("\n");
179    }
180
181    if (shortCompactions != null) {
182      queueLists.append("\n");
183      queueLists.append("  SmallCompation Queue:\n");
184      lq = shortCompactions.getQueue();
185      it = lq.iterator();
186      while (it.hasNext()) {
187        queueLists.append("    " + it.next().toString());
188        queueLists.append("\n");
189      }
190    }
191
192    queueLists.append("\n");
193    queueLists.append("  Split Queue:\n");
194    lq = splits.getQueue();
195    it = lq.iterator();
196    while (it.hasNext()) {
197      queueLists.append("    " + it.next().toString());
198      queueLists.append("\n");
199    }
200
201    return queueLists.toString();
202  }
203
204  public synchronized boolean requestSplit(final Region r) {
205    // Don't split regions that are blocking is the default behavior.
206    // But in some circumstances, split here is needed to prevent the region size from
207    // continuously growing, as well as the number of store files, see HBASE-26242.
208    HRegion hr = (HRegion) r;
209    try {
210      if (shouldSplitRegion(r.getRegionInfo()) && hr.getCompactPriority() >= PRIORITY_USER) {
211        byte[] midKey = hr.checkSplit().orElse(null);
212        if (midKey != null) {
213          requestSplit(r, midKey);
214          return true;
215        }
216      }
217    } catch (IndexOutOfBoundsException e) {
218      // We get this sometimes. Not sure why. Catch and return false; no split request.
219      LOG.warn("Catching out-of-bounds; region={}, policy={}",
220        hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e);
221    }
222    return false;
223  }
224
225  private synchronized void requestSplit(final Region r, byte[] midKey) {
226    requestSplit(r, midKey, null);
227  }
228
229  /*
230   * The User parameter allows the split thread to assume the correct user identity
231   */
232  private synchronized void requestSplit(final Region r, byte[] midKey, User user) {
233    if (midKey == null) {
234      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString()
235        + " not splittable because midkey=null");
236      return;
237    }
238    try {
239      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
240      if (LOG.isDebugEnabled()) {
241        LOG.debug("Splitting " + r + ", " + this);
242      }
243    } catch (RejectedExecutionException ree) {
244      LOG.info("Could not execute split for " + r, ree);
245    }
246  }
247
248  private void interrupt() {
249    longCompactions.shutdownNow();
250    shortCompactions.shutdownNow();
251  }
252
253  private void reInitializeCompactionsExecutors() {
254    createCompactionExecutors();
255  }
256
257  // set protected for test
258  protected interface CompactionCompleteTracker {
259
260    default void completed(Store store) {
261    }
262  }
263
264  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
265    new CompactionCompleteTracker() {
266    };
267
268  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
269
270    private final CompactionLifeCycleTracker tracker;
271
272    private final AtomicInteger remaining;
273
274    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
275      this.tracker = tracker;
276      this.remaining = new AtomicInteger(numberOfStores);
277    }
278
279    @Override
280    public void completed(Store store) {
281      if (remaining.decrementAndGet() == 0) {
282        tracker.completed();
283      }
284    }
285  }
286
287  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
288    IntSupplier numberOfStores) {
289    if (tracker == CompactionLifeCycleTracker.DUMMY) {
290      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
291      // the life cycle of a compaction.
292      return DUMMY_COMPLETE_TRACKER;
293    } else {
294      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
295    }
296  }
297
298  @Override
299  public synchronized void requestCompaction(HRegion region, String why, int priority,
300    CompactionLifeCycleTracker tracker, User user) throws IOException {
301    requestCompactionInternal(region, why, priority, true, tracker,
302      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
303  }
304
305  @Override
306  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
307    CompactionLifeCycleTracker tracker, User user) throws IOException {
308    requestCompactionInternal(region, store, why, priority, true, tracker,
309      getCompleteTracker(tracker, () -> 1), user);
310  }
311
312  @Override
313  public void switchCompaction(boolean onOrOff) {
314    if (onOrOff) {
315      // re-create executor pool if compactions are disabled.
316      if (!isCompactionsEnabled()) {
317        LOG.info("Re-Initializing compactions because user switched on compactions");
318        reInitializeCompactionsExecutors();
319      }
320      setCompactionsEnabled(onOrOff);
321      return;
322    }
323
324    setCompactionsEnabled(onOrOff);
325    LOG.info("Interrupting running compactions because user switched off compactions");
326    interrupt();
327  }
328
329  private void requestCompactionInternal(HRegion region, String why, int priority,
330    boolean selectNow, CompactionLifeCycleTracker tracker,
331    CompactionCompleteTracker completeTracker, User user) throws IOException {
332    // request compaction on all stores
333    for (HStore store : region.stores.values()) {
334      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
335        user);
336    }
337  }
338
339  // set protected for test
340  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
341    boolean selectNow, CompactionLifeCycleTracker tracker,
342    CompactionCompleteTracker completeTracker, User user) throws IOException {
343    if (!this.isCompactionsEnabled()) {
344      LOG.info("Ignoring compaction request for " + region + ",because compaction is disabled.");
345      return;
346    }
347
348    // Should not allow compaction if cluster is in read-only mode
349    if (ConfigurationUtil.isReadOnlyModeEnabledInConf(conf)) {
350      LOG.info("Ignoring compaction request for " + region + ",because read-only mode is on.");
351      return;
352    }
353
354    if (
355      this.server.isStopped() || (region.getTableDescriptor() != null
356        && !region.getTableDescriptor().isCompactionEnabled())
357    ) {
358      return;
359    }
360    RegionServerSpaceQuotaManager spaceQuotaManager =
361      this.server.getRegionServerSpaceQuotaManager();
362
363    if (
364      user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
365        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())
366    ) {
367      // Enter here only when:
368      // It's a user generated req, the user is super user, quotas enabled, compactions disabled.
369      String reason = "Ignoring compaction request for " + region
370        + " as an active space quota violation " + " policy disallows compactions.";
371      tracker.notExecuted(store, reason);
372      completeTracker.completed(store);
373      LOG.debug(reason);
374      return;
375    }
376
377    CompactionContext compaction;
378    if (selectNow) {
379      Optional<CompactionContext> c =
380        selectCompaction(region, store, priority, tracker, completeTracker, user);
381      if (!c.isPresent()) {
382        // message logged inside
383        return;
384      }
385      compaction = c.get();
386    } else {
387      compaction = null;
388    }
389
390    ThreadPoolExecutor pool;
391    if (selectNow) {
392      // compaction.get is safe as we will just return if selectNow is true but no compaction is
393      // selected
394      pool = store.throttleCompaction(compaction.getRequest().getSize())
395        ? longCompactions
396        : shortCompactions;
397    } else {
398      // We assume that most compactions are small. So, put system compactions into small
399      // pool; we will do selection there, and move to large pool if necessary.
400      pool = shortCompactions;
401    }
402
403    // A simple implementation for under compaction marks.
404    // Since this method is always called in the synchronized methods, we do not need to use the
405    // boolean result to make sure that exactly the one that added here will be removed
406    // in the next steps.
407    underCompactionStores.add(getStoreNameForUnderCompaction(store));
408    pool.execute(
409      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
410    if (LOG.isDebugEnabled()) {
411      LOG.debug(
412        "Add compact mark for store {}, priority={}, current under compaction "
413          + "store size is {}",
414        getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
415    }
416    region.incrementCompactionsQueuedCount();
417    if (LOG.isDebugEnabled()) {
418      String type = (pool == shortCompactions) ? "Small " : "Large ";
419      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
420        + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
421    }
422  }
423
424  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
425    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
426      DUMMY_COMPLETE_TRACKER, null);
427  }
428
429  public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException {
430    requestSystemCompaction(region, store, why, false);
431  }
432
433  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
434    boolean giveUpIfRequestedOrCompacting) throws IOException {
435    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
436      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
437        store.getColumnFamilyName());
438      return;
439    }
440    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
441      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
442  }
443
444  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
445    CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
446    throws IOException {
447    // don't even select for compaction if disableCompactions is set to true
448    if (!isCompactionsEnabled()) {
449      LOG.info(String.format("User has disabled compactions"));
450      return Optional.empty();
451    }
452
453    // Should not allow compaction if cluster is in read-only mode
454    if (ConfigurationUtil.isReadOnlyModeEnabledInConf(conf)) {
455      LOG.info(String.format("Compaction request skipped as read-only mode is on"));
456      return Optional.empty();
457    }
458
459    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
460    if (!compaction.isPresent() && region.getRegionInfo() != null) {
461      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString()
462        + " because compaction request was cancelled";
463      tracker.notExecuted(store, reason);
464      completeTracker.completed(store);
465      LOG.debug(reason);
466    }
467    return compaction;
468  }
469
470  /**
471   * Only interrupt once it's done with a run through the work loop.
472   */
473  void interruptIfNecessary() {
474    splits.shutdown();
475    longCompactions.shutdown();
476    shortCompactions.shutdown();
477  }
478
479  private void waitFor(ThreadPoolExecutor t, String name) {
480    boolean done = false;
481    while (!done) {
482      try {
483        done = t.awaitTermination(60, TimeUnit.SECONDS);
484        LOG.info("Waiting for " + name + " to finish...");
485        if (!done) {
486          t.shutdownNow();
487        }
488      } catch (InterruptedException ie) {
489        LOG.warn("Interrupted waiting for " + name + " to finish...");
490        t.shutdownNow();
491      }
492    }
493  }
494
495  void join() {
496    waitFor(splits, "Split Thread");
497    waitFor(longCompactions, "Large Compaction Thread");
498    waitFor(shortCompactions, "Small Compaction Thread");
499  }
500
501  /**
502   * Returns the current size of the queue containing regions that are processed.
503   * @return The current size of the regions queue.
504   */
505  public int getCompactionQueueSize() {
506    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
507  }
508
509  public int getLargeCompactionQueueSize() {
510    return longCompactions.getQueue().size();
511  }
512
513  public int getSmallCompactionQueueSize() {
514    return shortCompactions.getQueue().size();
515  }
516
517  public int getSplitQueueSize() {
518    return splits.getQueue().size();
519  }
520
521  private boolean shouldSplitRegion(RegionInfo ri) {
522    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
523      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
524        + "Please consider taking a look at "
525        + "https://hbase.apache.org/docs/operational-management/region-and-capacity#region-management");
526    }
527    return (regionSplitLimit > server.getNumberOfOnlineRegions()
528      // Do not attempt to split secondary region replicas, as this is not allowed and our request
529      // to do so will be rejected
530      && ri.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID);
531  }
532
533  /** Returns the regionSplitLimit */
534  public int getRegionSplitLimit() {
535    return this.regionSplitLimit;
536  }
537
538  /**
539   * Check if this store is under compaction
540   */
541  public boolean isUnderCompaction(final HStore s) {
542    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
543  }
544
545  private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() {
546
547    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
548      if (r1 == r2) {
549        return 0; // they are the same request
550      }
551      // less first
552      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
553      if (cmp != 0) {
554        return cmp;
555      }
556      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
557      if (cmp != 0) {
558        return cmp;
559      }
560
561      // break the tie based on hash code
562      return System.identityHashCode(r1) - System.identityHashCode(r2);
563    }
564
565    @Override
566    public int compare(Runnable r1, Runnable r2) {
567      // CompactionRunner first
568      if (r1 instanceof CompactionRunner) {
569        if (!(r2 instanceof CompactionRunner)) {
570          return -1;
571        }
572      } else {
573        if (r2 instanceof CompactionRunner) {
574          return 1;
575        } else {
576          // break the tie based on hash code
577          return System.identityHashCode(r1) - System.identityHashCode(r2);
578        }
579      }
580      CompactionRunner o1 = (CompactionRunner) r1;
581      CompactionRunner o2 = (CompactionRunner) r2;
582      // less first
583      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
584      if (cmp != 0) {
585        return cmp;
586      }
587      CompactionContext c1 = o1.compaction;
588      CompactionContext c2 = o2.compaction;
589      if (c1 != null) {
590        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
591      } else {
592        return c2 != null ? 1 : 0;
593      }
594    }
595  };
596
597  private final class CompactionRunner implements Runnable {
598    private final HStore store;
599    private final HRegion region;
600    private final CompactionContext compaction;
601    private final CompactionLifeCycleTracker tracker;
602    private final CompactionCompleteTracker completeTracker;
603    private int queuedPriority;
604    private ThreadPoolExecutor parent;
605    private User user;
606    private long time;
607
608    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
609      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
610      ThreadPoolExecutor parent, User user) {
611      this.store = store;
612      this.region = region;
613      this.compaction = compaction;
614      this.tracker = tracker;
615      this.completeTracker = completeTracker;
616      this.queuedPriority =
617        compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
618      this.parent = parent;
619      this.user = user;
620      this.time = EnvironmentEdgeManager.currentTime();
621    }
622
623    @Override
624    public String toString() {
625      if (compaction != null) {
626        return "Request=" + compaction.getRequest();
627      } else {
628        return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority="
629          + queuedPriority + ", startTime=" + time;
630      }
631    }
632
633    private void doCompaction(User user) {
634      CompactionContext c;
635      // Common case - system compaction without a file selection. Select now.
636      if (compaction == null) {
637        int oldPriority = this.queuedPriority;
638        this.queuedPriority = this.store.getCompactPriority();
639        if (this.queuedPriority > oldPriority) {
640          // Store priority decreased while we were in queue (due to some other compaction?),
641          // requeue with new priority to avoid blocking potential higher priorities.
642          this.parent.execute(this);
643          return;
644        }
645        Optional<CompactionContext> selected;
646        try {
647          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
648            completeTracker, user);
649        } catch (IOException ex) {
650          LOG.error("Compaction selection failed " + this, ex);
651          server.checkFileSystem();
652          region.decrementCompactionsQueuedCount();
653          return;
654        }
655        if (!selected.isPresent()) {
656          region.decrementCompactionsQueuedCount();
657          return; // nothing to do
658        }
659        c = selected.get();
660        assert c.hasSelection();
661        // Now see if we are in correct pool for the size; if not, go to the correct one.
662        // We might end up waiting for a while, so cancel the selection.
663
664        ThreadPoolExecutor pool =
665          store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
666
667        // Long compaction pool can process small job
668        // Short compaction pool should not process large job
669        if (this.parent == shortCompactions && pool == longCompactions) {
670          this.store.cancelRequestedCompaction(c);
671          this.parent = pool;
672          this.parent.execute(this);
673          return;
674        }
675      } else {
676        c = compaction;
677      }
678      // Finally we can compact something.
679      assert c != null;
680
681      tracker.beforeExecution(store);
682      try {
683        // Note: please don't put single-compaction logic here;
684        // put it into region/store/etc. This is CST logic.
685        long start = EnvironmentEdgeManager.currentTime();
686        boolean completed = region.compact(c, store, compactionThroughputController, user);
687        long now = EnvironmentEdgeManager.currentTime();
688        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration="
689          + StringUtils.formatTimeDiff(now, start));
690        if (completed) {
691          // degenerate case: blocked regions require recursive enqueues
692          if (
693            region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0
694          ) {
695            requestSystemCompaction(region, store, "Recursive enqueue");
696          } else {
697            // see if the compaction has caused us to exceed max region size
698            if (!requestSplit(region) && store.getCompactPriority() <= 0) {
699              requestSystemCompaction(region, store, "Recursive enqueue");
700            }
701          }
702        }
703      } catch (IOException ex) {
704        IOException remoteEx =
705          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
706        LOG.error("Compaction failed " + this, remoteEx);
707        if (remoteEx != ex) {
708          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
709        }
710        region.reportCompactionRequestFailure();
711        server.checkFileSystem();
712      } catch (Exception ex) {
713        LOG.error("Compaction failed " + this, ex);
714        region.reportCompactionRequestFailure();
715        server.checkFileSystem();
716      } finally {
717        tracker.afterExecution(store);
718        completeTracker.completed(store);
719        region.decrementCompactionsQueuedCount();
720        LOG.debug("Status {}", CompactSplit.this);
721      }
722    }
723
724    @Override
725    public void run() {
726      try {
727        Preconditions.checkNotNull(server);
728        if (
729          server.isStopped() || (region.getTableDescriptor() != null
730            && !region.getTableDescriptor().isCompactionEnabled())
731        ) {
732          region.decrementCompactionsQueuedCount();
733          return;
734        }
735        doCompaction(user);
736      } finally {
737        if (LOG.isDebugEnabled()) {
738          LOG.debug("Remove under compaction mark for store: {}",
739            store.getHRegion().getRegionInfo().getEncodedName() + ":"
740              + store.getColumnFamilyName());
741        }
742        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
743      }
744    }
745
746    private String formatStackTrace(Exception ex) {
747      StringWriter sw = new StringWriter();
748      PrintWriter pw = new PrintWriter(sw);
749      ex.printStackTrace(pw);
750      pw.flush();
751      return sw.toString();
752    }
753  }
754
755  /**
756   * {@inheritDoc}
757   */
758  @Override
759  public void onConfigurationChange(Configuration newConf) {
760    // Check if number of large / small compaction threads has changed, and then
761    // adjust the core pool size of the thread pools, by using the
762    // setCorePoolSize() method. According to the javadocs, it is safe to
763    // change the core pool size on-the-fly. We need to reset the maximum
764    // pool size, as well.
765    int largeThreads =
766      Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
767    if (this.longCompactions.getCorePoolSize() != largeThreads) {
768      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from "
769        + this.longCompactions.getCorePoolSize() + " to " + largeThreads);
770      if (this.longCompactions.getCorePoolSize() < largeThreads) {
771        this.longCompactions.setMaximumPoolSize(largeThreads);
772        this.longCompactions.setCorePoolSize(largeThreads);
773      } else {
774        this.longCompactions.setCorePoolSize(largeThreads);
775        this.longCompactions.setMaximumPoolSize(largeThreads);
776      }
777    }
778
779    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
780    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
781      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from "
782        + this.shortCompactions.getCorePoolSize() + " to " + smallThreads);
783      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
784        this.shortCompactions.setMaximumPoolSize(smallThreads);
785        this.shortCompactions.setCorePoolSize(smallThreads);
786      } else {
787        this.shortCompactions.setCorePoolSize(smallThreads);
788        this.shortCompactions.setMaximumPoolSize(smallThreads);
789      }
790    }
791
792    int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
793    if (this.splits.getCorePoolSize() != splitThreads) {
794      LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize()
795        + " to " + splitThreads);
796      if (this.splits.getCorePoolSize() < splitThreads) {
797        this.splits.setMaximumPoolSize(splitThreads);
798        this.splits.setCorePoolSize(splitThreads);
799      } else {
800        this.splits.setCorePoolSize(splitThreads);
801        this.splits.setMaximumPoolSize(splitThreads);
802      }
803    }
804
805    ThroughputController old = this.compactionThroughputController;
806    if (old != null) {
807      old.stop("configuration change");
808    }
809    this.compactionThroughputController =
810      CompactionThroughputControllerFactory.create(server, newConf);
811
812    // We change this atomically here instead of reloading the config in order that upstream
813    // would be the only one with the flexibility to reload the config.
814    this.conf.reloadConfiguration();
815  }
816
817  protected int getSmallCompactionThreadNum() {
818    return this.shortCompactions.getCorePoolSize();
819  }
820
821  protected int getLargeCompactionThreadNum() {
822    return this.longCompactions.getCorePoolSize();
823  }
824
825  protected int getSplitThreadNum() {
826    return this.splits.getCorePoolSize();
827  }
828
829  /** Exposed for unit testing */
830  long getSubmittedSplitsCount() {
831    return this.splits.getTaskCount();
832  }
833
834  /**
835   * {@inheritDoc}
836   */
837  @Override
838  public void registerChildren(ConfigurationManager manager) {
839    // No children to register.
840  }
841
842  /**
843   * {@inheritDoc}
844   */
845  @Override
846  public void deregisterChildren(ConfigurationManager manager) {
847    // No children to register
848  }
849
850  public ThroughputController getCompactionThroughputController() {
851    return compactionThroughputController;
852  }
853
854  /**
855   * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long
856   * compaction thread pool from stealing job from short compaction queue
857   */
858  void shutdownLongCompactions() {
859    this.longCompactions.shutdown();
860  }
861
862  public void clearLongCompactionsQueue() {
863    longCompactions.getQueue().clear();
864  }
865
866  public void clearShortCompactionsQueue() {
867    shortCompactions.getQueue().clear();
868  }
869
870  public boolean isCompactionsEnabled() {
871    return compactionsEnabled;
872  }
873
874  public void setCompactionsEnabled(boolean compactionsEnabled) {
875    this.compactionsEnabled = compactionsEnabled;
876    this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled);
877  }
878
879  /** Returns the longCompactions thread pool executor */
880  ThreadPoolExecutor getLongCompactions() {
881    return longCompactions;
882  }
883
884  /** Returns the shortCompactions thread pool executor */
885  ThreadPoolExecutor getShortCompactions() {
886    return shortCompactions;
887  }
888
889  private String getStoreNameForUnderCompaction(HStore store) {
890    return String.format("%s:%s",
891      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
892      store.getColumnFamilyName());
893  }
894
895}