001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.io.StringWriter;
026import java.util.Comparator;
027import java.util.Iterator;
028import java.util.Optional;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.Executors;
031import java.util.concurrent.RejectedExecutionException;
032import java.util.concurrent.RejectedExecutionHandler;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.function.IntSupplier;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.conf.ConfigurationManager;
039import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
040import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
041import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
042import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
043import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
045import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
047import org.apache.hadoop.hbase.security.Superusers;
048import org.apache.hadoop.hbase.security.User;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.StealJobQueue;
051import org.apache.hadoop.ipc.RemoteException;
052import org.apache.hadoop.util.StringUtils;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
059
060/**
061 * Compact region on request and then run split if appropriate
062 */
063@InterfaceAudience.Private
064public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
065  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);
066
067  // Configuration key for the large compaction threads.
068  public final static String LARGE_COMPACTION_THREADS =
069      "hbase.regionserver.thread.compaction.large";
070  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
071
072  // Configuration key for the small compaction threads.
073  public final static String SMALL_COMPACTION_THREADS =
074      "hbase.regionserver.thread.compaction.small";
075  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
076
077  // Configuration key for split threads
078  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
079  public final static int SPLIT_THREADS_DEFAULT = 1;
080
081  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
082      "hbase.regionserver.regionSplitLimit";
083  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
084  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
085      "hbase.regionserver.compaction.enabled";
086
087  private final HRegionServer server;
088  private final Configuration conf;
089  private volatile ThreadPoolExecutor longCompactions;
090  private volatile ThreadPoolExecutor shortCompactions;
091  private volatile ThreadPoolExecutor splits;
092
093  private volatile ThroughputController compactionThroughputController;
094
095  private volatile boolean compactionsEnabled;
096  /**
097   * Splitting should not take place if the total number of regions exceed this.
098   * This is not a hard limit to the number of regions but it is a guideline to
099   * stop splitting after number of online regions is greater than this.
100   */
101  private int regionSplitLimit;
102
103  CompactSplit(HRegionServer server) {
104    this.server = server;
105    this.conf = server.getConfiguration();
106    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
107    createCompactionExecutors();
108    createSplitExcecutors();
109
110    // compaction throughput controller
111    this.compactionThroughputController =
112        CompactionThroughputControllerFactory.create(server, conf);
113  }
114
115  private void createSplitExcecutors() {
116    final String n = Thread.currentThread().getName();
117    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
118    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
119      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
120  }
121
122  private void createCompactionExecutors() {
123    this.regionSplitLimit =
124        conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
125
126    int largeThreads =
127        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
128    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
129
130    // if we have throttle threads, make sure the user also specified size
131    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
132
133    final String n = Thread.currentThread().getName();
134
135    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
136    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
137        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
138            .setDaemon(true).build());
139    this.longCompactions.setRejectedExecutionHandler(new Rejection());
140    this.longCompactions.prestartAllCoreThreads();
141    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
142        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
143            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
144    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
145  }
146
147  @Override
148  public String toString() {
149    return "compactionQueue=(longCompactions="
150        + longCompactions.getQueue().size() + ":shortCompactions="
151        + shortCompactions.getQueue().size() + ")"
152        + ", splitQueue=" + splits.getQueue().size();
153  }
154
155  public String dumpQueue() {
156    StringBuilder queueLists = new StringBuilder();
157    queueLists.append("Compaction/Split Queue dump:\n");
158    queueLists.append("  LargeCompation Queue:\n");
159    BlockingQueue<Runnable> lq = longCompactions.getQueue();
160    Iterator<Runnable> it = lq.iterator();
161    while (it.hasNext()) {
162      queueLists.append("    " + it.next().toString());
163      queueLists.append("\n");
164    }
165
166    if (shortCompactions != null) {
167      queueLists.append("\n");
168      queueLists.append("  SmallCompation Queue:\n");
169      lq = shortCompactions.getQueue();
170      it = lq.iterator();
171      while (it.hasNext()) {
172        queueLists.append("    " + it.next().toString());
173        queueLists.append("\n");
174      }
175    }
176
177    queueLists.append("\n");
178    queueLists.append("  Split Queue:\n");
179    lq = splits.getQueue();
180    it = lq.iterator();
181    while (it.hasNext()) {
182      queueLists.append("    " + it.next().toString());
183      queueLists.append("\n");
184    }
185
186    return queueLists.toString();
187  }
188
189  public synchronized boolean requestSplit(final Region r) {
190    // don't split regions that are blocking
191    HRegion hr = (HRegion)r;
192    try {
193      if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) {
194        byte[] midKey = hr.checkSplit();
195        if (midKey != null) {
196          requestSplit(r, midKey);
197          return true;
198        }
199      }
200    } catch (IndexOutOfBoundsException e) {
201      // We get this sometimes. Not sure why. Catch and return false; no split request.
202      LOG.warn("Catching out-of-bounds; region={}, policy={}", hr == null? null: hr.getRegionInfo(),
203        hr == null? "null": hr.getCompactPriority(), e);
204    }
205    return false;
206  }
207
208  public synchronized void requestSplit(final Region r, byte[] midKey) {
209    requestSplit(r, midKey, null);
210  }
211
212  /*
213   * The User parameter allows the split thread to assume the correct user identity
214   */
215  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
216    if (midKey == null) {
217      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
218        " not splittable because midkey=null");
219      if (((HRegion)r).shouldForceSplit()) {
220        ((HRegion)r).clearSplit();
221      }
222      return;
223    }
224    try {
225      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
226      if (LOG.isDebugEnabled()) {
227        LOG.debug("Splitting " + r + ", " + this);
228      }
229    } catch (RejectedExecutionException ree) {
230      LOG.info("Could not execute split for " + r, ree);
231    }
232  }
233
234  private void interrupt() {
235    longCompactions.shutdownNow();
236    shortCompactions.shutdownNow();
237  }
238
239  private void reInitializeCompactionsExecutors() {
240    createCompactionExecutors();
241  }
242
243  private interface CompactionCompleteTracker {
244
245    default void completed(Store store) {
246    }
247  }
248
249  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
250    new CompactionCompleteTracker() {};
251
252  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
253
254    private final CompactionLifeCycleTracker tracker;
255
256    private final AtomicInteger remaining;
257
258    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
259      this.tracker = tracker;
260      this.remaining = new AtomicInteger(numberOfStores);
261    }
262
263    @Override
264    public void completed(Store store) {
265      if (remaining.decrementAndGet() == 0) {
266        tracker.completed();
267      }
268    }
269  }
270
271  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
272      IntSupplier numberOfStores) {
273    if (tracker == CompactionLifeCycleTracker.DUMMY) {
274      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
275      // the life cycle of a compaction.
276      return DUMMY_COMPLETE_TRACKER;
277    } else {
278      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
279    }
280  }
281
282  @Override
283  public synchronized void requestCompaction(HRegion region, String why, int priority,
284      CompactionLifeCycleTracker tracker, User user) throws IOException {
285    requestCompactionInternal(region, why, priority, true, tracker,
286      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
287  }
288
289  @Override
290  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
291      CompactionLifeCycleTracker tracker, User user) throws IOException {
292    requestCompactionInternal(region, store, why, priority, true, tracker,
293      getCompleteTracker(tracker, () -> 1), user);
294  }
295
296  @Override
297  public void switchCompaction(boolean onOrOff) {
298    if (onOrOff) {
299      // re-create executor pool if compactions are disabled.
300      if (!isCompactionsEnabled()) {
301        LOG.info("Re-Initializing compactions because user switched on compactions");
302        reInitializeCompactionsExecutors();
303      }
304    } else {
305      LOG.info("Interrupting running compactions because user switched off compactions");
306      interrupt();
307    }
308    setCompactionsEnabled(onOrOff);
309  }
310
311  private void requestCompactionInternal(HRegion region, String why, int priority,
312      boolean selectNow, CompactionLifeCycleTracker tracker,
313      CompactionCompleteTracker completeTracker, User user) throws IOException {
314    // request compaction on all stores
315    for (HStore store : region.stores.values()) {
316      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
317        user);
318    }
319  }
320
321  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
322      boolean selectNow, CompactionLifeCycleTracker tracker,
323      CompactionCompleteTracker completeTracker, User user) throws IOException {
324    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
325        !region.getTableDescriptor().isCompactionEnabled())) {
326      return;
327    }
328    RegionServerSpaceQuotaManager spaceQuotaManager =
329        this.server.getRegionServerSpaceQuotaManager();
330
331    if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
332        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
333      // Enter here only when:
334      // It's a user generated req, the user is super user, quotas enabled, compactions disabled.
335      String reason = "Ignoring compaction request for " + region +
336          " as an active space quota violation " + " policy disallows compactions.";
337      tracker.notExecuted(store, reason);
338      completeTracker.completed(store);
339      LOG.debug(reason);
340      return;
341    }
342
343    CompactionContext compaction;
344    if (selectNow) {
345      Optional<CompactionContext> c =
346        selectCompaction(region, store, priority, tracker, completeTracker, user);
347      if (!c.isPresent()) {
348        // message logged inside
349        return;
350      }
351      compaction = c.get();
352    } else {
353      compaction = null;
354    }
355
356    ThreadPoolExecutor pool;
357    if (selectNow) {
358      // compaction.get is safe as we will just return if selectNow is true but no compaction is
359      // selected
360      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
361          : shortCompactions;
362    } else {
363      // We assume that most compactions are small. So, put system compactions into small
364      // pool; we will do selection there, and move to large pool if necessary.
365      pool = shortCompactions;
366    }
367    pool.execute(
368      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
369    region.incrementCompactionsQueuedCount();
370    if (LOG.isDebugEnabled()) {
371      String type = (pool == shortCompactions) ? "Small " : "Large ";
372      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
373          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
374    }
375  }
376
377  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
378    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
379      DUMMY_COMPLETE_TRACKER, null);
380  }
381
382  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
383      throws IOException {
384    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
385      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
386  }
387
388  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
389      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
390      throws IOException {
391    // don't even select for compaction if disableCompactions is set to true
392    if (!isCompactionsEnabled()) {
393      LOG.info(String.format("User has disabled compactions"));
394      return Optional.empty();
395    }
396    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
397    if (!compaction.isPresent() && region.getRegionInfo() != null) {
398      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
399          " because compaction request was cancelled";
400      tracker.notExecuted(store, reason);
401      completeTracker.completed(store);
402      LOG.debug(reason);
403    }
404    return compaction;
405  }
406
407  /**
408   * Only interrupt once it's done with a run through the work loop.
409   */
410  void interruptIfNecessary() {
411    splits.shutdown();
412    longCompactions.shutdown();
413    shortCompactions.shutdown();
414  }
415
416  private void waitFor(ThreadPoolExecutor t, String name) {
417    boolean done = false;
418    while (!done) {
419      try {
420        done = t.awaitTermination(60, TimeUnit.SECONDS);
421        LOG.info("Waiting for " + name + " to finish...");
422        if (!done) {
423          t.shutdownNow();
424        }
425      } catch (InterruptedException ie) {
426        LOG.warn("Interrupted waiting for " + name + " to finish...");
427        t.shutdownNow();
428      }
429    }
430  }
431
432  void join() {
433    waitFor(splits, "Split Thread");
434    waitFor(longCompactions, "Large Compaction Thread");
435    waitFor(shortCompactions, "Small Compaction Thread");
436  }
437
438  /**
439   * Returns the current size of the queue containing regions that are
440   * processed.
441   *
442   * @return The current size of the regions queue.
443   */
444  public int getCompactionQueueSize() {
445    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
446  }
447
448  public int getLargeCompactionQueueSize() {
449    return longCompactions.getQueue().size();
450  }
451
452
453  public int getSmallCompactionQueueSize() {
454    return shortCompactions.getQueue().size();
455  }
456
457  public int getSplitQueueSize() {
458    return splits.getQueue().size();
459  }
460
461  private boolean shouldSplitRegion() {
462    if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
463      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
464          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
465    }
466    return (regionSplitLimit > server.getNumberOfOnlineRegions());
467  }
468
469  /**
470   * @return the regionSplitLimit
471   */
472  public int getRegionSplitLimit() {
473    return this.regionSplitLimit;
474  }
475
476  private static final Comparator<Runnable> COMPARATOR =
477      new Comparator<Runnable>() {
478
479    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
480      if (r1 == r2) {
481        return 0; //they are the same request
482      }
483      // less first
484      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
485      if (cmp != 0) {
486        return cmp;
487      }
488      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
489      if (cmp != 0) {
490        return cmp;
491      }
492
493      // break the tie based on hash code
494      return System.identityHashCode(r1) - System.identityHashCode(r2);
495    }
496
497    @Override
498    public int compare(Runnable r1, Runnable r2) {
499      // CompactionRunner first
500      if (r1 instanceof CompactionRunner) {
501        if (!(r2 instanceof CompactionRunner)) {
502          return -1;
503        }
504      } else {
505        if (r2 instanceof CompactionRunner) {
506          return 1;
507        } else {
508          // break the tie based on hash code
509          return System.identityHashCode(r1) - System.identityHashCode(r2);
510        }
511      }
512      CompactionRunner o1 = (CompactionRunner) r1;
513      CompactionRunner o2 = (CompactionRunner) r2;
514      // less first
515      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
516      if (cmp != 0) {
517        return cmp;
518      }
519      CompactionContext c1 = o1.compaction;
520      CompactionContext c2 = o2.compaction;
521      if (c1 != null) {
522        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
523      } else {
524        return c2 != null ? 1 : 0;
525      }
526    }
527  };
528
529  private final class CompactionRunner implements Runnable {
530    private final HStore store;
531    private final HRegion region;
532    private final CompactionContext compaction;
533    private final CompactionLifeCycleTracker tracker;
534    private final CompactionCompleteTracker completeTracker;
535    private int queuedPriority;
536    private ThreadPoolExecutor parent;
537    private User user;
538    private long time;
539
540    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
541        CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
542        ThreadPoolExecutor parent, User user) {
543      this.store = store;
544      this.region = region;
545      this.compaction = compaction;
546      this.tracker = tracker;
547      this.completeTracker = completeTracker;
548      this.queuedPriority =
549          compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
550      this.parent = parent;
551      this.user = user;
552      this.time = EnvironmentEdgeManager.currentTime();
553    }
554
555    @Override
556    public String toString() {
557      if (compaction != null) {
558        return "Request=" + compaction.getRequest();
559      } else {
560        return "region=" + region.toString() + ", storeName=" + store.toString() +
561            ", priority=" + queuedPriority + ", startTime=" + time;
562      }
563    }
564
565    private void doCompaction(User user) {
566      CompactionContext c;
567      // Common case - system compaction without a file selection. Select now.
568      if (compaction == null) {
569        int oldPriority = this.queuedPriority;
570        this.queuedPriority = this.store.getCompactPriority();
571        if (this.queuedPriority > oldPriority) {
572          // Store priority decreased while we were in queue (due to some other compaction?),
573          // requeue with new priority to avoid blocking potential higher priorities.
574          this.parent.execute(this);
575          return;
576        }
577        Optional<CompactionContext> selected;
578        try {
579          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
580            completeTracker, user);
581        } catch (IOException ex) {
582          LOG.error("Compaction selection failed " + this, ex);
583          server.checkFileSystem();
584          region.decrementCompactionsQueuedCount();
585          return;
586        }
587        if (!selected.isPresent()) {
588          region.decrementCompactionsQueuedCount();
589          return; // nothing to do
590        }
591        c = selected.get();
592        assert c.hasSelection();
593        // Now see if we are in correct pool for the size; if not, go to the correct one.
594        // We might end up waiting for a while, so cancel the selection.
595
596        ThreadPoolExecutor pool =
597            store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
598
599        // Long compaction pool can process small job
600        // Short compaction pool should not process large job
601        if (this.parent == shortCompactions && pool == longCompactions) {
602          this.store.cancelRequestedCompaction(c);
603          this.parent = pool;
604          this.parent.execute(this);
605          return;
606        }
607      } else {
608        c = compaction;
609      }
610      // Finally we can compact something.
611      assert c != null;
612
613      tracker.beforeExecution(store);
614      try {
615        // Note: please don't put single-compaction logic here;
616        //       put it into region/store/etc. This is CST logic.
617        long start = EnvironmentEdgeManager.currentTime();
618        boolean completed =
619            region.compact(c, store, compactionThroughputController, user);
620        long now = EnvironmentEdgeManager.currentTime();
621        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
622              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
623        if (completed) {
624          // degenerate case: blocked regions require recursive enqueues
625          if (store.getCompactPriority() <= 0) {
626            requestSystemCompaction(region, store, "Recursive enqueue");
627          } else {
628            // see if the compaction has caused us to exceed max region size
629            requestSplit(region);
630          }
631        }
632      } catch (IOException ex) {
633        IOException remoteEx =
634            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
635        LOG.error("Compaction failed " + this, remoteEx);
636        if (remoteEx != ex) {
637          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
638        }
639        region.reportCompactionRequestFailure();
640        server.checkFileSystem();
641      } catch (Exception ex) {
642        LOG.error("Compaction failed " + this, ex);
643        region.reportCompactionRequestFailure();
644        server.checkFileSystem();
645      } finally {
646        tracker.afterExecution(store);
647        completeTracker.completed(store);
648        region.decrementCompactionsQueuedCount();
649        LOG.debug("Status {}", CompactSplit.this);
650      }
651    }
652
653    @Override
654    public void run() {
655      Preconditions.checkNotNull(server);
656      if (server.isStopped() || (region.getTableDescriptor() != null &&
657          !region.getTableDescriptor().isCompactionEnabled())) {
658        region.decrementCompactionsQueuedCount();
659        return;
660      }
661      doCompaction(user);
662    }
663
664    private String formatStackTrace(Exception ex) {
665      StringWriter sw = new StringWriter();
666      PrintWriter pw = new PrintWriter(sw);
667      ex.printStackTrace(pw);
668      pw.flush();
669      return sw.toString();
670    }
671  }
672
673  /**
674   * Cleanup class to use when rejecting a compaction request from the queue.
675   */
676  private static class Rejection implements RejectedExecutionHandler {
677    @Override
678    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
679      if (runnable instanceof CompactionRunner) {
680        CompactionRunner runner = (CompactionRunner) runnable;
681        LOG.debug("Compaction Rejected: " + runner);
682        if (runner.compaction != null) {
683          runner.store.cancelRequestedCompaction(runner.compaction);
684        }
685      }
686    }
687  }
688
689  /**
690   * {@inheritDoc}
691   */
692  @Override
693  public void onConfigurationChange(Configuration newConf) {
694    // Check if number of large / small compaction threads has changed, and then
695    // adjust the core pool size of the thread pools, by using the
696    // setCorePoolSize() method. According to the javadocs, it is safe to
697    // change the core pool size on-the-fly. We need to reset the maximum
698    // pool size, as well.
699    int largeThreads = Math.max(1, newConf.getInt(
700            LARGE_COMPACTION_THREADS,
701            LARGE_COMPACTION_THREADS_DEFAULT));
702    if (this.longCompactions.getCorePoolSize() != largeThreads) {
703      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
704              " from " + this.longCompactions.getCorePoolSize() + " to " +
705              largeThreads);
706      if(this.longCompactions.getCorePoolSize() < largeThreads) {
707        this.longCompactions.setMaximumPoolSize(largeThreads);
708        this.longCompactions.setCorePoolSize(largeThreads);
709      } else {
710        this.longCompactions.setCorePoolSize(largeThreads);
711        this.longCompactions.setMaximumPoolSize(largeThreads);
712      }
713    }
714
715    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
716            SMALL_COMPACTION_THREADS_DEFAULT);
717    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
718      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
719                " from " + this.shortCompactions.getCorePoolSize() + " to " +
720                smallThreads);
721      if(this.shortCompactions.getCorePoolSize() < smallThreads) {
722        this.shortCompactions.setMaximumPoolSize(smallThreads);
723        this.shortCompactions.setCorePoolSize(smallThreads);
724      } else {
725        this.shortCompactions.setCorePoolSize(smallThreads);
726        this.shortCompactions.setMaximumPoolSize(smallThreads);
727      }
728    }
729
730    int splitThreads = newConf.getInt(SPLIT_THREADS,
731            SPLIT_THREADS_DEFAULT);
732    if (this.splits.getCorePoolSize() != splitThreads) {
733      LOG.info("Changing the value of " + SPLIT_THREADS +
734                " from " + this.splits.getCorePoolSize() + " to " +
735                splitThreads);
736      if(this.splits.getCorePoolSize() < splitThreads) {
737        this.splits.setMaximumPoolSize(splitThreads);
738        this.splits.setCorePoolSize(splitThreads);
739      } else {
740        this.splits.setCorePoolSize(splitThreads);
741        this.splits.setMaximumPoolSize(splitThreads);
742      }
743    }
744
745    ThroughputController old = this.compactionThroughputController;
746    if (old != null) {
747      old.stop("configuration change");
748    }
749    this.compactionThroughputController =
750        CompactionThroughputControllerFactory.create(server, newConf);
751
752    // We change this atomically here instead of reloading the config in order that upstream
753    // would be the only one with the flexibility to reload the config.
754    this.conf.reloadConfiguration();
755  }
756
757  protected int getSmallCompactionThreadNum() {
758    return this.shortCompactions.getCorePoolSize();
759  }
760
761  protected int getLargeCompactionThreadNum() {
762    return this.longCompactions.getCorePoolSize();
763  }
764
765  protected int getSplitThreadNum() {
766    return this.splits.getCorePoolSize();
767  }
768
769  /**
770   * {@inheritDoc}
771   */
772  @Override
773  public void registerChildren(ConfigurationManager manager) {
774    // No children to register.
775  }
776
777  /**
778   * {@inheritDoc}
779   */
780  @Override
781  public void deregisterChildren(ConfigurationManager manager) {
782    // No children to register
783  }
784
785  @VisibleForTesting
786  public ThroughputController getCompactionThroughputController() {
787    return compactionThroughputController;
788  }
789
790  @VisibleForTesting
791  /**
792   * Shutdown the long compaction thread pool.
793   * Should only be used in unit test to prevent long compaction thread pool from stealing job
794   * from short compaction queue
795   */
796  void shutdownLongCompactions(){
797    this.longCompactions.shutdown();
798  }
799
800  public void clearLongCompactionsQueue() {
801    longCompactions.getQueue().clear();
802  }
803
804  public void clearShortCompactionsQueue() {
805    shortCompactions.getQueue().clear();
806  }
807
808  public boolean isCompactionsEnabled() {
809    return compactionsEnabled;
810  }
811
812  public void setCompactionsEnabled(boolean compactionsEnabled) {
813    this.compactionsEnabled = compactionsEnabled;
814    this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled));
815  }
816
817  /**
818   * @return the longCompactions thread pool executor
819   */
820  ThreadPoolExecutor getLongCompactions() {
821    return longCompactions;
822  }
823
824  /**
825   * @return the shortCompactions thread pool executor
826   */
827  ThreadPoolExecutor getShortCompactions() {
828    return shortCompactions;
829  }
830
831}