001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
022import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
023
024import java.io.IOException;
025import java.io.PrintWriter;
026import java.io.StringWriter;
027import java.util.Comparator;
028import java.util.Iterator;
029import java.util.Optional;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.Executors;
032import java.util.concurrent.RejectedExecutionException;
033import java.util.concurrent.RejectedExecutionHandler;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.function.IntSupplier;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.conf.ConfigurationManager;
041import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
042import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
043import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
047import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
049import org.apache.hadoop.hbase.security.Superusers;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.StealJobQueue;
053import org.apache.hadoop.ipc.RemoteException;
054import org.apache.hadoop.util.StringUtils;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
062
063/**
064 * Compact region on request and then run split if appropriate
065 */
066@InterfaceAudience.Private
067public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
068  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);
069
070  // Configuration key for the large compaction threads.
071  public final static String LARGE_COMPACTION_THREADS =
072      "hbase.regionserver.thread.compaction.large";
073  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
074
075  // Configuration key for the small compaction threads.
076  public final static String SMALL_COMPACTION_THREADS =
077      "hbase.regionserver.thread.compaction.small";
078  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
079
080  // Configuration key for split threads
081  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
082  public final static int SPLIT_THREADS_DEFAULT = 1;
083
084  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
085      "hbase.regionserver.regionSplitLimit";
086  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
087
088  private final HRegionServer server;
089  private final Configuration conf;
090
091  private final ThreadPoolExecutor longCompactions;
092  private final ThreadPoolExecutor shortCompactions;
093  private final ThreadPoolExecutor splits;
094
095  private volatile ThroughputController compactionThroughputController;
096
097  /**
098   * Splitting should not take place if the total number of regions exceed this.
099   * This is not a hard limit to the number of regions but it is a guideline to
100   * stop splitting after number of online regions is greater than this.
101   */
102  private int regionSplitLimit;
103
104  /** @param server */
105  CompactSplit(HRegionServer server) {
106    this.server = server;
107    this.conf = server.getConfiguration();
108    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
109        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
110
111    int largeThreads = Math.max(1, conf.getInt(
112        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
113    int smallThreads = conf.getInt(
114        SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
115
116    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
117
118    // if we have throttle threads, make sure the user also specified size
119    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
120
121    final String n = Thread.currentThread().getName();
122
123    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
124    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
125        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
126            .setDaemon(true).build());
127    this.longCompactions.setRejectedExecutionHandler(new Rejection());
128    this.longCompactions.prestartAllCoreThreads();
129    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
130        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
131            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
132    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
133    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
134        new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
135
136    // compaction throughput controller
137    this.compactionThroughputController =
138        CompactionThroughputControllerFactory.create(server, conf);
139  }
140
141  @Override
142  public String toString() {
143    return "compactionQueue=(longCompactions="
144        + longCompactions.getQueue().size() + ":shortCompactions="
145        + shortCompactions.getQueue().size() + ")"
146        + ", splitQueue=" + splits.getQueue().size();
147  }
148
149  public String dumpQueue() {
150    StringBuilder queueLists = new StringBuilder();
151    queueLists.append("Compaction/Split Queue dump:\n");
152    queueLists.append("  LargeCompation Queue:\n");
153    BlockingQueue<Runnable> lq = longCompactions.getQueue();
154    Iterator<Runnable> it = lq.iterator();
155    while (it.hasNext()) {
156      queueLists.append("    " + it.next().toString());
157      queueLists.append("\n");
158    }
159
160    if (shortCompactions != null) {
161      queueLists.append("\n");
162      queueLists.append("  SmallCompation Queue:\n");
163      lq = shortCompactions.getQueue();
164      it = lq.iterator();
165      while (it.hasNext()) {
166        queueLists.append("    " + it.next().toString());
167        queueLists.append("\n");
168      }
169    }
170
171    queueLists.append("\n");
172    queueLists.append("  Split Queue:\n");
173    lq = splits.getQueue();
174    it = lq.iterator();
175    while (it.hasNext()) {
176      queueLists.append("    " + it.next().toString());
177      queueLists.append("\n");
178    }
179
180    return queueLists.toString();
181  }
182
183  public synchronized boolean requestSplit(final Region r) {
184    // don't split regions that are blocking
185    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) {
186      byte[] midKey = ((HRegion)r).checkSplit();
187      if (midKey != null) {
188        requestSplit(r, midKey);
189        return true;
190      }
191    }
192    return false;
193  }
194
195  public synchronized void requestSplit(final Region r, byte[] midKey) {
196    requestSplit(r, midKey, null);
197  }
198
199  /*
200   * The User parameter allows the split thread to assume the correct user identity
201   */
202  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
203    if (midKey == null) {
204      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
205        " not splittable because midkey=null");
206      if (((HRegion)r).shouldForceSplit()) {
207        ((HRegion)r).clearSplit();
208      }
209      return;
210    }
211    try {
212      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
213      if (LOG.isDebugEnabled()) {
214        LOG.debug("Splitting " + r + ", " + this);
215      }
216    } catch (RejectedExecutionException ree) {
217      LOG.info("Could not execute split for " + r, ree);
218    }
219  }
220
221  private interface CompactionCompleteTracker {
222
223    default void completed(Store store) {
224    }
225  }
226
227  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
228      new CompactionCompleteTracker() {
229      };
230
231  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
232
233    private final CompactionLifeCycleTracker tracker;
234
235    private final AtomicInteger remaining;
236
237    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
238      this.tracker = tracker;
239      this.remaining = new AtomicInteger(numberOfStores);
240    }
241
242    @Override
243    public void completed(Store store) {
244      if (remaining.decrementAndGet() == 0) {
245        tracker.completed();
246      }
247    }
248  }
249
250  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
251      IntSupplier numberOfStores) {
252    if (tracker == CompactionLifeCycleTracker.DUMMY) {
253      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
254      // the life cycle of a compaction.
255      return DUMMY_COMPLETE_TRACKER;
256    } else {
257      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
258    }
259  }
260
261  @Override
262  public synchronized void requestCompaction(HRegion region, String why, int priority,
263      CompactionLifeCycleTracker tracker, User user) throws IOException {
264    requestCompactionInternal(region, why, priority, true, tracker,
265      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
266  }
267
268  @Override
269  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
270      CompactionLifeCycleTracker tracker, User user) throws IOException {
271    requestCompactionInternal(region, store, why, priority, true, tracker,
272      getCompleteTracker(tracker, () -> 1), user);
273  }
274
275  private void requestCompactionInternal(HRegion region, String why, int priority,
276      boolean selectNow, CompactionLifeCycleTracker tracker,
277      CompactionCompleteTracker completeTracker, User user) throws IOException {
278    // request compaction on all stores
279    for (HStore store : region.stores.values()) {
280      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
281        user);
282    }
283  }
284
285  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
286      boolean selectNow, CompactionLifeCycleTracker tracker,
287      CompactionCompleteTracker completeTracker, User user) throws IOException {
288    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
289        !region.getTableDescriptor().isCompactionEnabled())) {
290      return;
291    }
292    RegionServerSpaceQuotaManager spaceQuotaManager =
293        this.server.getRegionServerSpaceQuotaManager();
294
295    if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
296        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
297      // Enter here only when:
298      // It's a user generated req, the user is super user, quotas enabled, compactions disabled.
299      String reason = "Ignoring compaction request for " + region +
300          " as an active space quota violation " + " policy disallows compactions.";
301      tracker.notExecuted(store, reason);
302      completeTracker.completed(store);
303      LOG.debug(reason);
304      return;
305    }
306
307    CompactionContext compaction;
308    if (selectNow) {
309      Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user);
310      if (!c.isPresent()) {
311        // message logged inside
312        return;
313      }
314      compaction = c.get();
315    } else {
316      compaction = null;
317    }
318
319    ThreadPoolExecutor pool;
320    if (selectNow) {
321      // compaction.get is safe as we will just return if selectNow is true but no compaction is
322      // selected
323      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
324          : shortCompactions;
325    } else {
326      // We assume that most compactions are small. So, put system compactions into small
327      // pool; we will do selection there, and move to large pool if necessary.
328      pool = shortCompactions;
329    }
330    pool.execute(
331      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
332    region.incrementCompactionsQueuedCount();
333    if (LOG.isDebugEnabled()) {
334      String type = (pool == shortCompactions) ? "Small " : "Large ";
335      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
336          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
337    }
338  }
339
340  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
341    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
342      DUMMY_COMPLETE_TRACKER, null);
343  }
344
345  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
346      throws IOException {
347    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
348      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
349  }
350
351  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
352      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
353      throws IOException {
354    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
355    if (!compaction.isPresent() && region.getRegionInfo() != null) {
356      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
357          " because compaction request was cancelled";
358      tracker.notExecuted(store, reason);
359      completeTracker.completed(store);
360      LOG.debug(reason);
361    }
362    return compaction;
363  }
364
365  /**
366   * Only interrupt once it's done with a run through the work loop.
367   */
368  void interruptIfNecessary() {
369    splits.shutdown();
370    longCompactions.shutdown();
371    shortCompactions.shutdown();
372  }
373
374  private void waitFor(ThreadPoolExecutor t, String name) {
375    boolean done = false;
376    while (!done) {
377      try {
378        done = t.awaitTermination(60, TimeUnit.SECONDS);
379        LOG.info("Waiting for " + name + " to finish...");
380        if (!done) {
381          t.shutdownNow();
382        }
383      } catch (InterruptedException ie) {
384        LOG.warn("Interrupted waiting for " + name + " to finish...");
385        t.shutdownNow();
386      }
387    }
388  }
389
390  void join() {
391    waitFor(splits, "Split Thread");
392    waitFor(longCompactions, "Large Compaction Thread");
393    waitFor(shortCompactions, "Small Compaction Thread");
394  }
395
396  /**
397   * Returns the current size of the queue containing regions that are
398   * processed.
399   *
400   * @return The current size of the regions queue.
401   */
402  public int getCompactionQueueSize() {
403    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
404  }
405
406  public int getLargeCompactionQueueSize() {
407    return longCompactions.getQueue().size();
408  }
409
410
411  public int getSmallCompactionQueueSize() {
412    return shortCompactions.getQueue().size();
413  }
414
415  public int getSplitQueueSize() {
416    return splits.getQueue().size();
417  }
418
419  private boolean shouldSplitRegion() {
420    if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
421      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
422          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
423    }
424    return (regionSplitLimit > server.getNumberOfOnlineRegions());
425  }
426
427  /**
428   * @return the regionSplitLimit
429   */
430  public int getRegionSplitLimit() {
431    return this.regionSplitLimit;
432  }
433
434  private static final Comparator<Runnable> COMPARATOR =
435      new Comparator<Runnable>() {
436
437    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
438      if (r1 == r2) {
439        return 0; //they are the same request
440      }
441      // less first
442      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
443      if (cmp != 0) {
444        return cmp;
445      }
446      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
447      if (cmp != 0) {
448        return cmp;
449      }
450
451      // break the tie based on hash code
452      return System.identityHashCode(r1) - System.identityHashCode(r2);
453    }
454
455    @Override
456    public int compare(Runnable r1, Runnable r2) {
457      // CompactionRunner first
458      if (r1 instanceof CompactionRunner) {
459        if (!(r2 instanceof CompactionRunner)) {
460          return -1;
461        }
462      } else {
463        if (r2 instanceof CompactionRunner) {
464          return 1;
465        } else {
466          // break the tie based on hash code
467          return System.identityHashCode(r1) - System.identityHashCode(r2);
468        }
469      }
470      CompactionRunner o1 = (CompactionRunner) r1;
471      CompactionRunner o2 = (CompactionRunner) r2;
472      // less first
473      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
474      if (cmp != 0) {
475        return cmp;
476      }
477      CompactionContext c1 = o1.compaction;
478      CompactionContext c2 = o2.compaction;
479      if (c1 != null) {
480        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
481      } else {
482        return c2 != null ? 1 : 0;
483      }
484    }
485  };
486
487  private final class CompactionRunner implements Runnable {
488    private final HStore store;
489    private final HRegion region;
490    private final CompactionContext compaction;
491    private final CompactionLifeCycleTracker tracker;
492    private final CompactionCompleteTracker completeTracker;
493    private int queuedPriority;
494    private ThreadPoolExecutor parent;
495    private User user;
496    private long time;
497
498    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
499        CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
500        ThreadPoolExecutor parent, User user) {
501      this.store = store;
502      this.region = region;
503      this.compaction = compaction;
504      this.tracker = tracker;
505      this.completeTracker = completeTracker;
506      this.queuedPriority =
507          compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
508      this.parent = parent;
509      this.user = user;
510      this.time = EnvironmentEdgeManager.currentTime();
511    }
512
513    @Override
514    public String toString() {
515      if (compaction != null) {
516        return "Request=" + compaction.getRequest();
517      } else {
518        return "region=" + region.toString() + ", storeName=" + store.toString() +
519            ", priority=" + queuedPriority + ", startTime=" + time;
520      }
521    }
522
523    private void doCompaction(User user) {
524      CompactionContext c;
525      // Common case - system compaction without a file selection. Select now.
526      if (compaction == null) {
527        int oldPriority = this.queuedPriority;
528        this.queuedPriority = this.store.getCompactPriority();
529        if (this.queuedPriority > oldPriority) {
530          // Store priority decreased while we were in queue (due to some other compaction?),
531          // requeue with new priority to avoid blocking potential higher priorities.
532          this.parent.execute(this);
533          return;
534        }
535        Optional<CompactionContext> selected;
536        try {
537          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
538            completeTracker, user);
539        } catch (IOException ex) {
540          LOG.error("Compaction selection failed " + this, ex);
541          server.checkFileSystem();
542          region.decrementCompactionsQueuedCount();
543          return;
544        }
545        if (!selected.isPresent()) {
546          region.decrementCompactionsQueuedCount();
547          return; // nothing to do
548        }
549        c = selected.get();
550        assert c.hasSelection();
551        // Now see if we are in correct pool for the size; if not, go to the correct one.
552        // We might end up waiting for a while, so cancel the selection.
553
554        ThreadPoolExecutor pool =
555            store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
556
557        // Long compaction pool can process small job
558        // Short compaction pool should not process large job
559        if (this.parent == shortCompactions && pool == longCompactions) {
560          this.store.cancelRequestedCompaction(c);
561          this.parent = pool;
562          this.parent.execute(this);
563          return;
564        }
565      } else {
566        c = compaction;
567      }
568      // Finally we can compact something.
569      assert c != null;
570
571      tracker.beforeExecution(store);
572      try {
573        // Note: please don't put single-compaction logic here;
574        //       put it into region/store/etc. This is CST logic.
575        long start = EnvironmentEdgeManager.currentTime();
576        boolean completed =
577            region.compact(c, store, compactionThroughputController, user);
578        long now = EnvironmentEdgeManager.currentTime();
579        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
580              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
581        if (completed) {
582          // degenerate case: blocked regions require recursive enqueues
583          if (store.getCompactPriority() <= 0) {
584            requestSystemCompaction(region, store, "Recursive enqueue");
585          } else {
586            // see if the compaction has caused us to exceed max region size
587            requestSplit(region);
588          }
589        }
590      } catch (IOException ex) {
591        IOException remoteEx =
592            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
593        LOG.error("Compaction failed " + this, remoteEx);
594        if (remoteEx != ex) {
595          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
596        }
597        region.reportCompactionRequestFailure();
598        server.checkFileSystem();
599      } catch (Exception ex) {
600        LOG.error("Compaction failed " + this, ex);
601        region.reportCompactionRequestFailure();
602        server.checkFileSystem();
603      } finally {
604        tracker.afterExecution(store);
605        completeTracker.completed(store);
606        region.decrementCompactionsQueuedCount();
607        LOG.debug("Status {}", CompactSplit.this);
608      }
609    }
610
611    @Override
612    public void run() {
613      Preconditions.checkNotNull(server);
614      if (server.isStopped()
615          || (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) {
616        region.decrementCompactionsQueuedCount();
617        return;
618      }
619      doCompaction(user);
620    }
621
622    private String formatStackTrace(Exception ex) {
623      StringWriter sw = new StringWriter();
624      PrintWriter pw = new PrintWriter(sw);
625      ex.printStackTrace(pw);
626      pw.flush();
627      return sw.toString();
628    }
629  }
630
631  /**
632   * Cleanup class to use when rejecting a compaction request from the queue.
633   */
634  private static class Rejection implements RejectedExecutionHandler {
635    @Override
636    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
637      if (runnable instanceof CompactionRunner) {
638        CompactionRunner runner = (CompactionRunner) runnable;
639        LOG.debug("Compaction Rejected: " + runner);
640        if (runner.compaction != null) {
641          runner.store.cancelRequestedCompaction(runner.compaction);
642        }
643      }
644    }
645  }
646
647  /**
648   * {@inheritDoc}
649   */
650  @Override
651  public void onConfigurationChange(Configuration newConf) {
652    // Check if number of large / small compaction threads has changed, and then
653    // adjust the core pool size of the thread pools, by using the
654    // setCorePoolSize() method. According to the javadocs, it is safe to
655    // change the core pool size on-the-fly. We need to reset the maximum
656    // pool size, as well.
657    int largeThreads = Math.max(1, newConf.getInt(
658            LARGE_COMPACTION_THREADS,
659            LARGE_COMPACTION_THREADS_DEFAULT));
660    if (this.longCompactions.getCorePoolSize() != largeThreads) {
661      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
662              " from " + this.longCompactions.getCorePoolSize() + " to " +
663              largeThreads);
664      if(this.longCompactions.getCorePoolSize() < largeThreads) {
665        this.longCompactions.setMaximumPoolSize(largeThreads);
666        this.longCompactions.setCorePoolSize(largeThreads);
667      } else {
668        this.longCompactions.setCorePoolSize(largeThreads);
669        this.longCompactions.setMaximumPoolSize(largeThreads);
670      }
671    }
672
673    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
674            SMALL_COMPACTION_THREADS_DEFAULT);
675    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
676      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
677                " from " + this.shortCompactions.getCorePoolSize() + " to " +
678                smallThreads);
679      if(this.shortCompactions.getCorePoolSize() < smallThreads) {
680        this.shortCompactions.setMaximumPoolSize(smallThreads);
681        this.shortCompactions.setCorePoolSize(smallThreads);
682      } else {
683        this.shortCompactions.setCorePoolSize(smallThreads);
684        this.shortCompactions.setMaximumPoolSize(smallThreads);
685      }
686    }
687
688    int splitThreads = newConf.getInt(SPLIT_THREADS,
689            SPLIT_THREADS_DEFAULT);
690    if (this.splits.getCorePoolSize() != splitThreads) {
691      LOG.info("Changing the value of " + SPLIT_THREADS +
692                " from " + this.splits.getCorePoolSize() + " to " +
693                splitThreads);
694      if(this.splits.getCorePoolSize() < splitThreads) {
695        this.splits.setMaximumPoolSize(splitThreads);
696        this.splits.setCorePoolSize(splitThreads);
697      } else {
698        this.splits.setCorePoolSize(splitThreads);
699        this.splits.setMaximumPoolSize(splitThreads);
700      }
701    }
702
703    ThroughputController old = this.compactionThroughputController;
704    if (old != null) {
705      old.stop("configuration change");
706    }
707    this.compactionThroughputController =
708        CompactionThroughputControllerFactory.create(server, newConf);
709
710    // We change this atomically here instead of reloading the config in order that upstream
711    // would be the only one with the flexibility to reload the config.
712    this.conf.reloadConfiguration();
713  }
714
715  protected int getSmallCompactionThreadNum() {
716    return this.shortCompactions.getCorePoolSize();
717  }
718
719  protected int getLargeCompactionThreadNum() {
720    return this.longCompactions.getCorePoolSize();
721  }
722
723  protected int getSplitThreadNum() {
724    return this.splits.getCorePoolSize();
725  }
726
727  /**
728   * {@inheritDoc}
729   */
730  @Override
731  public void registerChildren(ConfigurationManager manager) {
732    // No children to register.
733  }
734
735  /**
736   * {@inheritDoc}
737   */
738  @Override
739  public void deregisterChildren(ConfigurationManager manager) {
740    // No children to register
741  }
742
743  @VisibleForTesting
744  public ThroughputController getCompactionThroughputController() {
745    return compactionThroughputController;
746  }
747
748  @VisibleForTesting
749  /**
750   * Shutdown the long compaction thread pool.
751   * Should only be used in unit test to prevent long compaction thread pool from stealing job
752   * from short compaction queue
753   */
754  void shutdownLongCompactions(){
755    this.longCompactions.shutdown();
756  }
757
758  public void clearLongCompactionsQueue() {
759    longCompactions.getQueue().clear();
760  }
761
762  public void clearShortCompactionsQueue() {
763    shortCompactions.getQueue().clear();
764  }
765}