/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Compact region on request and then run split if appropriate
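 * <p>
 * An illustrative (hypothetical) invocation from region server code, assuming a
 * {@code CompactSplit} instance has already been obtained from the hosting region server:
 * <pre>{@code
 * CompactSplit compactSplit = ...; // owned by the HRegionServer
 * // queue a user-priority compaction of every store in the region
 * compactSplit.requestCompaction(region, "user request", Store.PRIORITY_USER,
 *   CompactionLifeCycleTracker.DUMMY, null);
 * // request a split if the region is splittable and the split limit has not been reached
 * compactSplit.requestSplit(region);
 * }</pre>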
 */
@InterfaceAudience.Private
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

  // Configuration key for the large compaction threads.
  public final static String LARGE_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.large";
  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for the small compaction threads.
  public final static String SMALL_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.small";
  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for split threads.
  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
  public final static int SPLIT_THREADS_DEFAULT = 1;

  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
      "hbase.regionserver.regionSplitLimit";
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
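
  // These thread counts and the split limit are normally tuned in hbase-site.xml. A purely
  // illustrative (not prescriptive) example that raises the compaction thread counts:
  //
  //   <property>
  //     <name>hbase.regionserver.thread.compaction.large</name>
  //     <value>2</value>
  //   </property>
  //   <property>
  //     <name>hbase.regionserver.thread.compaction.small</name>
  //     <value>4</value>
  //   </property>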

  private final HRegionServer server;
  private final Configuration conf;

  private final ThreadPoolExecutor longCompactions;
  private final ThreadPoolExecutor shortCompactions;
  private final ThreadPoolExecutor splits;

  private volatile ThroughputController compactionThroughputController;

  /**
   * Splitting should not take place if the total number of regions exceeds this value.
   * This is not a hard limit on the number of regions; it is a guideline to stop splitting
   * once the number of online regions is greater than this.
   */
  private int regionSplitLimit;

  /** @param server the region server that owns this CompactSplit instance */
  CompactSplit(HRegionServer server) {
    this.server = server;
    this.conf = server.getConfiguration();
    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

    int largeThreads = Math.max(1, conf.getInt(
        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    int smallThreads = conf.getInt(
        SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);

    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);

    // both compaction pools must be configured with at least one thread
    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);

    final String n = Thread.currentThread().getName();

    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
        60, TimeUnit.SECONDS, stealJobQueue,
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            String name = n + "-longCompactions-" + System.currentTimeMillis();
            return new Thread(r, name);
          }
      });
    this.longCompactions.setRejectedExecutionHandler(new Rejection());
    this.longCompactions.prestartAllCoreThreads();
    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
        60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            String name = n + "-shortCompactions-" + System.currentTimeMillis();
            return new Thread(r, name);
          }
      });
    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
    this.splits = (ThreadPoolExecutor)
        Executors.newFixedThreadPool(splitThreads,
            new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            String name = n + "-splits-" + System.currentTimeMillis();
            return new Thread(r, name);
          }
      });

    // compaction throughput controller
    this.compactionThroughputController =
        CompactionThroughputControllerFactory.create(server, conf);
  }

  @Override
  public String toString() {
    return "compactionQueue=(longCompactions="
        + longCompactions.getQueue().size() + ":shortCompactions="
        + shortCompactions.getQueue().size() + ")"
        + ", splitQueue=" + splits.getQueue().size();
  }

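  /**
   * Dumps the contents of the large compaction, small compaction and split queues, one queued
   * task per line, for logging and debugging.
   * @return a human-readable dump of the three internal queues
   */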
  public String dumpQueue() {
    StringBuilder queueLists = new StringBuilder();
    queueLists.append("Compaction/Split Queue dump:\n");
    queueLists.append("  LargeCompaction Queue:\n");
    BlockingQueue<Runnable> lq = longCompactions.getQueue();
    Iterator<Runnable> it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    if (shortCompactions != null) {
      queueLists.append("\n");
      queueLists.append("  SmallCompaction Queue:\n");
      lq = shortCompactions.getQueue();
      it = lq.iterator();
      while (it.hasNext()) {
        queueLists.append("    " + it.next().toString());
        queueLists.append("\n");
      }
    }

    queueLists.append("\n");
    queueLists.append("  Split Queue:\n");
    lq = splits.getQueue();
    it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    return queueLists.toString();
  }

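  /**
   * Requests a split of the given region if the online-region count is below the configured
   * split limit, the region is not blocked by too many store files, and the region reports a
   * split point.
   * @return true if a split was queued, false otherwise
   */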
  public synchronized boolean requestSplit(final Region r) {
    // don't split regions that are blocking
    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) {
      byte[] midKey = ((HRegion)r).checkSplit();
      if (midKey != null) {
        requestSplit(r, midKey);
        return true;
      }
    }
    return false;
  }

  public synchronized void requestSplit(final Region r, byte[] midKey) {
    requestSplit(r, midKey, null);
  }

  /**
   * The User parameter allows the split thread to assume the correct user identity.
   */
  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
    if (midKey == null) {
      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
        " not splittable because midkey=null");
      if (((HRegion)r).shouldForceSplit()) {
        ((HRegion)r).clearSplit();
      }
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting " + r + ", " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.info("Could not execute split for " + r, ree);
    }
  }

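  /**
   * Per-store completion callback. Each store reports completion once its compaction has run or
   * been skipped, so that a {@link CompactionLifeCycleTracker} spanning several stores completes
   * only after every store has reported.
   */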
  private interface CompactionCompleteTracker {

    default void completed(Store store) {
    }
  }

  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
      new CompactionCompleteTracker() {
      };

  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {

    private final CompactionLifeCycleTracker tracker;

    private final AtomicInteger remaining;

    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
      this.tracker = tracker;
      this.remaining = new AtomicInteger(numberOfStores);
    }

    @Override
    public void completed(Store store) {
      if (remaining.decrementAndGet() == 0) {
        tracker.completed();
      }
    }
  }

  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
      IntSupplier numberOfStores) {
    if (tracker == CompactionLifeCycleTracker.DUMMY) {
      // a simple optimization to avoid creating unnecessary objects as usually we do not care
      // about the life cycle of a compaction.
      return DUMMY_COMPLETE_TRACKER;
    } else {
      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
    }
  }

  @Override
  public synchronized void requestCompaction(HRegion region, String why, int priority,
      CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
  }

  @Override
  public synchronized void requestCompaction(HRegion region, HStore store, String why,
      int priority, CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, store, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> 1), user);
  }

  private void requestCompactionInternal(HRegion region, String why, int priority,
      boolean selectNow, CompactionLifeCycleTracker tracker,
      CompactionCompleteTracker completeTracker, User user) throws IOException {
    // request compaction on all stores
    for (HStore store : region.stores.values()) {
      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
        user);
    }
  }

  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
      boolean selectNow, CompactionLifeCycleTracker tracker,
      CompactionCompleteTracker completeTracker, User user) throws IOException {
    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
        !region.getTableDescriptor().isCompactionEnabled())) {
      return;
    }
    RegionServerSpaceQuotaManager spaceQuotaManager =
        this.server.getRegionServerSpaceQuotaManager();
    if (spaceQuotaManager != null &&
        spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
      String reason = "Ignoring compaction request for " + region +
          " as an active space quota violation policy disallows compactions.";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
      return;
    }

    CompactionContext compaction;
    if (selectNow) {
      Optional<CompactionContext> c =
          selectCompaction(region, store, priority, tracker, completeTracker, user);
      if (!c.isPresent()) {
        // message logged inside
        return;
      }
      compaction = c.get();
    } else {
      compaction = null;
    }

    ThreadPoolExecutor pool;
    if (selectNow) {
      // compaction is non-null here: when selectNow is true we return early above if no
      // compaction was selected
      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
          : shortCompactions;
    } else {
      // We assume that most compactions are small. So, put system compactions into small
      // pool; we will do selection there, and move to large pool if necessary.
      pool = shortCompactions;
    }
    pool.execute(
      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
    region.incrementCompactionsQueuedCount();
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
  }

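  /**
   * Queues a system (non-user) compaction of every store in the region. Store file selection is
   * deferred until the request is picked up by the small compaction pool.
   */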
  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
      DUMMY_COMPLETE_TRACKER, null);
  }

  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
      throws IOException {
    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
  }

  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
      throws IOException {
    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
    if (!compaction.isPresent() && region.getRegionInfo() != null) {
      String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
          " because compaction request was cancelled";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
    }
    return compaction;
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    splits.shutdown();
    longCompactions.shutdown();
    shortCompactions.shutdown();
  }

  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        LOG.info("Waiting for " + name + " to finish...");
        if (!done) {
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted waiting for " + name + " to finish...");
        t.shutdownNow();
      }
    }
  }

  void join() {
    waitFor(splits, "Split Thread");
    waitFor(longCompactions, "Large Compaction Thread");
    waitFor(shortCompactions, "Small Compaction Thread");
  }

  /**
   * Returns the combined size of the large and small compaction queues.
   *
   * @return the current number of queued compaction requests
   */
  public int getCompactionQueueSize() {
    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
  }

  public int getLargeCompactionQueueSize() {
    return longCompactions.getQueue().size();
  }

  public int getSmallCompactionQueueSize() {
    return shortCompactions.getQueue().size();
  }

  public int getSplitQueueSize() {
    return splits.getQueue().size();
  }

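  /**
   * @return true if the number of online regions is still below the configured split limit
   */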
  private boolean shouldSplitRegion() {
    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
    }
    return (regionSplitLimit > server.getNumberOfOnlineRegions());
  }

  /**
   * @return the regionSplitLimit
   */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }

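  /**
   * Orders runnables in the compaction queues: {@link CompactionRunner}s sort before any other
   * runnable, lower queuedPriority values sort first, runners with an already-selected compaction
   * sort before those without one, and remaining ties are broken by the underlying request
   * (priority, then selection time, then identity hash code).
   */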
  private static final Comparator<Runnable> COMPARATOR =
      new Comparator<Runnable>() {

    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
      if (r1 == r2) {
        return 0; // they are the same request
      }
      // less first
      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
      if (cmp != 0) {
        return cmp;
      }
      cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
      if (cmp != 0) {
        return cmp;
      }

      // break the tie based on hash code
      return System.identityHashCode(r1) - System.identityHashCode(r2);
    }

    @Override
    public int compare(Runnable r1, Runnable r2) {
      // CompactionRunner first
      if (r1 instanceof CompactionRunner) {
        if (!(r2 instanceof CompactionRunner)) {
          return -1;
        }
      } else {
        if (r2 instanceof CompactionRunner) {
          return 1;
        } else {
          // break the tie based on hash code
          return System.identityHashCode(r1) - System.identityHashCode(r2);
        }
      }
      CompactionRunner o1 = (CompactionRunner) r1;
      CompactionRunner o2 = (CompactionRunner) r2;
      // less first
      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
      if (cmp != 0) {
        return cmp;
      }
      CompactionContext c1 = o1.compaction;
      CompactionContext c2 = o2.compaction;
      if (c1 != null) {
        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
      } else {
        return c2 != null ? 1 : 0;
      }
    }
  };

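  /**
   * Runnable that performs a single store compaction. For system compactions the store file
   * selection happens here, just before execution, and the runner re-queues itself if the store
   * priority dropped while it waited in the queue or if the selected compaction belongs in the
   * other pool.
   */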
  private final class CompactionRunner implements Runnable {
    private final HStore store;
    private final HRegion region;
    private final CompactionContext compaction;
    private final CompactionLifeCycleTracker tracker;
    private final CompactionCompleteTracker completeTracker;
    private int queuedPriority;
    private ThreadPoolExecutor parent;
    private User user;
    private long time;

    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
        CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
        ThreadPoolExecutor parent, User user) {
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.tracker = tracker;
      this.completeTracker = completeTracker;
      this.queuedPriority =
          compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
      this.parent = parent;
      this.user = user;
      this.time = EnvironmentEdgeManager.currentTime();
    }

    @Override
    public String toString() {
      if (compaction != null) {
        return "Request=" + compaction.getRequest();
      } else {
        return "region=" + region.toString() + ", storeName=" + store.toString() +
            ", priority=" + queuedPriority + ", startTime=" + time;
      }
    }

    private void doCompaction(User user) {
      CompactionContext c;
      // Common case - system compaction without a file selection. Select now.
      if (compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        Optional<CompactionContext> selected;
        try {
          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
            completeTracker, user);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          region.decrementCompactionsQueuedCount();
          return;
        }
        if (!selected.isPresent()) {
          region.decrementCompactionsQueuedCount();
          return; // nothing to do
        }
        c = selected.get();
        assert c.hasSelection();
        // Now see if we are in the correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while there, so cancel the selection.

        ThreadPoolExecutor pool = store.throttleCompaction(c.getRequest().getSize())
            ? longCompactions : shortCompactions;

        // The long compaction pool can process small jobs;
        // the short compaction pool should not process large jobs.
        if (this.parent == shortCompactions && pool == longCompactions) {
          this.store.cancelRequestedCompaction(c);
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      } else {
        c = compaction;
      }
      // Finally we can compact something.
      assert c != null;

      tracker.beforeExecution(store);
      try {
        // Note: please don't put single-compaction logic here;
        //       put it into region/store/etc. This is CompactSplit-level logic.
        long start = EnvironmentEdgeManager.currentTime();
        boolean completed =
            region.compact(c, store, compactionThroughputController, user);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        IOException remoteEx =
            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } finally {
        tracker.afterExecution(store);
        completeTracker.completed(store);
        region.decrementCompactionsQueuedCount();
        LOG.debug("Status {}", CompactSplit.this);
      }
    }

    @Override
    public void run() {
      Preconditions.checkNotNull(server);
      if (server.isStopped() || (region.getTableDescriptor() != null
          && !region.getTableDescriptor().isCompactionEnabled())) {
        region.decrementCompactionsQueuedCount();
        return;
      }
      doCompaction(user);
    }

    private String formatStackTrace(Exception ex) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      ex.printStackTrace(pw);
      pw.flush();
      return sw.toString();
    }
  }

  /**
   * Cleanup class to use when rejecting a compaction request from the queue.
   */
  private static class Rejection implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
      if (runnable instanceof CompactionRunner) {
        CompactionRunner runner = (CompactionRunner) runnable;
        LOG.debug("Compaction Rejected: " + runner);
        if (runner.compaction != null) {
          runner.store.cancelRequestedCompaction(runner.compaction);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void onConfigurationChange(Configuration newConf) {
    // Check whether the number of large / small compaction threads has changed and, if so,
    // adjust the core pool size of the thread pools via setCorePoolSize(). According to the
    // javadocs, it is safe to change the core pool size on the fly. We need to reset the
    // maximum pool size as well.
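    // Note on ordering: when growing we raise the maximum pool size before the core size, and
    // when shrinking we lower the core size before the maximum, so that the core size never
    // exceeds the maximum size between the two calls (ThreadPoolExecutor rejects a maximum
    // below the current core size).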
    int largeThreads = Math.max(1, newConf.getInt(
        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    if (this.longCompactions.getCorePoolSize() != largeThreads) {
      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
          " from " + this.longCompactions.getCorePoolSize() + " to " + largeThreads);
      if (this.longCompactions.getCorePoolSize() < largeThreads) {
        this.longCompactions.setMaximumPoolSize(largeThreads);
        this.longCompactions.setCorePoolSize(largeThreads);
      } else {
        this.longCompactions.setCorePoolSize(largeThreads);
        this.longCompactions.setMaximumPoolSize(largeThreads);
      }
    }

    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
        SMALL_COMPACTION_THREADS_DEFAULT);
    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
          " from " + this.shortCompactions.getCorePoolSize() + " to " + smallThreads);
      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
        this.shortCompactions.setMaximumPoolSize(smallThreads);
        this.shortCompactions.setCorePoolSize(smallThreads);
      } else {
        this.shortCompactions.setCorePoolSize(smallThreads);
        this.shortCompactions.setMaximumPoolSize(smallThreads);
      }
    }

    int splitThreads = newConf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
    if (this.splits.getCorePoolSize() != splitThreads) {
      LOG.info("Changing the value of " + SPLIT_THREADS +
          " from " + this.splits.getCorePoolSize() + " to " + splitThreads);
      if (this.splits.getCorePoolSize() < splitThreads) {
        this.splits.setMaximumPoolSize(splitThreads);
        this.splits.setCorePoolSize(splitThreads);
      } else {
        this.splits.setCorePoolSize(splitThreads);
        this.splits.setMaximumPoolSize(splitThreads);
      }
    }

    ThroughputController old = this.compactionThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.compactionThroughputController =
        CompactionThroughputControllerFactory.create(server, newConf);

    // We change this atomically here instead of reloading the config so that upstream remains
    // the only one with the flexibility to reload the config.
    this.conf.reloadConfiguration();
  }

  protected int getSmallCompactionThreadNum() {
    return this.shortCompactions.getCorePoolSize();
  }

  protected int getLargeCompactionThreadNum() {
    return this.longCompactions.getCorePoolSize();
  }

  protected int getSplitThreadNum() {
    return this.splits.getCorePoolSize();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister.
  }

  @VisibleForTesting
  public ThroughputController getCompactionThroughputController() {
    return compactionThroughputController;
  }

  /**
   * Shutdown the long compaction thread pool.
   * Should only be used in unit tests to prevent the long compaction thread pool from stealing
   * jobs from the short compaction queue.
   */
  @VisibleForTesting
  void shutdownLongCompactions() {
    this.longCompactions.shutdown();
  }

  public void clearLongCompactionsQueue() {
    longCompactions.getQueue().clear();
  }

  public void clearShortCompactionsQueue() {
    shortCompactions.getQueue().clear();
  }
}