001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
023
024import java.io.IOException;
025import java.io.PrintWriter;
026import java.io.StringWriter;
027import java.util.Comparator;
028import java.util.Iterator;
029import java.util.Optional;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.Executors;
032import java.util.concurrent.RejectedExecutionException;
033import java.util.concurrent.RejectedExecutionHandler;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.function.IntSupplier;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.conf.ConfigurationManager;
040import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
041import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
042import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
043import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
046import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
048import org.apache.hadoop.hbase.security.Superusers;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.StealJobQueue;
052import org.apache.hadoop.ipc.RemoteException;
053import org.apache.hadoop.util.StringUtils;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
059import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
060
061/**
062 * Compact region on request and then run split if appropriate
063 */
064@InterfaceAudience.Private
065public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
066  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);
067
068  // Configuration key for the large compaction threads.
069  public final static String LARGE_COMPACTION_THREADS =
070      "hbase.regionserver.thread.compaction.large";
071  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
072
073  // Configuration key for the small compaction threads.
074  public final static String SMALL_COMPACTION_THREADS =
075      "hbase.regionserver.thread.compaction.small";
076  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
077
078  // Configuration key for split threads
079  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
080  public final static int SPLIT_THREADS_DEFAULT = 1;
081
082  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
083      "hbase.regionserver.regionSplitLimit";
084  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
085  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
086      "hbase.regionserver.compaction.enabled";
087
088  private final HRegionServer server;
089  private final Configuration conf;
090  private volatile ThreadPoolExecutor longCompactions;
091  private volatile ThreadPoolExecutor shortCompactions;
092  private volatile ThreadPoolExecutor splits;
093
094  private volatile ThroughputController compactionThroughputController;
095
096  private volatile boolean compactionsEnabled;
097  /**
098   * Splitting should not take place if the total number of regions exceed this.
099   * This is not a hard limit to the number of regions but it is a guideline to
100   * stop splitting after number of online regions is greater than this.
101   */
102  private int regionSplitLimit;
103
104  CompactSplit(HRegionServer server) {
105    this.server = server;
106    this.conf = server.getConfiguration();
107    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
108    createCompactionExecutors();
109    createSplitExcecutors();
110
111    // compaction throughput controller
112    this.compactionThroughputController =
113        CompactionThroughputControllerFactory.create(server, conf);
114  }
115
116  private void createSplitExcecutors() {
117    final String n = Thread.currentThread().getName();
118    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
119    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
120      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
121  }
122
123  private void createCompactionExecutors() {
124    this.regionSplitLimit =
125        conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
126
127    int largeThreads =
128        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
129    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
130
131    // if we have throttle threads, make sure the user also specified size
132    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
133
134    final String n = Thread.currentThread().getName();
135
136    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
137    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
138        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
139            .setDaemon(true).build());
140    this.longCompactions.setRejectedExecutionHandler(new Rejection());
141    this.longCompactions.prestartAllCoreThreads();
142    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
143        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
144            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
145    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
146  }
147
148  @Override
149  public String toString() {
150    return "compactionQueue=(longCompactions="
151        + longCompactions.getQueue().size() + ":shortCompactions="
152        + shortCompactions.getQueue().size() + ")"
153        + ", splitQueue=" + splits.getQueue().size();
154  }
155
156  public String dumpQueue() {
157    StringBuilder queueLists = new StringBuilder();
158    queueLists.append("Compaction/Split Queue dump:\n");
159    queueLists.append("  LargeCompation Queue:\n");
160    BlockingQueue<Runnable> lq = longCompactions.getQueue();
161    Iterator<Runnable> it = lq.iterator();
162    while (it.hasNext()) {
163      queueLists.append("    " + it.next().toString());
164      queueLists.append("\n");
165    }
166
167    if (shortCompactions != null) {
168      queueLists.append("\n");
169      queueLists.append("  SmallCompation Queue:\n");
170      lq = shortCompactions.getQueue();
171      it = lq.iterator();
172      while (it.hasNext()) {
173        queueLists.append("    " + it.next().toString());
174        queueLists.append("\n");
175      }
176    }
177
178    queueLists.append("\n");
179    queueLists.append("  Split Queue:\n");
180    lq = splits.getQueue();
181    it = lq.iterator();
182    while (it.hasNext()) {
183      queueLists.append("    " + it.next().toString());
184      queueLists.append("\n");
185    }
186
187    return queueLists.toString();
188  }
189
190  public synchronized boolean requestSplit(final Region r) {
191    // don't split regions that are blocking
192    HRegion hr = (HRegion)r;
193    try {
194      if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) {
195        byte[] midKey = hr.checkSplit().orElse(null);
196        if (midKey != null) {
197          requestSplit(r, midKey);
198          return true;
199        }
200      }
201    } catch (IndexOutOfBoundsException e) {
202      // We get this sometimes. Not sure why. Catch and return false; no split request.
203      LOG.warn("Catching out-of-bounds; region={}, policy={}", hr == null? null: hr.getRegionInfo(),
204        hr == null? "null": hr.getCompactPriority(), e);
205    }
206    return false;
207  }
208
209  public synchronized void requestSplit(final Region r, byte[] midKey) {
210    requestSplit(r, midKey, null);
211  }
212
213  /*
214   * The User parameter allows the split thread to assume the correct user identity
215   */
216  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
217    if (midKey == null) {
218      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
219        " not splittable because midkey=null");
220      return;
221    }
222    try {
223      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
224      if (LOG.isDebugEnabled()) {
225        LOG.debug("Splitting " + r + ", " + this);
226      }
227    } catch (RejectedExecutionException ree) {
228      LOG.info("Could not execute split for " + r, ree);
229    }
230  }
231
232  private void interrupt() {
233    longCompactions.shutdownNow();
234    shortCompactions.shutdownNow();
235  }
236
237  private void reInitializeCompactionsExecutors() {
238    createCompactionExecutors();
239  }
240
241  private interface CompactionCompleteTracker {
242
243    default void completed(Store store) {
244    }
245  }
246
247  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
248    new CompactionCompleteTracker() {};
249
250  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
251
252    private final CompactionLifeCycleTracker tracker;
253
254    private final AtomicInteger remaining;
255
256    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
257      this.tracker = tracker;
258      this.remaining = new AtomicInteger(numberOfStores);
259    }
260
261    @Override
262    public void completed(Store store) {
263      if (remaining.decrementAndGet() == 0) {
264        tracker.completed();
265      }
266    }
267  }
268
269  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
270      IntSupplier numberOfStores) {
271    if (tracker == CompactionLifeCycleTracker.DUMMY) {
272      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
273      // the life cycle of a compaction.
274      return DUMMY_COMPLETE_TRACKER;
275    } else {
276      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
277    }
278  }
279
280  @Override
281  public synchronized void requestCompaction(HRegion region, String why, int priority,
282      CompactionLifeCycleTracker tracker, User user) throws IOException {
283    requestCompactionInternal(region, why, priority, true, tracker,
284      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
285  }
286
287  @Override
288  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
289      CompactionLifeCycleTracker tracker, User user) throws IOException {
290    requestCompactionInternal(region, store, why, priority, true, tracker,
291      getCompleteTracker(tracker, () -> 1), user);
292  }
293
294  @Override
295  public void switchCompaction(boolean onOrOff) {
296    if (onOrOff) {
297      // re-create executor pool if compactions are disabled.
298      if (!isCompactionsEnabled()) {
299        LOG.info("Re-Initializing compactions because user switched on compactions");
300        reInitializeCompactionsExecutors();
301      }
302    } else {
303      LOG.info("Interrupting running compactions because user switched off compactions");
304      interrupt();
305    }
306    setCompactionsEnabled(onOrOff);
307  }
308
309  private void requestCompactionInternal(HRegion region, String why, int priority,
310      boolean selectNow, CompactionLifeCycleTracker tracker,
311      CompactionCompleteTracker completeTracker, User user) throws IOException {
312    // request compaction on all stores
313    for (HStore store : region.stores.values()) {
314      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
315        user);
316    }
317  }
318
319  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
320      boolean selectNow, CompactionLifeCycleTracker tracker,
321      CompactionCompleteTracker completeTracker, User user) throws IOException {
322    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
323        !region.getTableDescriptor().isCompactionEnabled())) {
324      return;
325    }
326    RegionServerSpaceQuotaManager spaceQuotaManager =
327        this.server.getRegionServerSpaceQuotaManager();
328
329    if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
330        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
331      // Enter here only when:
332      // It's a user generated req, the user is super user, quotas enabled, compactions disabled.
333      String reason = "Ignoring compaction request for " + region +
334          " as an active space quota violation " + " policy disallows compactions.";
335      tracker.notExecuted(store, reason);
336      completeTracker.completed(store);
337      LOG.debug(reason);
338      return;
339    }
340
341    CompactionContext compaction;
342    if (selectNow) {
343      Optional<CompactionContext> c =
344        selectCompaction(region, store, priority, tracker, completeTracker, user);
345      if (!c.isPresent()) {
346        // message logged inside
347        return;
348      }
349      compaction = c.get();
350    } else {
351      compaction = null;
352    }
353
354    ThreadPoolExecutor pool;
355    if (selectNow) {
356      // compaction.get is safe as we will just return if selectNow is true but no compaction is
357      // selected
358      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
359          : shortCompactions;
360    } else {
361      // We assume that most compactions are small. So, put system compactions into small
362      // pool; we will do selection there, and move to large pool if necessary.
363      pool = shortCompactions;
364    }
365    pool.execute(
366      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
367    region.incrementCompactionsQueuedCount();
368    if (LOG.isDebugEnabled()) {
369      String type = (pool == shortCompactions) ? "Small " : "Large ";
370      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
371          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
372    }
373  }
374
375  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
376    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
377      DUMMY_COMPLETE_TRACKER, null);
378  }
379
380  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
381      throws IOException {
382    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
383      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
384  }
385
386  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
387      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
388      throws IOException {
389    // don't even select for compaction if disableCompactions is set to true
390    if (!isCompactionsEnabled()) {
391      LOG.info(String.format("User has disabled compactions"));
392      return Optional.empty();
393    }
394    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
395    if (!compaction.isPresent() && region.getRegionInfo() != null) {
396      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
397          " because compaction request was cancelled";
398      tracker.notExecuted(store, reason);
399      completeTracker.completed(store);
400      LOG.debug(reason);
401    }
402    return compaction;
403  }
404
405  /**
406   * Only interrupt once it's done with a run through the work loop.
407   */
408  void interruptIfNecessary() {
409    splits.shutdown();
410    longCompactions.shutdown();
411    shortCompactions.shutdown();
412  }
413
414  private void waitFor(ThreadPoolExecutor t, String name) {
415    boolean done = false;
416    while (!done) {
417      try {
418        done = t.awaitTermination(60, TimeUnit.SECONDS);
419        LOG.info("Waiting for " + name + " to finish...");
420        if (!done) {
421          t.shutdownNow();
422        }
423      } catch (InterruptedException ie) {
424        LOG.warn("Interrupted waiting for " + name + " to finish...");
425        t.shutdownNow();
426      }
427    }
428  }
429
430  void join() {
431    waitFor(splits, "Split Thread");
432    waitFor(longCompactions, "Large Compaction Thread");
433    waitFor(shortCompactions, "Small Compaction Thread");
434  }
435
436  /**
437   * Returns the current size of the queue containing regions that are
438   * processed.
439   *
440   * @return The current size of the regions queue.
441   */
442  public int getCompactionQueueSize() {
443    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
444  }
445
446  public int getLargeCompactionQueueSize() {
447    return longCompactions.getQueue().size();
448  }
449
450
451  public int getSmallCompactionQueueSize() {
452    return shortCompactions.getQueue().size();
453  }
454
455  public int getSplitQueueSize() {
456    return splits.getQueue().size();
457  }
458
459  private boolean shouldSplitRegion() {
460    if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
461      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
462          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
463    }
464    return (regionSplitLimit > server.getNumberOfOnlineRegions());
465  }
466
467  /**
468   * @return the regionSplitLimit
469   */
470  public int getRegionSplitLimit() {
471    return this.regionSplitLimit;
472  }
473
474  private static final Comparator<Runnable> COMPARATOR =
475      new Comparator<Runnable>() {
476
477    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
478      if (r1 == r2) {
479        return 0; //they are the same request
480      }
481      // less first
482      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
483      if (cmp != 0) {
484        return cmp;
485      }
486      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
487      if (cmp != 0) {
488        return cmp;
489      }
490
491      // break the tie based on hash code
492      return System.identityHashCode(r1) - System.identityHashCode(r2);
493    }
494
495    @Override
496    public int compare(Runnable r1, Runnable r2) {
497      // CompactionRunner first
498      if (r1 instanceof CompactionRunner) {
499        if (!(r2 instanceof CompactionRunner)) {
500          return -1;
501        }
502      } else {
503        if (r2 instanceof CompactionRunner) {
504          return 1;
505        } else {
506          // break the tie based on hash code
507          return System.identityHashCode(r1) - System.identityHashCode(r2);
508        }
509      }
510      CompactionRunner o1 = (CompactionRunner) r1;
511      CompactionRunner o2 = (CompactionRunner) r2;
512      // less first
513      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
514      if (cmp != 0) {
515        return cmp;
516      }
517      CompactionContext c1 = o1.compaction;
518      CompactionContext c2 = o2.compaction;
519      if (c1 != null) {
520        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
521      } else {
522        return c2 != null ? 1 : 0;
523      }
524    }
525  };
526
527  private final class CompactionRunner implements Runnable {
528    private final HStore store;
529    private final HRegion region;
530    private final CompactionContext compaction;
531    private final CompactionLifeCycleTracker tracker;
532    private final CompactionCompleteTracker completeTracker;
533    private int queuedPriority;
534    private ThreadPoolExecutor parent;
535    private User user;
536    private long time;
537
538    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
539        CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
540        ThreadPoolExecutor parent, User user) {
541      this.store = store;
542      this.region = region;
543      this.compaction = compaction;
544      this.tracker = tracker;
545      this.completeTracker = completeTracker;
546      this.queuedPriority =
547          compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
548      this.parent = parent;
549      this.user = user;
550      this.time = EnvironmentEdgeManager.currentTime();
551    }
552
553    @Override
554    public String toString() {
555      if (compaction != null) {
556        return "Request=" + compaction.getRequest();
557      } else {
558        return "region=" + region.toString() + ", storeName=" + store.toString() +
559            ", priority=" + queuedPriority + ", startTime=" + time;
560      }
561    }
562
563    private void doCompaction(User user) {
564      CompactionContext c;
565      // Common case - system compaction without a file selection. Select now.
566      if (compaction == null) {
567        int oldPriority = this.queuedPriority;
568        this.queuedPriority = this.store.getCompactPriority();
569        if (this.queuedPriority > oldPriority) {
570          // Store priority decreased while we were in queue (due to some other compaction?),
571          // requeue with new priority to avoid blocking potential higher priorities.
572          this.parent.execute(this);
573          return;
574        }
575        Optional<CompactionContext> selected;
576        try {
577          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
578            completeTracker, user);
579        } catch (IOException ex) {
580          LOG.error("Compaction selection failed " + this, ex);
581          server.checkFileSystem();
582          region.decrementCompactionsQueuedCount();
583          return;
584        }
585        if (!selected.isPresent()) {
586          region.decrementCompactionsQueuedCount();
587          return; // nothing to do
588        }
589        c = selected.get();
590        assert c.hasSelection();
591        // Now see if we are in correct pool for the size; if not, go to the correct one.
592        // We might end up waiting for a while, so cancel the selection.
593
594        ThreadPoolExecutor pool =
595            store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
596
597        // Long compaction pool can process small job
598        // Short compaction pool should not process large job
599        if (this.parent == shortCompactions && pool == longCompactions) {
600          this.store.cancelRequestedCompaction(c);
601          this.parent = pool;
602          this.parent.execute(this);
603          return;
604        }
605      } else {
606        c = compaction;
607      }
608      // Finally we can compact something.
609      assert c != null;
610
611      tracker.beforeExecution(store);
612      try {
613        // Note: please don't put single-compaction logic here;
614        //       put it into region/store/etc. This is CST logic.
615        long start = EnvironmentEdgeManager.currentTime();
616        boolean completed =
617            region.compact(c, store, compactionThroughputController, user);
618        long now = EnvironmentEdgeManager.currentTime();
619        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
620              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
621        if (completed) {
622          // degenerate case: blocked regions require recursive enqueues
623          if (store.getCompactPriority() <= 0) {
624            requestSystemCompaction(region, store, "Recursive enqueue");
625          } else {
626            // see if the compaction has caused us to exceed max region size
627            requestSplit(region);
628          }
629        }
630      } catch (IOException ex) {
631        IOException remoteEx =
632            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
633        LOG.error("Compaction failed " + this, remoteEx);
634        if (remoteEx != ex) {
635          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
636        }
637        region.reportCompactionRequestFailure();
638        server.checkFileSystem();
639      } catch (Exception ex) {
640        LOG.error("Compaction failed " + this, ex);
641        region.reportCompactionRequestFailure();
642        server.checkFileSystem();
643      } finally {
644        tracker.afterExecution(store);
645        completeTracker.completed(store);
646        region.decrementCompactionsQueuedCount();
647        LOG.debug("Status {}", CompactSplit.this);
648      }
649    }
650
651    @Override
652    public void run() {
653      Preconditions.checkNotNull(server);
654      if (server.isStopped() || (region.getTableDescriptor() != null &&
655        !region.getTableDescriptor().isCompactionEnabled())) {
656        region.decrementCompactionsQueuedCount();
657        return;
658      }
659      doCompaction(user);
660    }
661
662    private String formatStackTrace(Exception ex) {
663      StringWriter sw = new StringWriter();
664      PrintWriter pw = new PrintWriter(sw);
665      ex.printStackTrace(pw);
666      pw.flush();
667      return sw.toString();
668    }
669  }
670
671  /**
672   * Cleanup class to use when rejecting a compaction request from the queue.
673   */
674  private static class Rejection implements RejectedExecutionHandler {
675    @Override
676    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
677      if (runnable instanceof CompactionRunner) {
678        CompactionRunner runner = (CompactionRunner) runnable;
679        LOG.debug("Compaction Rejected: " + runner);
680        if (runner.compaction != null) {
681          runner.store.cancelRequestedCompaction(runner.compaction);
682        }
683      }
684    }
685  }
686
687  /**
688   * {@inheritDoc}
689   */
690  @Override
691  public void onConfigurationChange(Configuration newConf) {
692    // Check if number of large / small compaction threads has changed, and then
693    // adjust the core pool size of the thread pools, by using the
694    // setCorePoolSize() method. According to the javadocs, it is safe to
695    // change the core pool size on-the-fly. We need to reset the maximum
696    // pool size, as well.
697    int largeThreads = Math.max(1, newConf.getInt(
698            LARGE_COMPACTION_THREADS,
699            LARGE_COMPACTION_THREADS_DEFAULT));
700    if (this.longCompactions.getCorePoolSize() != largeThreads) {
701      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
702              " from " + this.longCompactions.getCorePoolSize() + " to " +
703              largeThreads);
704      if(this.longCompactions.getCorePoolSize() < largeThreads) {
705        this.longCompactions.setMaximumPoolSize(largeThreads);
706        this.longCompactions.setCorePoolSize(largeThreads);
707      } else {
708        this.longCompactions.setCorePoolSize(largeThreads);
709        this.longCompactions.setMaximumPoolSize(largeThreads);
710      }
711    }
712
713    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
714            SMALL_COMPACTION_THREADS_DEFAULT);
715    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
716      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
717                " from " + this.shortCompactions.getCorePoolSize() + " to " +
718                smallThreads);
719      if(this.shortCompactions.getCorePoolSize() < smallThreads) {
720        this.shortCompactions.setMaximumPoolSize(smallThreads);
721        this.shortCompactions.setCorePoolSize(smallThreads);
722      } else {
723        this.shortCompactions.setCorePoolSize(smallThreads);
724        this.shortCompactions.setMaximumPoolSize(smallThreads);
725      }
726    }
727
728    int splitThreads = newConf.getInt(SPLIT_THREADS,
729            SPLIT_THREADS_DEFAULT);
730    if (this.splits.getCorePoolSize() != splitThreads) {
731      LOG.info("Changing the value of " + SPLIT_THREADS +
732                " from " + this.splits.getCorePoolSize() + " to " +
733                splitThreads);
734      if(this.splits.getCorePoolSize() < splitThreads) {
735        this.splits.setMaximumPoolSize(splitThreads);
736        this.splits.setCorePoolSize(splitThreads);
737      } else {
738        this.splits.setCorePoolSize(splitThreads);
739        this.splits.setMaximumPoolSize(splitThreads);
740      }
741    }
742
743    ThroughputController old = this.compactionThroughputController;
744    if (old != null) {
745      old.stop("configuration change");
746    }
747    this.compactionThroughputController =
748        CompactionThroughputControllerFactory.create(server, newConf);
749
750    // We change this atomically here instead of reloading the config in order that upstream
751    // would be the only one with the flexibility to reload the config.
752    this.conf.reloadConfiguration();
753  }
754
755  protected int getSmallCompactionThreadNum() {
756    return this.shortCompactions.getCorePoolSize();
757  }
758
759  protected int getLargeCompactionThreadNum() {
760    return this.longCompactions.getCorePoolSize();
761  }
762
763  protected int getSplitThreadNum() {
764    return this.splits.getCorePoolSize();
765  }
766
767  /**
768   * {@inheritDoc}
769   */
770  @Override
771  public void registerChildren(ConfigurationManager manager) {
772    // No children to register.
773  }
774
775  /**
776   * {@inheritDoc}
777   */
778  @Override
779  public void deregisterChildren(ConfigurationManager manager) {
780    // No children to register
781  }
782
783  public ThroughputController getCompactionThroughputController() {
784    return compactionThroughputController;
785  }
786
787  /**
788   * Shutdown the long compaction thread pool.
789   * Should only be used in unit test to prevent long compaction thread pool from stealing job
790   * from short compaction queue
791   */
792  void shutdownLongCompactions(){
793    this.longCompactions.shutdown();
794  }
795
796  public void clearLongCompactionsQueue() {
797    longCompactions.getQueue().clear();
798  }
799
800  public void clearShortCompactionsQueue() {
801    shortCompactions.getQueue().clear();
802  }
803
804  public boolean isCompactionsEnabled() {
805    return compactionsEnabled;
806  }
807
808  public void setCompactionsEnabled(boolean compactionsEnabled) {
809    this.compactionsEnabled = compactionsEnabled;
810    this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled));
811  }
812
813  /**
814   * @return the longCompactions thread pool executor
815   */
816  ThreadPoolExecutor getLongCompactions() {
817    return longCompactions;
818  }
819
820  /**
821   * @return the shortCompactions thread pool executor
822   */
823  ThreadPoolExecutor getShortCompactions() {
824    return shortCompactions;
825  }
826
827}