001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
021import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
022
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.io.StringWriter;
026import java.util.Comparator;
027import java.util.Iterator;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.Executors;
033import java.util.concurrent.RejectedExecutionException;
034import java.util.concurrent.RejectedExecutionHandler;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicInteger;
038import java.util.function.IntSupplier;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.conf.ConfigurationManager;
041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
049import org.apache.hadoop.hbase.security.Superusers;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.StealJobQueue;
053import org.apache.hadoop.ipc.RemoteException;
054import org.apache.hadoop.util.StringUtils;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
062/**
063 * Compact region on request and then run split if appropriate
064 */
065@InterfaceAudience.Private
066public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
067  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);
068
069  // Configuration key for the large compaction threads.
070  public final static String LARGE_COMPACTION_THREADS =
071    "hbase.regionserver.thread.compaction.large";
072  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
073
074  // Configuration key for the small compaction threads.
075  public final static String SMALL_COMPACTION_THREADS =
076    "hbase.regionserver.thread.compaction.small";
077  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
078
079  // Configuration key for split threads
080  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
081  public final static int SPLIT_THREADS_DEFAULT = 1;
082
083  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
084    "hbase.regionserver.regionSplitLimit";
085  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
086  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
087    "hbase.regionserver.compaction.enabled";
088
089  private final HRegionServer server;
090  private final Configuration conf;
091  private volatile ThreadPoolExecutor longCompactions;
092  private volatile ThreadPoolExecutor shortCompactions;
093  private volatile ThreadPoolExecutor splits;
094
095  private volatile ThroughputController compactionThroughputController;
096  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
097
098  private volatile boolean compactionsEnabled;
099  /**
100   * Splitting should not take place if the total number of regions exceed this. This is not a hard
101   * limit to the number of regions but it is a guideline to stop splitting after number of online
102   * regions is greater than this.
103   */
104  private int regionSplitLimit;
105
106  CompactSplit(HRegionServer server) {
107    this.server = server;
108    this.conf = server.getConfiguration();
109    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
110    createCompactionExecutors();
111    createSplitExcecutors();
112
113    // compaction throughput controller
114    this.compactionThroughputController =
115      CompactionThroughputControllerFactory.create(server, conf);
116  }
117
118  // only for test
119  public CompactSplit(Configuration conf) {
120    this.server = null;
121    this.conf = conf;
122    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
123    createCompactionExecutors();
124    createSplitExcecutors();
125  }
126
127  private void createSplitExcecutors() {
128    final String n = Thread.currentThread().getName();
129    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
130    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
131      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
132  }
133
134  private void createCompactionExecutors() {
135    this.regionSplitLimit =
136      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
137
138    int largeThreads =
139      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
140    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
141
142    // if we have throttle threads, make sure the user also specified size
143    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
144
145    final String n = Thread.currentThread().getName();
146
147    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
148    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
149      stealJobQueue,
150      new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
151    this.longCompactions.setRejectedExecutionHandler(new Rejection());
152    this.longCompactions.prestartAllCoreThreads();
153    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
154      stealJobQueue.getStealFromQueue(),
155      new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
156    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
157  }
158
159  @Override
160  public String toString() {
161    return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size()
162      + ":shortCompactions=" + shortCompactions.getQueue().size() + ")" + ", splitQueue="
163      + splits.getQueue().size();
164  }
165
166  public String dumpQueue() {
167    StringBuilder queueLists = new StringBuilder();
168    queueLists.append("Compaction/Split Queue dump:\n");
169    queueLists.append("  LargeCompation Queue:\n");
170    BlockingQueue<Runnable> lq = longCompactions.getQueue();
171    Iterator<Runnable> it = lq.iterator();
172    while (it.hasNext()) {
173      queueLists.append("    " + it.next().toString());
174      queueLists.append("\n");
175    }
176
177    if (shortCompactions != null) {
178      queueLists.append("\n");
179      queueLists.append("  SmallCompation Queue:\n");
180      lq = shortCompactions.getQueue();
181      it = lq.iterator();
182      while (it.hasNext()) {
183        queueLists.append("    " + it.next().toString());
184        queueLists.append("\n");
185      }
186    }
187
188    queueLists.append("\n");
189    queueLists.append("  Split Queue:\n");
190    lq = splits.getQueue();
191    it = lq.iterator();
192    while (it.hasNext()) {
193      queueLists.append("    " + it.next().toString());
194      queueLists.append("\n");
195    }
196
197    return queueLists.toString();
198  }
199
200  public synchronized boolean requestSplit(final Region r) {
201    // Don't split regions that are blocking is the default behavior.
202    // But in some circumstances, split here is needed to prevent the region size from
203    // continuously growing, as well as the number of store files, see HBASE-26242.
204    HRegion hr = (HRegion) r;
205    try {
206      if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) {
207        byte[] midKey = hr.checkSplit().orElse(null);
208        if (midKey != null) {
209          requestSplit(r, midKey);
210          return true;
211        }
212      }
213    } catch (IndexOutOfBoundsException e) {
214      // We get this sometimes. Not sure why. Catch and return false; no split request.
215      LOG.warn("Catching out-of-bounds; region={}, policy={}",
216        hr == null ? null : hr.getRegionInfo(), hr == null ? "null" : hr.getCompactPriority(), e);
217    }
218    return false;
219  }
220
221  private synchronized void requestSplit(final Region r, byte[] midKey) {
222    requestSplit(r, midKey, null);
223  }
224
225  /*
226   * The User parameter allows the split thread to assume the correct user identity
227   */
228  private synchronized void requestSplit(final Region r, byte[] midKey, User user) {
229    if (midKey == null) {
230      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString()
231        + " not splittable because midkey=null");
232      return;
233    }
234    try {
235      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
236      if (LOG.isDebugEnabled()) {
237        LOG.debug("Splitting " + r + ", " + this);
238      }
239    } catch (RejectedExecutionException ree) {
240      LOG.info("Could not execute split for " + r, ree);
241    }
242  }
243
244  private void interrupt() {
245    longCompactions.shutdownNow();
246    shortCompactions.shutdownNow();
247  }
248
249  private void reInitializeCompactionsExecutors() {
250    createCompactionExecutors();
251  }
252
253  // set protected for test
254  protected interface CompactionCompleteTracker {
255
256    default void completed(Store store) {
257    }
258  }
259
260  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
261    new CompactionCompleteTracker() {
262    };
263
264  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
265
266    private final CompactionLifeCycleTracker tracker;
267
268    private final AtomicInteger remaining;
269
270    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
271      this.tracker = tracker;
272      this.remaining = new AtomicInteger(numberOfStores);
273    }
274
275    @Override
276    public void completed(Store store) {
277      if (remaining.decrementAndGet() == 0) {
278        tracker.completed();
279      }
280    }
281  }
282
283  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
284    IntSupplier numberOfStores) {
285    if (tracker == CompactionLifeCycleTracker.DUMMY) {
286      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
287      // the life cycle of a compaction.
288      return DUMMY_COMPLETE_TRACKER;
289    } else {
290      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
291    }
292  }
293
294  @Override
295  public synchronized void requestCompaction(HRegion region, String why, int priority,
296    CompactionLifeCycleTracker tracker, User user) throws IOException {
297    requestCompactionInternal(region, why, priority, true, tracker,
298      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
299  }
300
301  @Override
302  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
303    CompactionLifeCycleTracker tracker, User user) throws IOException {
304    requestCompactionInternal(region, store, why, priority, true, tracker,
305      getCompleteTracker(tracker, () -> 1), user);
306  }
307
308  @Override
309  public void switchCompaction(boolean onOrOff) {
310    if (onOrOff) {
311      // re-create executor pool if compactions are disabled.
312      if (!isCompactionsEnabled()) {
313        LOG.info("Re-Initializing compactions because user switched on compactions");
314        reInitializeCompactionsExecutors();
315      }
316    } else {
317      LOG.info("Interrupting running compactions because user switched off compactions");
318      interrupt();
319    }
320    setCompactionsEnabled(onOrOff);
321  }
322
323  private void requestCompactionInternal(HRegion region, String why, int priority,
324    boolean selectNow, CompactionLifeCycleTracker tracker,
325    CompactionCompleteTracker completeTracker, User user) throws IOException {
326    // request compaction on all stores
327    for (HStore store : region.stores.values()) {
328      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
329        user);
330    }
331  }
332
333  // set protected for test
334  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
335    boolean selectNow, CompactionLifeCycleTracker tracker,
336    CompactionCompleteTracker completeTracker, User user) throws IOException {
337    if (
338      this.server.isStopped() || (region.getTableDescriptor() != null
339        && !region.getTableDescriptor().isCompactionEnabled())
340    ) {
341      return;
342    }
343    RegionServerSpaceQuotaManager spaceQuotaManager =
344      this.server.getRegionServerSpaceQuotaManager();
345
346    if (
347      user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
348        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())
349    ) {
350      // Enter here only when:
351      // It's a user generated req, the user is super user, quotas enabled, compactions disabled.
352      String reason = "Ignoring compaction request for " + region
353        + " as an active space quota violation " + " policy disallows compactions.";
354      tracker.notExecuted(store, reason);
355      completeTracker.completed(store);
356      LOG.debug(reason);
357      return;
358    }
359
360    CompactionContext compaction;
361    if (selectNow) {
362      Optional<CompactionContext> c =
363        selectCompaction(region, store, priority, tracker, completeTracker, user);
364      if (!c.isPresent()) {
365        // message logged inside
366        return;
367      }
368      compaction = c.get();
369    } else {
370      compaction = null;
371    }
372
373    ThreadPoolExecutor pool;
374    if (selectNow) {
375      // compaction.get is safe as we will just return if selectNow is true but no compaction is
376      // selected
377      pool = store.throttleCompaction(compaction.getRequest().getSize())
378        ? longCompactions
379        : shortCompactions;
380    } else {
381      // We assume that most compactions are small. So, put system compactions into small
382      // pool; we will do selection there, and move to large pool if necessary.
383      pool = shortCompactions;
384    }
385    pool.execute(
386      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
387    if (LOG.isDebugEnabled()) {
388      LOG.debug(
389        "Add compact mark for store {}, priority={}, current under compaction "
390          + "store size is {}",
391        getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
392    }
393    underCompactionStores.add(getStoreNameForUnderCompaction(store));
394    region.incrementCompactionsQueuedCount();
395    if (LOG.isDebugEnabled()) {
396      String type = (pool == shortCompactions) ? "Small " : "Large ";
397      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
398        + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
399    }
400  }
401
402  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
403    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
404      DUMMY_COMPLETE_TRACKER, null);
405  }
406
407  public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException {
408    requestSystemCompaction(region, store, why, false);
409  }
410
411  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
412    boolean giveUpIfRequestedOrCompacting) throws IOException {
413    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
414      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
415        store.getColumnFamilyName());
416      return;
417    }
418    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
419      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
420  }
421
422  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
423    CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
424    throws IOException {
425    // don't even select for compaction if disableCompactions is set to true
426    if (!isCompactionsEnabled()) {
427      LOG.info(String.format("User has disabled compactions"));
428      return Optional.empty();
429    }
430    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
431    if (!compaction.isPresent() && region.getRegionInfo() != null) {
432      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString()
433        + " because compaction request was cancelled";
434      tracker.notExecuted(store, reason);
435      completeTracker.completed(store);
436      LOG.debug(reason);
437    }
438    return compaction;
439  }
440
441  /**
442   * Only interrupt once it's done with a run through the work loop.
443   */
444  void interruptIfNecessary() {
445    splits.shutdown();
446    longCompactions.shutdown();
447    shortCompactions.shutdown();
448  }
449
450  private void waitFor(ThreadPoolExecutor t, String name) {
451    boolean done = false;
452    while (!done) {
453      try {
454        done = t.awaitTermination(60, TimeUnit.SECONDS);
455        LOG.info("Waiting for " + name + " to finish...");
456        if (!done) {
457          t.shutdownNow();
458        }
459      } catch (InterruptedException ie) {
460        LOG.warn("Interrupted waiting for " + name + " to finish...");
461        t.shutdownNow();
462      }
463    }
464  }
465
466  void join() {
467    waitFor(splits, "Split Thread");
468    waitFor(longCompactions, "Large Compaction Thread");
469    waitFor(shortCompactions, "Small Compaction Thread");
470  }
471
472  /**
473   * Returns the current size of the queue containing regions that are processed.
474   * @return The current size of the regions queue.
475   */
476  public int getCompactionQueueSize() {
477    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
478  }
479
480  public int getLargeCompactionQueueSize() {
481    return longCompactions.getQueue().size();
482  }
483
484  public int getSmallCompactionQueueSize() {
485    return shortCompactions.getQueue().size();
486  }
487
488  public int getSplitQueueSize() {
489    return splits.getQueue().size();
490  }
491
492  private boolean shouldSplitRegion() {
493    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
494      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
495        + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
496    }
497    return (regionSplitLimit > server.getNumberOfOnlineRegions());
498  }
499
500  /** Returns the regionSplitLimit */
501  public int getRegionSplitLimit() {
502    return this.regionSplitLimit;
503  }
504
505  /**
506   * Check if this store is under compaction
507   */
508  public boolean isUnderCompaction(final HStore s) {
509    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
510  }
511
512  private static final Comparator<Runnable> COMPARATOR = new Comparator<Runnable>() {
513
514    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
515      if (r1 == r2) {
516        return 0; // they are the same request
517      }
518      // less first
519      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
520      if (cmp != 0) {
521        return cmp;
522      }
523      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
524      if (cmp != 0) {
525        return cmp;
526      }
527
528      // break the tie based on hash code
529      return System.identityHashCode(r1) - System.identityHashCode(r2);
530    }
531
532    @Override
533    public int compare(Runnable r1, Runnable r2) {
534      // CompactionRunner first
535      if (r1 instanceof CompactionRunner) {
536        if (!(r2 instanceof CompactionRunner)) {
537          return -1;
538        }
539      } else {
540        if (r2 instanceof CompactionRunner) {
541          return 1;
542        } else {
543          // break the tie based on hash code
544          return System.identityHashCode(r1) - System.identityHashCode(r2);
545        }
546      }
547      CompactionRunner o1 = (CompactionRunner) r1;
548      CompactionRunner o2 = (CompactionRunner) r2;
549      // less first
550      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
551      if (cmp != 0) {
552        return cmp;
553      }
554      CompactionContext c1 = o1.compaction;
555      CompactionContext c2 = o2.compaction;
556      if (c1 != null) {
557        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
558      } else {
559        return c2 != null ? 1 : 0;
560      }
561    }
562  };
563
564  private final class CompactionRunner implements Runnable {
565    private final HStore store;
566    private final HRegion region;
567    private final CompactionContext compaction;
568    private final CompactionLifeCycleTracker tracker;
569    private final CompactionCompleteTracker completeTracker;
570    private int queuedPriority;
571    private ThreadPoolExecutor parent;
572    private User user;
573    private long time;
574
575    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
576      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
577      ThreadPoolExecutor parent, User user) {
578      this.store = store;
579      this.region = region;
580      this.compaction = compaction;
581      this.tracker = tracker;
582      this.completeTracker = completeTracker;
583      this.queuedPriority =
584        compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
585      this.parent = parent;
586      this.user = user;
587      this.time = EnvironmentEdgeManager.currentTime();
588    }
589
590    @Override
591    public String toString() {
592      if (compaction != null) {
593        return "Request=" + compaction.getRequest();
594      } else {
595        return "region=" + region.toString() + ", storeName=" + store.toString() + ", priority="
596          + queuedPriority + ", startTime=" + time;
597      }
598    }
599
600    private void doCompaction(User user) {
601      CompactionContext c;
602      // Common case - system compaction without a file selection. Select now.
603      if (compaction == null) {
604        int oldPriority = this.queuedPriority;
605        this.queuedPriority = this.store.getCompactPriority();
606        if (this.queuedPriority > oldPriority) {
607          // Store priority decreased while we were in queue (due to some other compaction?),
608          // requeue with new priority to avoid blocking potential higher priorities.
609          this.parent.execute(this);
610          return;
611        }
612        Optional<CompactionContext> selected;
613        try {
614          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
615            completeTracker, user);
616        } catch (IOException ex) {
617          LOG.error("Compaction selection failed " + this, ex);
618          server.checkFileSystem();
619          region.decrementCompactionsQueuedCount();
620          return;
621        }
622        if (!selected.isPresent()) {
623          region.decrementCompactionsQueuedCount();
624          return; // nothing to do
625        }
626        c = selected.get();
627        assert c.hasSelection();
628        // Now see if we are in correct pool for the size; if not, go to the correct one.
629        // We might end up waiting for a while, so cancel the selection.
630
631        ThreadPoolExecutor pool =
632          store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
633
634        // Long compaction pool can process small job
635        // Short compaction pool should not process large job
636        if (this.parent == shortCompactions && pool == longCompactions) {
637          this.store.cancelRequestedCompaction(c);
638          this.parent = pool;
639          this.parent.execute(this);
640          return;
641        }
642      } else {
643        c = compaction;
644      }
645      // Finally we can compact something.
646      assert c != null;
647
648      tracker.beforeExecution(store);
649      try {
650        // Note: please don't put single-compaction logic here;
651        // put it into region/store/etc. This is CST logic.
652        long start = EnvironmentEdgeManager.currentTime();
653        boolean completed = region.compact(c, store, compactionThroughputController, user);
654        long now = EnvironmentEdgeManager.currentTime();
655        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " + this + "; duration="
656          + StringUtils.formatTimeDiff(now, start));
657        if (completed) {
658          // degenerate case: blocked regions require recursive enqueues
659          if (
660            region.getCompactPriority() < Store.PRIORITY_USER && store.getCompactPriority() <= 0
661          ) {
662            requestSystemCompaction(region, store, "Recursive enqueue");
663          } else {
664            // see if the compaction has caused us to exceed max region size
665            if (!requestSplit(region) && store.getCompactPriority() <= 0) {
666              requestSystemCompaction(region, store, "Recursive enqueue");
667            }
668          }
669        }
670      } catch (IOException ex) {
671        IOException remoteEx =
672          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
673        LOG.error("Compaction failed " + this, remoteEx);
674        if (remoteEx != ex) {
675          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
676        }
677        region.reportCompactionRequestFailure();
678        server.checkFileSystem();
679      } catch (Exception ex) {
680        LOG.error("Compaction failed " + this, ex);
681        region.reportCompactionRequestFailure();
682        server.checkFileSystem();
683      } finally {
684        tracker.afterExecution(store);
685        completeTracker.completed(store);
686        region.decrementCompactionsQueuedCount();
687        LOG.debug("Status {}", CompactSplit.this);
688      }
689    }
690
691    @Override
692    public void run() {
693      try {
694        Preconditions.checkNotNull(server);
695        if (
696          server.isStopped() || (region.getTableDescriptor() != null
697            && !region.getTableDescriptor().isCompactionEnabled())
698        ) {
699          region.decrementCompactionsQueuedCount();
700          return;
701        }
702        doCompaction(user);
703      } finally {
704        if (LOG.isDebugEnabled()) {
705          LOG.debug("Remove under compaction mark for store: {}",
706            store.getHRegion().getRegionInfo().getEncodedName() + ":"
707              + store.getColumnFamilyName());
708        }
709        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
710      }
711    }
712
713    private String formatStackTrace(Exception ex) {
714      StringWriter sw = new StringWriter();
715      PrintWriter pw = new PrintWriter(sw);
716      ex.printStackTrace(pw);
717      pw.flush();
718      return sw.toString();
719    }
720  }
721
722  /**
723   * Cleanup class to use when rejecting a compaction request from the queue.
724   */
725  private static class Rejection implements RejectedExecutionHandler {
726    @Override
727    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
728      if (runnable instanceof CompactionRunner) {
729        CompactionRunner runner = (CompactionRunner) runnable;
730        LOG.debug("Compaction Rejected: " + runner);
731        if (runner.compaction != null) {
732          runner.store.cancelRequestedCompaction(runner.compaction);
733        }
734      }
735    }
736  }
737
738  /**
739   * {@inheritDoc}
740   */
741  @Override
742  public void onConfigurationChange(Configuration newConf) {
743    // Check if number of large / small compaction threads has changed, and then
744    // adjust the core pool size of the thread pools, by using the
745    // setCorePoolSize() method. According to the javadocs, it is safe to
746    // change the core pool size on-the-fly. We need to reset the maximum
747    // pool size, as well.
748    int largeThreads =
749      Math.max(1, newConf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
750    if (this.longCompactions.getCorePoolSize() != largeThreads) {
751      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + " from "
752        + this.longCompactions.getCorePoolSize() + " to " + largeThreads);
753      if (this.longCompactions.getCorePoolSize() < largeThreads) {
754        this.longCompactions.setMaximumPoolSize(largeThreads);
755        this.longCompactions.setCorePoolSize(largeThreads);
756      } else {
757        this.longCompactions.setCorePoolSize(largeThreads);
758        this.longCompactions.setMaximumPoolSize(largeThreads);
759      }
760    }
761
762    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
763    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
764      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + " from "
765        + this.shortCompactions.getCorePoolSize() + " to " + smallThreads);
766      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
767        this.shortCompactions.setMaximumPoolSize(smallThreads);
768        this.shortCompactions.setCorePoolSize(smallThreads);
769      } else {
770        this.shortCompactions.setCorePoolSize(smallThreads);
771        this.shortCompactions.setMaximumPoolSize(smallThreads);
772      }
773    }
774
775    int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
776    if (this.splits.getCorePoolSize() != splitThreads) {
777      LOG.info("Changing the value of " + SPLIT_THREADS + " from " + this.splits.getCorePoolSize()
778        + " to " + splitThreads);
779      if (this.splits.getCorePoolSize() < splitThreads) {
780        this.splits.setMaximumPoolSize(splitThreads);
781        this.splits.setCorePoolSize(splitThreads);
782      } else {
783        this.splits.setCorePoolSize(splitThreads);
784        this.splits.setMaximumPoolSize(splitThreads);
785      }
786    }
787
788    ThroughputController old = this.compactionThroughputController;
789    if (old != null) {
790      old.stop("configuration change");
791    }
792    this.compactionThroughputController =
793      CompactionThroughputControllerFactory.create(server, newConf);
794
795    // We change this atomically here instead of reloading the config in order that upstream
796    // would be the only one with the flexibility to reload the config.
797    this.conf.reloadConfiguration();
798  }
799
800  protected int getSmallCompactionThreadNum() {
801    return this.shortCompactions.getCorePoolSize();
802  }
803
804  protected int getLargeCompactionThreadNum() {
805    return this.longCompactions.getCorePoolSize();
806  }
807
808  protected int getSplitThreadNum() {
809    return this.splits.getCorePoolSize();
810  }
811
812  /**
813   * {@inheritDoc}
814   */
815  @Override
816  public void registerChildren(ConfigurationManager manager) {
817    // No children to register.
818  }
819
820  /**
821   * {@inheritDoc}
822   */
823  @Override
824  public void deregisterChildren(ConfigurationManager manager) {
825    // No children to register
826  }
827
828  public ThroughputController getCompactionThroughputController() {
829    return compactionThroughputController;
830  }
831
832  /**
833   * Shutdown the long compaction thread pool. Should only be used in unit test to prevent long
834   * compaction thread pool from stealing job from short compaction queue
835   */
836  void shutdownLongCompactions() {
837    this.longCompactions.shutdown();
838  }
839
840  public void clearLongCompactionsQueue() {
841    longCompactions.getQueue().clear();
842  }
843
844  public void clearShortCompactionsQueue() {
845    shortCompactions.getQueue().clear();
846  }
847
848  public boolean isCompactionsEnabled() {
849    return compactionsEnabled;
850  }
851
852  public void setCompactionsEnabled(boolean compactionsEnabled) {
853    this.compactionsEnabled = compactionsEnabled;
854    this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, String.valueOf(compactionsEnabled));
855  }
856
857  /** Returns the longCompactions thread pool executor */
858  ThreadPoolExecutor getLongCompactions() {
859    return longCompactions;
860  }
861
862  /** Returns the shortCompactions thread pool executor */
863  ThreadPoolExecutor getShortCompactions() {
864    return shortCompactions;
865  }
866
867  private String getStoreNameForUnderCompaction(HStore store) {
868    return String.format("%s:%s",
869      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
870      store.getColumnFamilyName());
871  }
872
873}