001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
021import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
022
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.io.StringWriter;
026import java.util.Comparator;
027import java.util.Iterator;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.Executors;
033import java.util.concurrent.RejectedExecutionException;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.function.IntSupplier;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.conf.ConfigurationManager;
041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
049import org.apache.hadoop.hbase.security.Superusers;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.StealJobQueue;
053import org.apache.hadoop.ipc.RemoteException;
054import org.apache.hadoop.util.StringUtils;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
062/**
063 * Compact region on request and then run split if appropriate
064 */
065@InterfaceAudience.Private
066public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
067  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);
068
069  // Configuration key for the large compaction threads.
070  public final static String LARGE_COMPACTION_THREADS =
071    "hbase.regionserver.thread.compaction.large";
072  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
073
074  // Configuration key for the small compaction threads.
075  public final static String SMALL_COMPACTION_THREADS =
076    "hbase.regionserver.thread.compaction.small";
077  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
078
079  // Configuration key for split threads
080  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
081  public final static int SPLIT_THREADS_DEFAULT = 1;
082
083  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
084    "hbase.regionserver.regionSplitLimit";
085  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
086  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
087    "hbase.regionserver.compaction.enabled";
088
  private final HRegionServer server;
  private final Configuration conf;
  // Pools are volatile so switchCompaction(true) can swap in freshly created executors
  // after a shutdownNow.
  private volatile ThreadPoolExecutor longCompactions;
  private volatile ThreadPoolExecutor shortCompactions;
  private volatile ThreadPoolExecutor splits;

  private volatile ThroughputController compactionThroughputController;
  // Keys of the form "<encodedRegionName>:<family>" for stores with a queued or running
  // compaction; see getStoreNameForUnderCompaction.
  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();

  private volatile boolean compactionsEnabled;
  /**
   * Splitting should not take place if the total number of regions exceed this. This is not a hard
   * limit to the number of regions but it is a guideline to stop splitting after number of online
   * regions is greater than this.
   */
  private int regionSplitLimit;
105
106  CompactSplit(HRegionServer server) {
107    this.server = server;
108    this.conf = server.getConfiguration();
109    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
110    createCompactionExecutors();
111    createSplitExcecutors();
112
113    // compaction throughput controller
114    this.compactionThroughputController =
115      CompactionThroughputControllerFactory.create(server, conf);
116  }
117
118  // only for test
119  public CompactSplit(Configuration conf) {
120    this.server = null;
121    this.conf = conf;
122    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
123    createCompactionExecutors();
124    createSplitExcecutors();
125  }
126
127  private void createSplitExcecutors() {
128    final String n = Thread.currentThread().getName();
129    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
130    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
131      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
132  }
133
134  private void createCompactionExecutors() {
135    this.regionSplitLimit =
136      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
137
138    int largeThreads =
139      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
140    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
141
142    // if we have throttle threads, make sure the user also specified size
143    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
144
145    final String n = Thread.currentThread().getName();
146
147    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
148    // Since the StealJobQueue inner uses the PriorityBlockingQueue,
149    // which is an unbounded blocking queue, we remove the RejectedExecutionHandler for
150    // the long and short compaction thread pool executors since HBASE-27332.
151    // If anyone who what to change the StealJobQueue to a bounded queue,
152    // please add the rejection handler back.
153    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
154      stealJobQueue,
155      new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
156    this.longCompactions.prestartAllCoreThreads();
157    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
158      stealJobQueue.getStealFromQueue(),
159      new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
160  }
161
162  @Override
163  public String toString() {
164    return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size()
165      + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue="
166      + splits.getQueue().size();
167  }
168
169  public String dumpQueue() {
170    StringBuilder queueLists = new StringBuilder();
171    queueLists.append("Compaction/Split Queue dump:\n");
172    queueLists.append("  LargeCompation Queue:\n");
173    BlockingQueue<Runnable> lq = longCompactions.getQueue();
174    Iterator<Runnable> it = lq.iterator();
175    while (it.hasNext()) {
176      queueLists.append("    " + it.next().toString());
177      queueLists.append("\n");
178    }
179
180    if (shortCompactions != null) {
181      queueLists.append("\n");
182      queueLists.append("  SmallCompation Queue:\n");
183      lq = shortCompactions.getQueue();
184      it = lq.iterator();
185      while (it.hasNext()) {
186        queueLists.append("    " + it.next().toString());
187        queueLists.append("\n");
188      }
189    }
190
191    queueLists.append("\n");
192    queueLists.append("  Split Queue:\n");
193    lq = splits.getQueue();
194    it = lq.iterator();
195    while (it.hasNext()) {
196      queueLists.append("    " + it.next().toString());
197      queueLists.append("\n");
198    }
199
200    return queueLists.toString();
201  }
202
203  public synchronized boolean requestSplit(final Region r) {
204    // Don't split regions that are blocking is the default behavior.
205    // But in some circumstances, split here is needed to prevent the region size from
206    // continuously growing, as well as the number of store files, see HBASE-26242.
207    HRegion hr = (HRegion) r;
208    try {
209      if (shouldSplitRegion(r.getRegionInfo()) && hr.getCompactPriority() >= PRIORITY_USER) {
210        byte[] midKey = hr.checkSplit().orElse(null);
211        if (midKey != null) {
212          requestSplit(r, midKey);
213          return true;
214        }
215      }
216    } catch (IndexOutOfBoundsException e) {
217      // We get this sometimes. Not sure why. Catch and return false; no split request.
218      LOG.warn("Catching out-of-bounds; region={}, policy={}",
219        hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e);
220    }
221    return false;
222  }
223
224  private synchronized void requestSplit(final Region r, byte[] midKey) {
225    requestSplit(r, midKey, null);
226  }
227
228  /*
229   * The User parameter allows the split thread to assume the correct user identity
230   */
231  private synchronized void requestSplit(final Region r, byte[] midKey, User user) {
232    if (midKey == null) {
233      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString()
234        + " not splittable because midkey=null");
235      return;
236    }
237    try {
238      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
239      if (LOG.isDebugEnabled()) {
240        LOG.debug("Splitting " + r + ", " + this);
241      }
242    } catch (RejectedExecutionException ree) {
243      LOG.info("Could not execute split for " + r, ree);
244    }
245  }
246
  // Forcibly stops both compaction pools; queued tasks are discarded and running ones
  // interrupted. Used when the user switches compactions off.
  private void interrupt() {
    longCompactions.shutdownNow();
    shortCompactions.shutdownNow();
  }
251
  // Rebuilds the compaction pools after they were shut down by switchCompaction(false).
  private void reInitializeCompactionsExecutors() {
    createCompactionExecutors();
  }
255
  // set protected for test
  protected interface CompactionCompleteTracker {

    // Invoked once per store when its compaction finishes or is skipped; default is a no-op.
    default void completed(Store store) {
    }
  }
262
  // Shared no-op tracker used when the caller does not care about compaction completion.
  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
    new CompactionCompleteTracker() {
    };
266
267  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
268
269    private final CompactionLifeCycleTracker tracker;
270
271    private final AtomicInteger remaining;
272
273    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
274      this.tracker = tracker;
275      this.remaining = new AtomicInteger(numberOfStores);
276    }
277
278    @Override
279    public void completed(Store store) {
280      if (remaining.decrementAndGet() == 0) {
281        tracker.completed();
282      }
283    }
284  }
285
286  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
287    IntSupplier numberOfStores) {
288    if (tracker == CompactionLifeCycleTracker.DUMMY) {
289      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
290      // the life cycle of a compaction.
291      return DUMMY_COMPLETE_TRACKER;
292    } else {
293      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
294    }
295  }
296
297  @Override
298  public synchronized void requestCompaction(HRegion region, String why, int priority,
299    CompactionLifeCycleTracker tracker, User user) throws IOException {
300    requestCompactionInternal(region, why, priority, true, tracker,
301      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
302  }
303
304  @Override
305  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
306    CompactionLifeCycleTracker tracker, User user) throws IOException {
307    requestCompactionInternal(region, store, why, priority, true, tracker,
308      getCompleteTracker(tracker, () -> 1), user);
309  }
310
311  @Override
312  public void switchCompaction(boolean onOrOff) {
313    if (onOrOff) {
314      // re-create executor pool if compactions are disabled.
315      if (!isCompactionsEnabled()) {
316        LOG.info("Re-Initializing compactions because user switched on compactions");
317        reInitializeCompactionsExecutors();
318      }
319      setCompactionsEnabled(onOrOff);
320      return;
321    }
322
323    setCompactionsEnabled(onOrOff);
324    LOG.info("Interrupting running compactions because user switched off compactions");
325    interrupt();
326  }
327
  // Fans a region-level request out to every store in the region, one internal request each.
  private void requestCompactionInternal(HRegion region, String why, int priority,
    boolean selectNow, CompactionLifeCycleTracker tracker,
    CompactionCompleteTracker completeTracker, User user) throws IOException {
    // request compaction on all stores
    for (HStore store : region.stores.values()) {
      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
        user);
    }
  }
337
338  // set protected for test
339  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
340    boolean selectNow, CompactionLifeCycleTracker tracker,
341    CompactionCompleteTracker completeTracker, User user) throws IOException {
342    if (!this.isCompactionsEnabled()) {
343      LOG.info("Ignoring compaction request for " + region + ",because compaction is disabled.");
344      return;
345    }
346
347    if (
348      this.server.isStopped() || (region.getTableDescriptor() != null
349        && !region.getTableDescriptor().isCompactionEnabled())
350    ) {
351      return;
352    }
353    RegionServerSpaceQuotaManager spaceQuotaManager =
354      this.server.getRegionServerSpaceQuotaManager();
355
356    if (
357      user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
358        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())
359    ) {
360      // Enter here only when:
361      // It's a user generated req, the user is super user, quotas enabled, compactions disabled.
362      String reason = "Ignoring compaction request for " + region
363        + " as an active space quota violation " + " policy disallows compactions.";
364      tracker.notExecuted(store, reason);
365      completeTracker.completed(store);
366      LOG.debug(reason);
367      return;
368    }
369
370    CompactionContext compaction;
371    if (selectNow) {
372      Optional<CompactionContext> c =
373        selectCompaction(region, store, priority, tracker, completeTracker, user);
374      if (!c.isPresent()) {
375        // message logged inside
376        return;
377      }
378      compaction = c.get();
379    } else {
380      compaction = null;
381    }
382
383    ThreadPoolExecutor pool;
384    if (selectNow) {
385      // compaction.get is safe as we will just return if selectNow is true but no compaction is
386      // selected
387      pool = store.throttleCompaction(compaction.getRequest().getSize())
388        ? longCompactions
389        : shortCompactions;
390    } else {
391      // We assume that most compactions are small. So, put system compactions into small
392      // pool; we will do selection there, and move to large pool if necessary.
393      pool = shortCompactions;
394    }
395
396    // A simple implementation for under compaction marks.
397    // Since this method is always called in the synchronized methods, we do not need to use the
398    // boolean result to make sure that exactly the one that added here will be removed
399    // in the next steps.
400    underCompactionStores.add(getStoreNameForUnderCompaction(store));
401    pool.execute(
402      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
403    if (LOG.isDebugEnabled()) {
404      LOG.debug(
405        "Add compact mark for store {}, priority={}, current under compaction "
406          + "store size is {}",
407        getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
408    }
409    region.incrementCompactionsQueuedCount();
410    if (LOG.isDebugEnabled()) {
411      String type = (pool == shortCompactions) ? "Small " : "Large ";
412      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
413        + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
414    }
415  }
416
  // Queues a system (NO_PRIORITY) compaction on every store of the region; file selection is
  // deferred to the worker thread (selectNow=false).
  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
      DUMMY_COMPLETE_TRACKER, null);
  }
421
  // Queues a system compaction on a single store, without the under-compaction short-circuit.
  public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException {
    requestSystemCompaction(region, store, why, false);
  }
425
  /**
   * Queues a system compaction on a single store.
   * @param giveUpIfRequestedOrCompacting if true, skip the request when the store already has a
   *          queued or running compaction
   */
  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
    boolean giveUpIfRequestedOrCompacting) throws IOException {
    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
        store.getColumnFamilyName());
      return;
    }
    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
  }
436
437  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
438    CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
439    throws IOException {
440    // don't even select for compaction if disableCompactions is set to true
441    if (!isCompactionsEnabled()) {
442      LOG.info(String.format("User has disabled compactions"));
443      return Optional.empty();
444    }
445    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
446    if (!compaction.isPresent() && region.getRegionInfo() != null) {
447      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString()
448        + " because compaction request was cancelled";
449      tracker.notExecuted(store, reason);
450      completeTracker.completed(store);
451      LOG.debug(reason);
452    }
453    return compaction;
454  }
455
  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    // Graceful shutdown: queued/running tasks are allowed to finish, no interrupts are sent.
    splits.shutdown();
    longCompactions.shutdown();
    shortCompactions.shutdown();
  }
464
  /**
   * Blocks until the pool terminates, escalating to shutdownNow() after each unsuccessful
   * 60-second wait (and on interrupt) so stuck tasks do not hang region server shutdown.
   */
  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        LOG.info("Waiting for " + name + " to finish...");
        if (!done) {
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        // NOTE(review): the thread's interrupt status is swallowed here and the loop keeps
        // waiting — presumably intentional so shutdown always runs to completion; confirm
        // before adding Thread.currentThread().interrupt() (it would make awaitTermination
        // throw immediately and spin).
        LOG.warn("Interrupted waiting for " + name + " to finish...");
        t.shutdownNow();
      }
    }
  }
480
  // Waits for all three pools to terminate; called during region server shutdown.
  void join() {
    waitFor(splits, "Split Thread");
    waitFor(longCompactions, "Large Compaction Thread");
    waitFor(shortCompactions, "Small Compaction Thread");
  }
486
  /**
   * Returns the current size of the queue containing regions that are processed.
   * @return The current size of the regions queue (long plus short compaction queues).
   */
  public int getCompactionQueueSize() {
    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
  }
494
  /** Returns the number of queued (not yet running) long compactions. */
  public int getLargeCompactionQueueSize() {
    return longCompactions.getQueue().size();
  }
498
  /** Returns the number of queued (not yet running) short compactions. */
  public int getSmallCompactionQueueSize() {
    return shortCompactions.getQueue().size();
  }
502
  /** Returns the number of queued (not yet running) splits. */
  public int getSplitQueueSize() {
    return splits.getQueue().size();
  }
506
507  private boolean shouldSplitRegion(RegionInfo ri) {
508    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
509      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
510        + "Please consider taking a look at "
511        + "https://hbase.apache.org/docs/operational-management/region-and-capacity#region-management");
512    }
513    return (regionSplitLimit > server.getNumberOfOnlineRegions()
514      // Do not attempt to split secondary region replicas, as this is not allowed and our request
515      // to do so will be rejected
516      && ri.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID);
517  }
518
  /** Returns the regionSplitLimit (soft cap on the number of regions per server). */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }
523
  /**
   * Check if this store is under compaction (has a compaction queued or running).
   */
  public boolean isUnderCompaction(final HStore s) {
    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
  }
530
531  private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() {
532
533    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
534      if (r1 == r2) {
535        return 0; // they are the same request
536      }
537      // less first
538      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
539      if (cmp != 0) {
540        return cmp;
541      }
542      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
543      if (cmp != 0) {
544        return cmp;
545      }
546
547      // break the tie based on hash code
548      return System.identityHashCode(r1) - System.identityHashCode(r2);
549    }
550
551    @Override
552    public int compare(Runnable r1, Runnable r2) {
553      // CompactionRunner first
554      if (r1 instanceof CompactionRunner) {
555        if (!(r2 instanceof CompactionRunner)) {
556          return -1;
557        }
558      } else {
559        if (r2 instanceof CompactionRunner) {
560          return 1;
561        } else {
562          // break the tie based on hash code
563          return System.identityHashCode(r1) - System.identityHashCode(r2);
564        }
565      }
566      CompactionRunner o1 = (CompactionRunner) r1;
567      CompactionRunner o2 = (CompactionRunner) r2;
568      // less first
569      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
570      if (cmp != 0) {
571        return cmp;
572      }
573      CompactionContext c1 = o1.compaction;
574      CompactionContext c2 = o2.compaction;
575      if (c1 != null) {
576        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
577      } else {
578        return c2 != null ? 1 : 0;
579      }
580    }
581  };
582
  /**
   * Executes one compaction for a single store. For system compactions (compaction == null) the
   * file selection happens here, at run time; the runner may re-queue itself if the store's
   * priority changed while it waited, or if the selected job belongs in the other pool.
   */
  private final class CompactionRunner implements Runnable {
    private final HStore store;
    private final HRegion region;
    // null for system compactions: selection is deferred to doCompaction().
    private final CompactionContext compaction;
    private final CompactionLifeCycleTracker tracker;
    private final CompactionCompleteTracker completeTracker;
    // Priority at enqueue time; COMPARATOR orders runners by this (lower runs first).
    private int queuedPriority;
    // Pool this runner was submitted to; updated if the job moves to the other pool.
    private ThreadPoolExecutor parent;
    private User user;
    // Enqueue timestamp, reported by toString() for system compactions.
    private long time;

    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
      ThreadPoolExecutor parent, User user) {
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.tracker = tracker;
      this.completeTracker = completeTracker;
      this.queuedPriority =
        compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
      this.parent = parent;
      this.user = user;
      this.time = EnvironmentEdgeManager.currentTime();
    }

    @Override
    public String toString() {
      if (compaction != null) {
        return "Request=" + compaction.getRequest();
      } else {
        return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority="
          + queuedPriority + ", startTime=" + time;
      }
    }

    // Selects files if needed, runs the compaction, and triggers follow-up work
    // (recursive system compaction or a split) on success.
    private void doCompaction(User user) {
      CompactionContext c;
      // Common case - system compaction without a file selection. Select now.
      if (compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        Optional<CompactionContext> selected;
        try {
          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
            completeTracker, user);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          region.decrementCompactionsQueuedCount();
          return;
        }
        if (!selected.isPresent()) {
          region.decrementCompactionsQueuedCount();
          return; // nothing to do
        }
        c = selected.get();
        assert c.hasSelection();
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.

        ThreadPoolExecutor pool =
          store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;

        // Long compaction pool can process small job
        // Short compaction pool should not process large job
        if (this.parent == shortCompactions && pool == longCompactions) {
          this.store.cancelRequestedCompaction(c);
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      } else {
        c = compaction;
      }
      // Finally we can compact something.
      assert c != null;

      tracker.beforeExecution(store);
      try {
        // Note: please don't put single-compaction logic here;
        // put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTime();
        boolean completed = region.compact(c, store, compactionThroughputController, user);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration="
          + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (
            region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0
          ) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            if (!requestSplit(region) && store.getCompactPriority() <= 0) {
              requestSystemCompaction(region, store, "Recursive enqueue");
            }
          }
        }
      } catch (IOException ex) {
        // Unwrap RemoteException so the real server-side cause is logged.
        IOException remoteEx =
          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } finally {
        tracker.afterExecution(store);
        completeTracker.completed(store);
        region.decrementCompactionsQueuedCount();
        LOG.debug("Status {}", CompactSplit.this);
      }
    }

    @Override
    public void run() {
      try {
        Preconditions.checkNotNull(server);
        if (
          server.isStopped() || (region.getTableDescriptor() != null
            && !region.getTableDescriptor().isCompactionEnabled())
        ) {
          region.decrementCompactionsQueuedCount();
          return;
        }
        doCompaction(user);
      } finally {
        // Always clear the under-compaction mark, even on failure or early return, so the
        // store can be compacted again later.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Remove under compaction mark for store: {}",
            store.getHRegion().getRegionInfo().getEncodedName() + ":"
              + store.getColumnFamilyName());
        }
        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
      }
    }

    // Renders the full stack trace of ex as a string for logging.
    private String formatStackTrace(Exception ex) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      ex.printStackTrace(pw);
      pw.flush();
      return sw.toString();
    }
  }
740
741  /**
742   * {@inheritDoc}
743   */
744  @Override
745  public void onConfigurationChange(Configuration newConf) {
746    // Check if number of large / small compaction threads has changed, and then
747    // adjust the core pool size of the thread pools, by using the
748    // setCorePoolSize() method. According to the javadocs, it is safe to
749    // change the core pool size on-the-fly. We need to reset the maximum
750    // pool size, as well.
751    int largeThreads =
752      Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
753    if (this.longCompactions.getCorePoolSize() != largeThreads) {
754      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from "
755        + this.longCompactions.getCorePoolSize() + " to " + largeThreads);
756      if (this.longCompactions.getCorePoolSize() < largeThreads) {
757        this.longCompactions.setMaximumPoolSize(largeThreads);
758        this.longCompactions.setCorePoolSize(largeThreads);
759      } else {
760        this.longCompactions.setCorePoolSize(largeThreads);
761        this.longCompactions.setMaximumPoolSize(largeThreads);
762      }
763    }
764
765    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
766    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
767      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from "
768        + this.shortCompactions.getCorePoolSize() + " to " + smallThreads);
769      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
770        this.shortCompactions.setMaximumPoolSize(smallThreads);
771        this.shortCompactions.setCorePoolSize(smallThreads);
772      } else {
773        this.shortCompactions.setCorePoolSize(smallThreads);
774        this.shortCompactions.setMaximumPoolSize(smallThreads);
775      }
776    }
777
778    int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
779    if (this.splits.getCorePoolSize() != splitThreads) {
780      LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize()
781        + " to " + splitThreads);
782      if (this.splits.getCorePoolSize() < splitThreads) {
783        this.splits.setMaximumPoolSize(splitThreads);
784        this.splits.setCorePoolSize(splitThreads);
785      } else {
786        this.splits.setCorePoolSize(splitThreads);
787        this.splits.setMaximumPoolSize(splitThreads);
788      }
789    }
790
791    ThroughputController old = this.compactionThroughputController;
792    if (old != null) {
793      old.stop("configuration change");
794    }
795    this.compactionThroughputController =
796      CompactionThroughputControllerFactory.create(server, newConf);
797
798    // We change this atomically here instead of reloading the config in order that upstream
799    // would be the only one with the flexibility to reload the config.
800    this.conf.reloadConfiguration();
801  }
802
  /** Returns the core pool size of the short (small) compaction pool. */
  protected int getSmallCompactionThreadNum() {
    return this.shortCompactions.getCorePoolSize();
  }
806
  /** Returns the core pool size of the long (large) compaction pool. */
  protected int getLargeCompactionThreadNum() {
    return this.longCompactions.getCorePoolSize();
  }
810
  /** Returns the core pool size of the split pool. */
  protected int getSplitThreadNum() {
    return this.splits.getCorePoolSize();
  }
814
  /** Exposed for unit testing */
  long getSubmittedSplitsCount() {
    // Approximate total number of split tasks ever scheduled on the pool.
    return this.splits.getTaskCount();
  }
819
  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register; this observer has no nested configuration observers.
  }
827
  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister (nothing was registered above).
  }
835
  /** Returns the currently active compaction throughput controller. */
  public ThroughputController getCompactionThroughputController() {
    return compactionThroughputController;
  }
839
  /**
   * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long
   * compaction thread pool from stealing job from short compaction queue
   */
  void shutdownLongCompactions() {
    // Graceful shutdown: running compactions finish, no new ones are accepted.
    this.longCompactions.shutdown();
  }
847
  /** Drops all queued (not yet running) long compactions. */
  public void clearLongCompactionsQueue() {
    longCompactions.getQueue().clear();
  }
851
  /** Drops all queued (not yet running) short compactions. */
  public void clearShortCompactionsQueue() {
    shortCompactions.getQueue().clear();
  }
855
  /** Returns whether compactions are currently enabled on this server. */
  public boolean isCompactionsEnabled() {
    return compactionsEnabled;
  }
859
  // Also writes the flag back into the live Configuration so other readers of the config
  // observe the change.
  public void setCompactionsEnabled(boolean compactionsEnabled) {
    this.compactionsEnabled = compactionsEnabled;
    this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled);
  }
864
  /** Returns the longCompactions thread pool executor (package-private, for tests). */
  ThreadPoolExecutor getLongCompactions() {
    return longCompactions;
  }
869
  /** Returns the shortCompactions thread pool executor (package-private, for tests). */
  ThreadPoolExecutor getShortCompactions() {
    return shortCompactions;
  }
874
875  private String getStoreNameForUnderCompaction(HStore store) {
876    return String.format("%s:%s",
877      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
878      store.getColumnFamilyName());
879  }
880
881}