View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.conf.ConfigurationManager;
40  import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
41  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
42  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
43  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
44  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
45  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46  import org.apache.hadoop.hbase.util.Pair;
47  import org.apache.hadoop.hbase.util.StealJobQueue;
48  import org.apache.hadoop.ipc.RemoteException;
49  import org.apache.hadoop.util.StringUtils;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.base.Preconditions;
53  
54  /**
55   * Compact region on request and then run split if appropriate
56   */
57  @InterfaceAudience.Private
58  public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
59    private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
60  
61    // Configuration key for the large compaction threads.
62    public final static String LARGE_COMPACTION_THREADS =
63        "hbase.regionserver.thread.compaction.large";
64    public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
65    
66    // Configuration key for the small compaction threads.
67    public final static String SMALL_COMPACTION_THREADS =
68        "hbase.regionserver.thread.compaction.small";
69    public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
70    
71    // Configuration key for split threads
72    public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
73    public final static int SPLIT_THREADS_DEFAULT = 1;
74    
75    // Configuration keys for merge threads
76    public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
77    public final static int MERGE_THREADS_DEFAULT = 1;
78  
79    public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
80        "hbase.regionserver.regionSplitLimit";
81    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
82    
83    private final HRegionServer server;
84    private final Configuration conf;
85  
86    private final ThreadPoolExecutor longCompactions;
87    private final ThreadPoolExecutor shortCompactions;
88    private final ThreadPoolExecutor splits;
89    private final ThreadPoolExecutor mergePool;
90  
91    private volatile CompactionThroughputController compactionThroughputController;
92  
93    /**
94     * Splitting should not take place if the total number of regions exceed this.
95     * This is not a hard limit to the number of regions but it is a guideline to
96     * stop splitting after number of online regions is greater than this.
97     */
98    private int regionSplitLimit;
99  
100   /** @param server */
101   CompactSplitThread(HRegionServer server) {
102     super();
103     this.server = server;
104     this.conf = server.getConfiguration();
105     this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
106         DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
107 
108     int largeThreads = Math.max(1, conf.getInt(
109         LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
110     int smallThreads = conf.getInt(
111         SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
112 
113     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
114 
115     // if we have throttle threads, make sure the user also specified size
116     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
117 
118     final String n = Thread.currentThread().getName();
119 
120     StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
121     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
122         60, TimeUnit.SECONDS, stealJobQueue,
123         new ThreadFactory() {
124           @Override
125           public Thread newThread(Runnable r) {
126             Thread t = new Thread(r);
127             t.setName(n + "-longCompactions-" + System.currentTimeMillis());
128             return t;
129           }
130       });
131     this.longCompactions.setRejectedExecutionHandler(new Rejection());
132     this.longCompactions.prestartAllCoreThreads();
133     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
134         60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
135         new ThreadFactory() {
136           @Override
137           public Thread newThread(Runnable r) {
138             Thread t = new Thread(r);
139             t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
140             return t;
141           }
142       });
143     this.shortCompactions
144         .setRejectedExecutionHandler(new Rejection());
145     this.splits = (ThreadPoolExecutor)
146         Executors.newFixedThreadPool(splitThreads,
147             new ThreadFactory() {
148           @Override
149           public Thread newThread(Runnable r) {
150             Thread t = new Thread(r);
151             t.setName(n + "-splits-" + System.currentTimeMillis());
152             return t;
153           }
154       });
155     int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
156     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
157         mergeThreads, new ThreadFactory() {
158           @Override
159           public Thread newThread(Runnable r) {
160             Thread t = new Thread(r);
161             t.setName(n + "-merges-" + System.currentTimeMillis());
162             return t;
163           }
164         });
165 
166     // compaction throughput controller
167     this.compactionThroughputController =
168         CompactionThroughputControllerFactory.create(server, conf);
169   }
170 
171   @Override
172   public String toString() {
173     return "compaction_queue=("
174         + longCompactions.getQueue().size() + ":"
175         + shortCompactions.getQueue().size() + ")"
176         + ", split_queue=" + splits.getQueue().size()
177         + ", merge_queue=" + mergePool.getQueue().size();
178   }
179   
180   public String dumpQueue() {
181     StringBuffer queueLists = new StringBuffer();
182     queueLists.append("Compaction/Split Queue dump:\n");
183     queueLists.append("  LargeCompation Queue:\n");
184     BlockingQueue<Runnable> lq = longCompactions.getQueue();
185     Iterator<Runnable> it = lq.iterator();
186     while (it.hasNext()) {
187       queueLists.append("    " + it.next().toString());
188       queueLists.append("\n");
189     }
190 
191     if (shortCompactions != null) {
192       queueLists.append("\n");
193       queueLists.append("  SmallCompation Queue:\n");
194       lq = shortCompactions.getQueue();
195       it = lq.iterator();
196       while (it.hasNext()) {
197         queueLists.append("    " + it.next().toString());
198         queueLists.append("\n");
199       }
200     }
201 
202     queueLists.append("\n");
203     queueLists.append("  Split Queue:\n");
204     lq = splits.getQueue();
205     it = lq.iterator();
206     while (it.hasNext()) {
207       queueLists.append("    " + it.next().toString());
208       queueLists.append("\n");
209     }
210 
211     queueLists.append("\n");
212     queueLists.append("  Region Merge Queue:\n");
213     lq = mergePool.getQueue();
214     it = lq.iterator();
215     while (it.hasNext()) {
216       queueLists.append("    " + it.next().toString());
217       queueLists.append("\n");
218     }
219 
220     return queueLists.toString();
221   }
222 
223   public synchronized void requestRegionsMerge(final Region a,
224       final Region b, final boolean forcible, long masterSystemTime) {
225     try {
226       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime));
227       if (LOG.isDebugEnabled()) {
228         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
229             + forcible + ".  " + this);
230       }
231     } catch (RejectedExecutionException ree) {
232       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
233           + forcible, ree);
234     }
235   }
236 
237   public synchronized boolean requestSplit(final Region r) {
238     // don't split regions that are blocking
239     if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
240       byte[] midKey = ((HRegion)r).checkSplit();
241       if (midKey != null) {
242         requestSplit(r, midKey);
243         return true;
244       }
245     }
246     return false;
247   }
248 
249   public synchronized void requestSplit(final Region r, byte[] midKey) {
250     if (midKey == null) {
251       LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
252         " not splittable because midkey=null");
253       if (((HRegion)r).shouldForceSplit()) {
254         ((HRegion)r).clearSplit();
255       }
256       return;
257     }
258     try {
259       this.splits.execute(new SplitRequest(r, midKey, this.server));
260       if (LOG.isDebugEnabled()) {
261         LOG.debug("Split requested for " + r + ".  " + this);
262       }
263     } catch (RejectedExecutionException ree) {
264       LOG.info("Could not execute split for " + r, ree);
265     }
266   }
267 
268   @Override
269   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
270       throws IOException {
271     return requestCompaction(r, why, null);
272   }
273 
274   @Override
275   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
276       List<Pair<CompactionRequest, Store>> requests) throws IOException {
277     return requestCompaction(r, why, Store.NO_PRIORITY, requests);
278   }
279 
280   @Override
281   public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
282       final String why, CompactionRequest request) throws IOException {
283     return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
284   }
285 
286   @Override
287   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
288       int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
289     return requestCompactionInternal(r, why, p, requests, true);
290   }
291 
292   private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
293       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
294     // not a special compaction request, so make our own list
295     List<CompactionRequest> ret = null;
296     if (requests == null) {
297       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
298       for (Store s : r.getStores()) {
299         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
300         if (selectNow) ret.add(cr);
301       }
302     } else {
303       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
304       ret = new ArrayList<CompactionRequest>(requests.size());
305       for (Pair<CompactionRequest, Store> pair : requests) {
306         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
307       }
308     }
309     return ret;
310   }
311 
312   public CompactionRequest requestCompaction(final Region r, final Store s,
313       final String why, int priority, CompactionRequest request) throws IOException {
314     return requestCompactionInternal(r, s, why, priority, request, true);
315   }
316 
317   public synchronized void requestSystemCompaction(
318       final Region r, final String why) throws IOException {
319     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
320   }
321 
322   public void requestSystemCompaction(
323       final Region r, final Store s, final String why) throws IOException {
324     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
325   }
326 
327   /**
328    * @param r region store belongs to
329    * @param s Store to request compaction on
330    * @param why Why compaction requested -- used in debug messages
331    * @param priority override the default priority (NO_PRIORITY == decide)
332    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
333    *          compaction will be used.
334    */
335   private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
336       final String why, int priority, CompactionRequest request, boolean selectNow)
337           throws IOException {
338     if (this.server.isStopped()
339         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
340       return null;
341     }
342 
343     CompactionContext compaction = null;
344     if (selectNow) {
345       compaction = selectCompaction(r, s, priority, request);
346       if (compaction == null) return null; // message logged inside
347     }
348 
349     // We assume that most compactions are small. So, put system compactions into small
350     // pool; we will do selection there, and move to large pool if necessary.
351     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
352       ? longCompactions : shortCompactions;
353     pool.execute(new CompactionRunner(s, r, compaction, pool));
354     if (LOG.isDebugEnabled()) {
355       String type = (pool == shortCompactions) ? "Small " : "Large ";
356       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
357           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
358     }
359     return selectNow ? compaction.getRequest() : null;
360   }
361 
362   private CompactionContext selectCompaction(final Region r, final Store s,
363       int priority, CompactionRequest request) throws IOException {
364     CompactionContext compaction = s.requestCompaction(priority, request);
365     if (compaction == null) {
366       if(LOG.isDebugEnabled()) {
367         LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
368             " because compaction request was cancelled");
369       }
370       return null;
371     }
372     assert compaction.hasSelection();
373     if (priority != Store.NO_PRIORITY) {
374       compaction.getRequest().setPriority(priority);
375     }
376     return compaction;
377   }
378 
379   /**
380    * Only interrupt once it's done with a run through the work loop.
381    */
382   void interruptIfNecessary() {
383     splits.shutdown();
384     mergePool.shutdown();
385     longCompactions.shutdown();
386     shortCompactions.shutdown();
387   }
388 
389   private void waitFor(ThreadPoolExecutor t, String name) {
390     boolean done = false;
391     while (!done) {
392       try {
393         done = t.awaitTermination(60, TimeUnit.SECONDS);
394         LOG.info("Waiting for " + name + " to finish...");
395         if (!done) {
396           t.shutdownNow();
397         }
398       } catch (InterruptedException ie) {
399         LOG.warn("Interrupted waiting for " + name + " to finish...");
400       }
401     }
402   }
403 
404   void join() {
405     waitFor(splits, "Split Thread");
406     waitFor(mergePool, "Merge Thread");
407     waitFor(longCompactions, "Large Compaction Thread");
408     waitFor(shortCompactions, "Small Compaction Thread");
409   }
410 
411   /**
412    * Returns the current size of the queue containing regions that are
413    * processed.
414    *
415    * @return The current size of the regions queue.
416    */
417   public int getCompactionQueueSize() {
418     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
419   }
420 
421   public int getLargeCompactionQueueSize() {
422     return longCompactions.getQueue().size();
423   }
424 
425 
426   public int getSmallCompactionQueueSize() {
427     return shortCompactions.getQueue().size();
428   }
429 
430   public int getSplitQueueSize() {
431     return splits.getQueue().size();
432   }
433 
434   private boolean shouldSplitRegion() {
435     if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
436       LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
437           + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
438     }
439     return (regionSplitLimit > server.getNumberOfOnlineRegions());
440   }
441 
442   /**
443    * @return the regionSplitLimit
444    */
445   public int getRegionSplitLimit() {
446     return this.regionSplitLimit;
447   }
448 
449   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
450       justification="Contrived use of compareTo")
451   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
452     private final Store store;
453     private final HRegion region;
454     private CompactionContext compaction;
455     private int queuedPriority;
456     private ThreadPoolExecutor parent;
457 
458     public CompactionRunner(Store store, Region region,
459         CompactionContext compaction, ThreadPoolExecutor parent) {
460       super();
461       this.store = store;
462       this.region = (HRegion)region;
463       this.compaction = compaction;
464       this.queuedPriority = (this.compaction == null)
465           ? store.getCompactPriority() : compaction.getRequest().getPriority();
466       this.parent = parent;
467     }
468 
469     @Override
470     public String toString() {
471       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
472           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
473     }
474 
475     @Override
476     public void run() {
477       Preconditions.checkNotNull(server);
478       if (server.isStopped()
479           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
480         return;
481       }
482       // Common case - system compaction without a file selection. Select now.
483       if (this.compaction == null) {
484         int oldPriority = this.queuedPriority;
485         this.queuedPriority = this.store.getCompactPriority();
486         if (this.queuedPriority > oldPriority) {
487           // Store priority decreased while we were in queue (due to some other compaction?),
488           // requeue with new priority to avoid blocking potential higher priorities.
489           this.parent.execute(this);
490           return;
491         }
492         try {
493           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
494         } catch (IOException ex) {
495           LOG.error("Compaction selection failed " + this, ex);
496           server.checkFileSystem();
497           return;
498         }
499         if (this.compaction == null) return; // nothing to do
500         // Now see if we are in correct pool for the size; if not, go to the correct one.
501         // We might end up waiting for a while, so cancel the selection.
502         assert this.compaction.hasSelection();
503         ThreadPoolExecutor pool = store.throttleCompaction(
504             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
505 
506         // Long compaction pool can process small job
507         // Short compaction pool should not process large job
508         if (this.parent == shortCompactions && pool == longCompactions) {
509           this.store.cancelRequestedCompaction(this.compaction);
510           this.compaction = null;
511           this.parent = pool;
512           this.parent.execute(this);
513           return;
514         }
515       }
516       // Finally we can compact something.
517       assert this.compaction != null;
518 
519       this.compaction.getRequest().beforeExecute();
520       try {
521         // Note: please don't put single-compaction logic here;
522         //       put it into region/store/etc. This is CST logic.
523         long start = EnvironmentEdgeManager.currentTime();
524         boolean completed =
525             region.compact(compaction, store, compactionThroughputController);
526         long now = EnvironmentEdgeManager.currentTime();
527         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
528               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
529         if (completed) {
530           // degenerate case: blocked regions require recursive enqueues
531           if (store.getCompactPriority() <= 0) {
532             requestSystemCompaction(region, store, "Recursive enqueue");
533           } else {
534             // see if the compaction has caused us to exceed max region size
535             requestSplit(region);
536           }
537         }
538       } catch (IOException ex) {
539         IOException remoteEx =
540             ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
541         LOG.error("Compaction failed " + this, remoteEx);
542         if (remoteEx != ex) {
543           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
544         }
545         server.checkFileSystem();
546       } catch (Exception ex) {
547         LOG.error("Compaction failed " + this, ex);
548         server.checkFileSystem();
549       } finally {
550         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
551       }
552       this.compaction.getRequest().afterExecute();
553     }
554 
555     private String formatStackTrace(Exception ex) {
556       StringWriter sw = new StringWriter();
557       PrintWriter pw = new PrintWriter(sw);
558       ex.printStackTrace(pw);
559       pw.flush();
560       return sw.toString();
561     }
562 
563     @Override
564     public int compareTo(CompactionRunner o) {
565       // Only compare the underlying request (if any), for queue sorting purposes.
566       int compareVal = queuedPriority - o.queuedPriority; // compare priority
567       if (compareVal != 0) return compareVal;
568       CompactionContext tc = this.compaction, oc = o.compaction;
569       // Sort pre-selected (user?) compactions before system ones with equal priority.
570       return (tc == null) ? ((oc == null) ? 0 : 1)
571           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
572     }
573   }
574 
575   /**
576    * Cleanup class to use when rejecting a compaction request from the queue.
577    */
578   private static class Rejection implements RejectedExecutionHandler {
579     @Override
580     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
581       if (runnable instanceof CompactionRunner) {
582         CompactionRunner runner = (CompactionRunner)runnable;
583         LOG.debug("Compaction Rejected: " + runner);
584         runner.store.cancelRequestedCompaction(runner.compaction);
585       }
586     }
587   }
588 
589   /**
590    * {@inheritDoc}
591    */
592   @Override
593   public void onConfigurationChange(Configuration newConf) {
594     // Check if number of large / small compaction threads has changed, and then
595     // adjust the core pool size of the thread pools, by using the
596     // setCorePoolSize() method. According to the javadocs, it is safe to
597     // change the core pool size on-the-fly. We need to reset the maximum
598     // pool size, as well.
599     int largeThreads = Math.max(1, newConf.getInt(
600             LARGE_COMPACTION_THREADS,
601             LARGE_COMPACTION_THREADS_DEFAULT));
602     if (this.longCompactions.getCorePoolSize() != largeThreads) {
603       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
604               " from " + this.longCompactions.getCorePoolSize() + " to " +
605               largeThreads);
606       this.longCompactions.setMaximumPoolSize(largeThreads);
607       this.longCompactions.setCorePoolSize(largeThreads);
608     }
609 
610     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
611             SMALL_COMPACTION_THREADS_DEFAULT);
612     if (this.shortCompactions.getCorePoolSize() != smallThreads) {
613       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
614                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
615                 smallThreads);
616       this.shortCompactions.setMaximumPoolSize(smallThreads);
617       this.shortCompactions.setCorePoolSize(smallThreads);
618     }
619 
620     int splitThreads = newConf.getInt(SPLIT_THREADS,
621             SPLIT_THREADS_DEFAULT);
622     if (this.splits.getCorePoolSize() != splitThreads) {
623       LOG.info("Changing the value of " + SPLIT_THREADS +
624                 " from " + this.splits.getCorePoolSize() + " to " +
625                 splitThreads);
626       this.splits.setMaximumPoolSize(smallThreads);
627       this.splits.setCorePoolSize(smallThreads);
628     }
629 
630     int mergeThreads = newConf.getInt(MERGE_THREADS,
631             MERGE_THREADS_DEFAULT);
632     if (this.mergePool.getCorePoolSize() != mergeThreads) {
633       LOG.info("Changing the value of " + MERGE_THREADS +
634                 " from " + this.mergePool.getCorePoolSize() + " to " +
635                 mergeThreads);
636       this.mergePool.setMaximumPoolSize(smallThreads);
637       this.mergePool.setCorePoolSize(smallThreads);
638     }
639 
640     CompactionThroughputController old = this.compactionThroughputController;
641     if (old != null) {
642       old.stop("configuration change");
643     }
644     this.compactionThroughputController =
645         CompactionThroughputControllerFactory.create(server, newConf);
646 
647     // We change this atomically here instead of reloading the config in order that upstream
648     // would be the only one with the flexibility to reload the config.
649     this.conf.reloadConfiguration();
650   }
651 
652   protected int getSmallCompactionThreadNum() {
653     return this.shortCompactions.getCorePoolSize();
654   }
655 
656   public int getLargeCompactionThreadNum() {
657     return this.longCompactions.getCorePoolSize();
658   }
659 
660   /**
661    * {@inheritDoc}
662    */
663   @Override
664   public void registerChildren(ConfigurationManager manager) {
665     // No children to register.
666   }
667 
668   /**
669    * {@inheritDoc}
670    */
671   @Override
672   public void deregisterChildren(ConfigurationManager manager) {
673     // No children to register
674   }
675 
676   @VisibleForTesting
677   public CompactionThroughputController getCompactionThroughputController() {
678     return compactionThroughputController;
679   }
680 
681   @VisibleForTesting
682   /**
683    * Shutdown the long compaction thread pool.
684    * Should only be used in unit test to prevent long compaction thread pool from stealing job
685    * from short compaction queue
686    */
687   void shutdownLongCompactions(){
688     this.longCompactions.shutdown();
689   }
690 }