/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Compacts regions on request and then runs a split if appropriate.
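 * <p>
 * Pool sizes are controlled by {@value #LARGE_COMPACTION_THREADS},
 * {@value #SMALL_COMPACTION_THREADS} and {@value #SPLIT_THREADS}, and can be adjusted at runtime
 * through {@link #onConfigurationChange(Configuration)}.
 * <p>
 * Illustrative usage from region server code (a sketch only; {@code regionServer} and
 * {@code region} are placeholder references, not part of this class):
 * <pre>{@code
 * CompactSplit compactSplit = new CompactSplit(regionServer);
 * // queue a system compaction for every store of the region
 * compactSplit.requestSystemCompaction(region, "periodic maintenance");
 * // ask for a split; returns false if the region should not or cannot split
 * boolean splitQueued = compactSplit.requestSplit(region);
 * }</pre>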
 */
@InterfaceAudience.Private
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

  // Configuration key for the large compaction threads.
  public final static String LARGE_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.large";
  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for the small compaction threads.
  public final static String SMALL_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.small";
  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for split threads
  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
  public final static int SPLIT_THREADS_DEFAULT = 1;

  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
      "hbase.regionserver.regionSplitLimit";
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
      "hbase.regionserver.compaction.enabled";

  private final HRegionServer server;
  private final Configuration conf;
  private volatile ThreadPoolExecutor longCompactions;
  private volatile ThreadPoolExecutor shortCompactions;
  private volatile ThreadPoolExecutor splits;

  private volatile ThroughputController compactionThroughputController;

  private volatile boolean compactionsEnabled;
  /**
   * Splitting should not take place if the total number of regions exceeds this value.
   * This is not a hard limit on the number of regions, but a guideline to stop splitting
   * once the number of online regions has grown beyond it.
   */
  private int regionSplitLimit;

  /** @param server the region server on whose behalf compactions and splits run */
  CompactSplit(HRegionServer server) {
    this.server = server;
    this.conf = server.getConfiguration();
    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
    createCompactionExecutors();
    createSplitExecutors();

    // compaction throughput controller
    this.compactionThroughputController =
        CompactionThroughputControllerFactory.create(server, conf);
  }

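  /**
   * Creates the split thread pool. Its size comes from {@link #SPLIT_THREADS}; the threads are
   * daemons named after the thread that constructed this instance.
   */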
  private void createSplitExecutors() {
    final String n = Thread.currentThread().getName();
    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
  }

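  /**
   * Creates the long and short compaction thread pools. The pools share a {@link StealJobQueue}
   * so an idle long compaction thread can take over jobs queued for the short compaction pool.
   */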
  private void createCompactionExecutors() {
    this.regionSplitLimit =
        conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

    int largeThreads =
        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);

    // Sanity check: both compaction pools must be configured with at least one thread.
    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);

    final String n = Thread.currentThread().getName();

    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>(COMPARATOR);
    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
            .setDaemon(true).build());
    this.longCompactions.setRejectedExecutionHandler(new Rejection());
    this.longCompactions.prestartAllCoreThreads();
    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
  }

  @Override
  public String toString() {
    return "compactionQueue=(longCompactions="
        + longCompactions.getQueue().size() + ":shortCompactions="
        + shortCompactions.getQueue().size() + ")"
        + ", splitQueue=" + splits.getQueue().size();
  }

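  /**
   * Dumps the contents of the large compaction, small compaction and split queues, one queued
   * task per line, for diagnostics.
   */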
  public String dumpQueue() {
    StringBuilder queueLists = new StringBuilder();
    queueLists.append("Compaction/Split Queue dump:\n");
    queueLists.append("  LargeCompaction Queue:\n");
    BlockingQueue<Runnable> lq = longCompactions.getQueue();
    Iterator<Runnable> it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    if (shortCompactions != null) {
      queueLists.append("\n");
      queueLists.append("  SmallCompaction Queue:\n");
      lq = shortCompactions.getQueue();
      it = lq.iterator();
      while (it.hasNext()) {
        queueLists.append("    " + it.next().toString());
        queueLists.append("\n");
      }
    }

    queueLists.append("\n");
    queueLists.append("  Split Queue:\n");
    lq = splits.getQueue();
    it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    return queueLists.toString();
  }

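  /**
   * Requests a split of the given region if the online region count is still below the split
   * limit, the region is not blocked by compactions, and a split point can be computed.
   * @return true if a split was queued, false otherwise
   */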
  public synchronized boolean requestSplit(final Region r) {
    // don't split regions that are blocking
    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) {
      byte[] midKey = ((HRegion)r).checkSplit();
      if (midKey != null) {
        requestSplit(r, midKey);
        return true;
      }
    }
    return false;
  }

  public synchronized void requestSplit(final Region r, byte[] midKey) {
    requestSplit(r, midKey, null);
  }

  /**
   * The User parameter allows the split thread to assume the correct user identity.
   */
  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
    if (midKey == null) {
      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
        " not splittable because midkey=null");
      if (((HRegion)r).shouldForceSplit()) {
        ((HRegion)r).clearSplit();
      }
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting " + r + ", " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.info("Could not execute split for " + r, ree);
    }
  }

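  /** Shuts down both compaction pools immediately, interrupting any running compactions. */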
  private void interrupt() {
    longCompactions.shutdownNow();
    shortCompactions.shutdownNow();
  }

  private void reInitializeCompactionsExecutors() {
    createCompactionExecutors();
  }

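  /**
   * Per-store completion callback, used to signal a multi-store
   * {@link CompactionLifeCycleTracker} once every requested store compaction has finished or has
   * been skipped.
   */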
  private interface CompactionCompleteTracker {

    default void completed(Store store) {
    }
  }

  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
      new CompactionCompleteTracker() {
      };

  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {

    private final CompactionLifeCycleTracker tracker;

    private final AtomicInteger remaining;

    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
      this.tracker = tracker;
      this.remaining = new AtomicInteger(numberOfStores);
    }

    @Override
    public void completed(Store store) {
      if (remaining.decrementAndGet() == 0) {
        tracker.completed();
      }
    }
  }

  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
      IntSupplier numberOfStores) {
    if (tracker == CompactionLifeCycleTracker.DUMMY) {
      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
      // the life cycle of a compaction.
      return DUMMY_COMPLETE_TRACKER;
    } else {
      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
    }
  }

  @Override
  public synchronized void requestCompaction(HRegion region, String why, int priority,
      CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
  }

  @Override
  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
      CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, store, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> 1), user);
  }

  @Override
  public void switchCompaction(boolean onOrOff) {
    if (onOrOff) {
      // re-create the executor pools if compactions were previously disabled.
      if (!isCompactionsEnabled()) {
        LOG.info("Re-Initializing compactions because user switched on compactions");
        reInitializeCompactionsExecutors();
      }
    } else {
      LOG.info("Interrupting running compactions because user switched off compactions");
      interrupt();
    }
    setCompactionsEnabled(onOrOff);
  }

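  /**
   * Requests a compaction for every store of the region. When {@code selectNow} is true the
   * compaction files are selected on the caller's thread; otherwise selection is deferred to the
   * compaction thread, as is done for system compactions.
   */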
  private void requestCompactionInternal(HRegion region, String why, int priority,
      boolean selectNow, CompactionLifeCycleTracker tracker,
      CompactionCompleteTracker completeTracker, User user) throws IOException {
    // request compaction on all stores
    for (HStore store : region.stores.values()) {
      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
        user);
    }
  }

  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
      boolean selectNow, CompactionLifeCycleTracker tracker,
      CompactionCompleteTracker completeTracker, User user) throws IOException {
    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
        !region.getTableDescriptor().isCompactionEnabled())) {
      return;
    }
    RegionServerSpaceQuotaManager spaceQuotaManager =
        this.server.getRegionServerSpaceQuotaManager();

    if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
      // Enter here only when:
      // it's a user-generated request, the user is not a superuser, quotas are enabled, and the
      // active space quota violation policy disallows compactions on this table.
      String reason = "Ignoring compaction request for " + region +
          " as an active space quota violation policy disallows compactions.";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
      return;
    }

    CompactionContext compaction;
    if (selectNow) {
      Optional<CompactionContext> c =
          selectCompaction(region, store, priority, tracker, completeTracker, user);
      if (!c.isPresent()) {
        // message logged inside
        return;
      }
      compaction = c.get();
    } else {
      compaction = null;
    }

    ThreadPoolExecutor pool;
    if (selectNow) {
      // compaction is guaranteed to be non-null here: when selectNow is true and no compaction
      // was selected we have already returned above
      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
          : shortCompactions;
    } else {
      // We assume that most compactions are small. So, put system compactions into small
      // pool; we will do selection there, and move to large pool if necessary.
      pool = shortCompactions;
    }
    pool.execute(
      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
    region.incrementCompactionsQueuedCount();
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
  }

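  /**
   * Requests a system compaction (no priority, deferred file selection) for every store of the
   * region.
   */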
  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
      DUMMY_COMPLETE_TRACKER, null);
  }

  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
      throws IOException {
    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
  }

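  /**
   * Asks the store to select files for compaction. Returns {@link Optional#empty()} if
   * compactions are disabled or the store declines the request.
   */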
  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
      throws IOException {
    // don't even select files for compaction if compactions have been switched off
    if (!isCompactionsEnabled()) {
      LOG.info("User has disabled compactions");
      return Optional.empty();
    }
    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
    if (!compaction.isPresent() && region.getRegionInfo() != null) {
      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
          " because compaction request was cancelled";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
    }
    return compaction;
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    splits.shutdown();
    longCompactions.shutdown();
    shortCompactions.shutdown();
  }

  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        LOG.info("Waiting for " + name + " to finish...");
        if (!done) {
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted waiting for " + name + " to finish...");
        t.shutdownNow();
      }
    }
  }

  void join() {
    waitFor(splits, "Split Thread");
    waitFor(longCompactions, "Large Compaction Thread");
    waitFor(shortCompactions, "Small Compaction Thread");
  }

  /**
   * Returns the current size of the compaction queue, i.e. the number of compaction requests
   * waiting in the large and small compaction queues combined.
   *
   * @return The current size of the compaction queue.
   */
  public int getCompactionQueueSize() {
    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
  }

  public int getLargeCompactionQueueSize() {
    return longCompactions.getQueue().size();
  }

  public int getSmallCompactionQueueSize() {
    return shortCompactions.getQueue().size();
  }

  public int getSplitQueueSize() {
    return splits.getQueue().size();
  }

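  /**
   * Returns true while the number of online regions is below the configured split limit; logs a
   * warning once the count exceeds 90% of that limit.
   */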
  private boolean shouldSplitRegion() {
    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
    }
    return (regionSplitLimit > server.getNumberOfOnlineRegions());
  }

  /**
   * @return the regionSplitLimit
   */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }

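  /**
   * Orders queued runnables: CompactionRunners sort before other runnables; among
   * CompactionRunners, lower queued priority sorts first, then earlier selection time, with any
   * remaining tie broken by identity hash code.
   */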
  private static final Comparator<Runnable> COMPARATOR =
      new Comparator<Runnable>() {

    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
      if (r1 == r2) {
        return 0; // they are the same request
      }
      // less first
      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
      if (cmp != 0) {
        return cmp;
      }
      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
      if (cmp != 0) {
        return cmp;
      }

      // break the tie based on identity hash code; Integer.compare avoids overflow from
      // subtracting the hash codes
      return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
    }

    @Override
    public int compare(Runnable r1, Runnable r2) {
      // CompactionRunner first
      if (r1 instanceof CompactionRunner) {
        if (!(r2 instanceof CompactionRunner)) {
          return -1;
        }
      } else {
        if (r2 instanceof CompactionRunner) {
          return 1;
        } else {
          // break the tie based on identity hash code; Integer.compare avoids overflow from
          // subtracting the hash codes
          return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
        }
      }
      CompactionRunner o1 = (CompactionRunner) r1;
      CompactionRunner o2 = (CompactionRunner) r2;
      // less first
      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
      if (cmp != 0) {
        return cmp;
      }
      CompactionContext c1 = o1.compaction;
      CompactionContext c2 = o2.compaction;
      if (c1 != null) {
        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
      } else {
        return c2 != null ? 1 : 0;
      }
    }
  };

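  /**
   * Runnable that carries out a single compaction. For system compactions the file selection
   * happens here on the pool thread, and the runner re-queues itself if the store priority has
   * changed or the selected job belongs in the other pool.
   */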
  private final class CompactionRunner implements Runnable {
    private final HStore store;
    private final HRegion region;
    private final CompactionContext compaction;
    private final CompactionLifeCycleTracker tracker;
    private final CompactionCompleteTracker completeTracker;
    private int queuedPriority;
    private ThreadPoolExecutor parent;
    private User user;
    private long time;

    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
        CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
        ThreadPoolExecutor parent, User user) {
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.tracker = tracker;
      this.completeTracker = completeTracker;
      this.queuedPriority =
          compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
      this.parent = parent;
      this.user = user;
      this.time = EnvironmentEdgeManager.currentTime();
    }

    @Override
    public String toString() {
      if (compaction != null) {
        return "Request=" + compaction.getRequest();
      } else {
        return "region=" + region.toString() + ", storeName=" + store.toString() +
            ", priority=" + queuedPriority + ", startTime=" + time;
      }
    }

    private void doCompaction(User user) {
      CompactionContext c;
      // Common case - system compaction without a file selection. Select now.
      if (compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        Optional<CompactionContext> selected;
        try {
          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
            completeTracker, user);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          region.decrementCompactionsQueuedCount();
          return;
        }
        if (!selected.isPresent()) {
          region.decrementCompactionsQueuedCount();
          return; // nothing to do
        }
        c = selected.get();
        assert c.hasSelection();
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.

        ThreadPoolExecutor pool =
            store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;

        // Long compaction pool can process small job
        // Short compaction pool should not process large job
        if (this.parent == shortCompactions && pool == longCompactions) {
          this.store.cancelRequestedCompaction(c);
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      } else {
        c = compaction;
      }
      // Finally we can compact something.
      assert c != null;

      tracker.beforeExecution(store);
      try {
        // Note: please don't put single-compaction logic here;
        //       put it into region/store/etc. This is CompactSplit (thread) logic.
        long start = EnvironmentEdgeManager.currentTime();
        boolean completed =
            region.compact(c, store, compactionThroughputController, user);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        IOException remoteEx =
            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } finally {
        tracker.afterExecution(store);
        completeTracker.completed(store);
        region.decrementCompactionsQueuedCount();
        LOG.debug("Status {}", CompactSplit.this);
      }
    }

    @Override
    public void run() {
      Preconditions.checkNotNull(server);
      if (server.isStopped() || (region.getTableDescriptor() != null &&
          !region.getTableDescriptor().isCompactionEnabled())) {
        region.decrementCompactionsQueuedCount();
        return;
      }
      doCompaction(user);
    }

    private String formatStackTrace(Exception ex) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      ex.printStackTrace(pw);
      pw.flush();
      return sw.toString();
    }
  }

  /**
   * Cleanup class to use when rejecting a compaction request from the queue.
   */
  private static class Rejection implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
      if (runnable instanceof CompactionRunner) {
        CompactionRunner runner = (CompactionRunner) runnable;
        LOG.debug("Compaction Rejected: " + runner);
        if (runner.compaction != null) {
          runner.store.cancelRequestedCompaction(runner.compaction);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void onConfigurationChange(Configuration newConf) {
    // Check if number of large / small compaction threads has changed, and then
    // adjust the core pool size of the thread pools, by using the
    // setCorePoolSize() method. According to the javadocs, it is safe to
    // change the core pool size on-the-fly. We need to reset the maximum
    // pool size, as well.
    int largeThreads = Math.max(1, newConf.getInt(
            LARGE_COMPACTION_THREADS,
            LARGE_COMPACTION_THREADS_DEFAULT));
    if (this.longCompactions.getCorePoolSize() != largeThreads) {
      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
              " from " + this.longCompactions.getCorePoolSize() + " to " +
              largeThreads);
      if (this.longCompactions.getCorePoolSize() < largeThreads) {
        this.longCompactions.setMaximumPoolSize(largeThreads);
        this.longCompactions.setCorePoolSize(largeThreads);
      } else {
        this.longCompactions.setCorePoolSize(largeThreads);
        this.longCompactions.setMaximumPoolSize(largeThreads);
      }
    }

    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
            SMALL_COMPACTION_THREADS_DEFAULT);
    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
                " from " + this.shortCompactions.getCorePoolSize() + " to " +
                smallThreads);
      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
        this.shortCompactions.setMaximumPoolSize(smallThreads);
        this.shortCompactions.setCorePoolSize(smallThreads);
      } else {
        this.shortCompactions.setCorePoolSize(smallThreads);
        this.shortCompactions.setMaximumPoolSize(smallThreads);
      }
    }

    int splitThreads = newConf.getInt(SPLIT_THREADS,
            SPLIT_THREADS_DEFAULT);
    if (this.splits.getCorePoolSize() != splitThreads) {
      LOG.info("Changing the value of " + SPLIT_THREADS +
                " from " + this.splits.getCorePoolSize() + " to " +
                splitThreads);
      if (this.splits.getCorePoolSize() < splitThreads) {
        this.splits.setMaximumPoolSize(splitThreads);
        this.splits.setCorePoolSize(splitThreads);
      } else {
        this.splits.setCorePoolSize(splitThreads);
        this.splits.setMaximumPoolSize(splitThreads);
      }
    }

    ThroughputController old = this.compactionThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.compactionThroughputController =
        CompactionThroughputControllerFactory.create(server, newConf);

    // We change this atomically here instead of reloading the config in order that upstream
    // would be the only one with the flexibility to reload the config.
    this.conf.reloadConfiguration();
  }

  protected int getSmallCompactionThreadNum() {
    return this.shortCompactions.getCorePoolSize();
  }

  protected int getLargeCompactionThreadNum() {
    return this.longCompactions.getCorePoolSize();
  }

  protected int getSplitThreadNum() {
    return this.splits.getCorePoolSize();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister.
  }

  @VisibleForTesting
  public ThroughputController getCompactionThroughputController() {
    return compactionThroughputController;
  }

  /**
   * Shuts down the long compaction thread pool.
   * Should only be used in unit tests, to prevent the long compaction thread pool from stealing
   * jobs from the short compaction queue.
   */
  @VisibleForTesting
  void shutdownLongCompactions() {
    this.longCompactions.shutdown();
  }

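  /** Removes all pending entries from the long compaction queue. */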
  public void clearLongCompactionsQueue() {
    longCompactions.getQueue().clear();
  }

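  /** Removes all pending entries from the short compaction queue. */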
  public void clearShortCompactionsQueue() {
    shortCompactions.getQueue().clear();
  }

  public boolean isCompactionsEnabled() {
    return compactionsEnabled;
  }

  public void setCompactionsEnabled(boolean compactionsEnabled) {
    this.compactionsEnabled = compactionsEnabled;
    this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, String.valueOf(compactionsEnabled));
  }

  /**
   * @return the longCompactions thread pool executor
   */
  ThreadPoolExecutor getLongCompactions() {
    return longCompactions;
  }

  /**
   * @return the shortCompactions thread pool executor
   */
  ThreadPoolExecutor getShortCompactions() {
    return shortCompactions;
  }

}