View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.PriorityBlockingQueue;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.conf.ConfigurationManager;
41  import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
42  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
43  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
44  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
45  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.Pair;
48  import org.apache.hadoop.ipc.RemoteException;
49  import org.apache.hadoop.util.StringUtils;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.base.Preconditions;
53  
54  /**
55   * Compact region on request and then run split if appropriate
56   */
57  @InterfaceAudience.Private
58  public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
  /** Logger shared by this class and its nested classes. */
  static final Log LOG = LogFactory.getLog(CompactSplitThread.class);

  // Configuration key for the number of large compaction threads.
  public final static String LARGE_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.large";
  // Value used when LARGE_COMPACTION_THREADS is not configured.
  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for the number of small compaction threads.
  public final static String SMALL_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.small";
  // Value used when SMALL_COMPACTION_THREADS is not configured.
  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for the number of split threads.
  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
  // Value used when SPLIT_THREADS is not configured.
  public final static int SPLIT_THREADS_DEFAULT = 1;

  // Configuration key for the number of merge threads.
  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
  // Value used when MERGE_THREADS is not configured.
  public final static int MERGE_THREADS_DEFAULT = 1;

  // Configuration key for the online-region count above which splits are no longer requested.
  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
      "hbase.regionserver.regionSplitLimit";
  // Value used when REGION_SERVER_REGION_SPLIT_LIMIT is not configured.
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;

  /** Region server this instance works for; consulted for stop state and online region count. */
  private final HRegionServer server;
  /** Configuration obtained from the server at construction time. */
  private final Configuration conf;

  /** Pool for compactions that the store's throttleCompaction() check classifies as large. */
  private final ThreadPoolExecutor longCompactions;
  /** Pool for all other compactions; system compactions are always queued here first. */
  private final ThreadPoolExecutor shortCompactions;
  /** Pool executing SplitRequest tasks. */
  private final ThreadPoolExecutor splits;
  /** Pool executing RegionMergeRequest tasks. */
  private final ThreadPoolExecutor mergePool;

  /** Passed to every compaction run; replaced wholesale on configuration change. */
  private volatile CompactionThroughputController compactionThroughputController;

  /**
   * Splitting should not take place if the total number of regions exceeds this.
   * This is not a hard limit on the number of regions; it is a guideline to
   * stop splitting once the number of online regions reaches this value.
   */
  private int regionSplitLimit;
99  
100   /** @param server */
101   CompactSplitThread(HRegionServer server) {
102     super();
103     this.server = server;
104     this.conf = server.getConfiguration();
105     this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
106         DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
107 
108     int largeThreads = Math.max(1, conf.getInt(
109         LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
110     int smallThreads = conf.getInt(
111         SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
112 
113     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
114 
115     // if we have throttle threads, make sure the user also specified size
116     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
117 
118     final String n = Thread.currentThread().getName();
119 
120     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
121         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
122         new ThreadFactory() {
123           @Override
124           public Thread newThread(Runnable r) {
125             Thread t = new Thread(r);
126             t.setName(n + "-longCompactions-" + System.currentTimeMillis());
127             return t;
128           }
129       });
130     this.longCompactions.setRejectedExecutionHandler(new Rejection());
131     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
132         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
133         new ThreadFactory() {
134           @Override
135           public Thread newThread(Runnable r) {
136             Thread t = new Thread(r);
137             t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
138             return t;
139           }
140       });
141     this.shortCompactions
142         .setRejectedExecutionHandler(new Rejection());
143     this.splits = (ThreadPoolExecutor)
144         Executors.newFixedThreadPool(splitThreads,
145             new ThreadFactory() {
146           @Override
147           public Thread newThread(Runnable r) {
148             Thread t = new Thread(r);
149             t.setName(n + "-splits-" + System.currentTimeMillis());
150             return t;
151           }
152       });
153     int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
154     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
155         mergeThreads, new ThreadFactory() {
156           @Override
157           public Thread newThread(Runnable r) {
158             Thread t = new Thread(r);
159             t.setName(n + "-merges-" + System.currentTimeMillis());
160             return t;
161           }
162         });
163 
164     // compaction throughput controller
165     this.compactionThroughputController =
166         CompactionThroughputControllerFactory.create(server, conf);
167   }
168 
169   @Override
170   public String toString() {
171     return "compaction_queue=("
172         + longCompactions.getQueue().size() + ":"
173         + shortCompactions.getQueue().size() + ")"
174         + ", split_queue=" + splits.getQueue().size()
175         + ", merge_queue=" + mergePool.getQueue().size();
176   }
177   
178   public String dumpQueue() {
179     StringBuffer queueLists = new StringBuffer();
180     queueLists.append("Compaction/Split Queue dump:\n");
181     queueLists.append("  LargeCompation Queue:\n");
182     BlockingQueue<Runnable> lq = longCompactions.getQueue();
183     Iterator<Runnable> it = lq.iterator();
184     while (it.hasNext()) {
185       queueLists.append("    " + it.next().toString());
186       queueLists.append("\n");
187     }
188 
189     if (shortCompactions != null) {
190       queueLists.append("\n");
191       queueLists.append("  SmallCompation Queue:\n");
192       lq = shortCompactions.getQueue();
193       it = lq.iterator();
194       while (it.hasNext()) {
195         queueLists.append("    " + it.next().toString());
196         queueLists.append("\n");
197       }
198     }
199 
200     queueLists.append("\n");
201     queueLists.append("  Split Queue:\n");
202     lq = splits.getQueue();
203     it = lq.iterator();
204     while (it.hasNext()) {
205       queueLists.append("    " + it.next().toString());
206       queueLists.append("\n");
207     }
208 
209     queueLists.append("\n");
210     queueLists.append("  Region Merge Queue:\n");
211     lq = mergePool.getQueue();
212     it = lq.iterator();
213     while (it.hasNext()) {
214       queueLists.append("    " + it.next().toString());
215       queueLists.append("\n");
216     }
217 
218     return queueLists.toString();
219   }
220 
221   public synchronized void requestRegionsMerge(final Region a,
222       final Region b, final boolean forcible) {
223     try {
224       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
225       if (LOG.isDebugEnabled()) {
226         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
227             + forcible + ".  " + this);
228       }
229     } catch (RejectedExecutionException ree) {
230       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
231           + forcible, ree);
232     }
233   }
234 
235   public synchronized boolean requestSplit(final Region r) {
236     // don't split regions that are blocking
237     if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
238       byte[] midKey = ((HRegion)r).checkSplit();
239       if (midKey != null) {
240         requestSplit(r, midKey);
241         return true;
242       }
243     }
244     return false;
245   }
246 
247   public synchronized void requestSplit(final Region r, byte[] midKey) {
248     if (midKey == null) {
249       LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
250         " not splittable because midkey=null");
251       if (((HRegion)r).shouldForceSplit()) {
252         ((HRegion)r).clearSplit();
253       }
254       return;
255     }
256     try {
257       this.splits.execute(new SplitRequest(r, midKey, this.server));
258       if (LOG.isDebugEnabled()) {
259         LOG.debug("Split requested for " + r + ".  " + this);
260       }
261     } catch (RejectedExecutionException ree) {
262       LOG.info("Could not execute split for " + r, ree);
263     }
264   }
265 
266   @Override
267   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
268       throws IOException {
269     return requestCompaction(r, why, null);
270   }
271 
272   @Override
273   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
274       List<Pair<CompactionRequest, Store>> requests) throws IOException {
275     return requestCompaction(r, why, Store.NO_PRIORITY, requests);
276   }
277 
278   @Override
279   public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
280       final String why, CompactionRequest request) throws IOException {
281     return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
282   }
283 
284   @Override
285   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
286       int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
287     return requestCompactionInternal(r, why, p, requests, true);
288   }
289 
290   private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
291       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
292     // not a special compaction request, so make our own list
293     List<CompactionRequest> ret = null;
294     if (requests == null) {
295       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
296       for (Store s : r.getStores()) {
297         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
298         if (selectNow) ret.add(cr);
299       }
300     } else {
301       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
302       ret = new ArrayList<CompactionRequest>(requests.size());
303       for (Pair<CompactionRequest, Store> pair : requests) {
304         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
305       }
306     }
307     return ret;
308   }
309 
310   public CompactionRequest requestCompaction(final Region r, final Store s,
311       final String why, int priority, CompactionRequest request) throws IOException {
312     return requestCompactionInternal(r, s, why, priority, request, true);
313   }
314 
315   public synchronized void requestSystemCompaction(
316       final Region r, final String why) throws IOException {
317     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
318   }
319 
320   public void requestSystemCompaction(
321       final Region r, final Store s, final String why) throws IOException {
322     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
323   }
324 
325   /**
326    * @param r region store belongs to
327    * @param s Store to request compaction on
328    * @param why Why compaction requested -- used in debug messages
329    * @param priority override the default priority (NO_PRIORITY == decide)
330    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
331    *          compaction will be used.
332    */
333   private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
334       final String why, int priority, CompactionRequest request, boolean selectNow)
335           throws IOException {
336     if (this.server.isStopped()
337         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
338       return null;
339     }
340 
341     CompactionContext compaction = null;
342     if (selectNow) {
343       compaction = selectCompaction(r, s, priority, request);
344       if (compaction == null) return null; // message logged inside
345     }
346 
347     // We assume that most compactions are small. So, put system compactions into small
348     // pool; we will do selection there, and move to large pool if necessary.
349     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
350       ? longCompactions : shortCompactions;
351     pool.execute(new CompactionRunner(s, r, compaction, pool));
352     if (LOG.isDebugEnabled()) {
353       String type = (pool == shortCompactions) ? "Small " : "Large ";
354       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
355           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
356     }
357     return selectNow ? compaction.getRequest() : null;
358   }
359 
360   private CompactionContext selectCompaction(final Region r, final Store s,
361       int priority, CompactionRequest request) throws IOException {
362     CompactionContext compaction = s.requestCompaction(priority, request);
363     if (compaction == null) {
364       if(LOG.isDebugEnabled()) {
365         LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
366             " because compaction request was cancelled");
367       }
368       return null;
369     }
370     assert compaction.hasSelection();
371     if (priority != Store.NO_PRIORITY) {
372       compaction.getRequest().setPriority(priority);
373     }
374     return compaction;
375   }
376 
377   /**
378    * Only interrupt once it's done with a run through the work loop.
379    */
380   void interruptIfNecessary() {
381     splits.shutdown();
382     mergePool.shutdown();
383     longCompactions.shutdown();
384     shortCompactions.shutdown();
385   }
386 
387   private void waitFor(ThreadPoolExecutor t, String name) {
388     boolean done = false;
389     while (!done) {
390       try {
391         done = t.awaitTermination(60, TimeUnit.SECONDS);
392         LOG.info("Waiting for " + name + " to finish...");
393         if (!done) {
394           t.shutdownNow();
395         }
396       } catch (InterruptedException ie) {
397         LOG.warn("Interrupted waiting for " + name + " to finish...");
398       }
399     }
400   }
401 
402   void join() {
403     waitFor(splits, "Split Thread");
404     waitFor(mergePool, "Merge Thread");
405     waitFor(longCompactions, "Large Compaction Thread");
406     waitFor(shortCompactions, "Small Compaction Thread");
407   }
408 
409   /**
410    * Returns the current size of the queue containing regions that are
411    * processed.
412    *
413    * @return The current size of the regions queue.
414    */
415   public int getCompactionQueueSize() {
416     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
417   }
418 
419   public int getLargeCompactionQueueSize() {
420     return longCompactions.getQueue().size();
421   }
422 
423 
424   public int getSmallCompactionQueueSize() {
425     return shortCompactions.getQueue().size();
426   }
427 
428   public int getSplitQueueSize() {
429     return splits.getQueue().size();
430   }
431 
432   private boolean shouldSplitRegion() {
433     if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
434       LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
435           + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
436     }
437     return (regionSplitLimit > server.getNumberOfOnlineRegions());
438   }
439 
440   /**
441    * @return the regionSplitLimit
442    */
443   public int getRegionSplitLimit() {
444     return this.regionSplitLimit;
445   }
446 
447   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
448       justification="Contrived use of compareTo")
449   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
450     private final Store store;
451     private final HRegion region;
452     private CompactionContext compaction;
453     private int queuedPriority;
454     private ThreadPoolExecutor parent;
455 
456     public CompactionRunner(Store store, Region region,
457         CompactionContext compaction, ThreadPoolExecutor parent) {
458       super();
459       this.store = store;
460       this.region = (HRegion)region;
461       this.compaction = compaction;
462       this.queuedPriority = (this.compaction == null)
463           ? store.getCompactPriority() : compaction.getRequest().getPriority();
464       this.parent = parent;
465     }
466 
467     @Override
468     public String toString() {
469       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
470           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
471     }
472 
473     @Override
474     public void run() {
475       Preconditions.checkNotNull(server);
476       if (server.isStopped()
477           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
478         return;
479       }
480       // Common case - system compaction without a file selection. Select now.
481       if (this.compaction == null) {
482         int oldPriority = this.queuedPriority;
483         this.queuedPriority = this.store.getCompactPriority();
484         if (this.queuedPriority > oldPriority) {
485           // Store priority decreased while we were in queue (due to some other compaction?),
486           // requeue with new priority to avoid blocking potential higher priorities.
487           this.parent.execute(this);
488           return;
489         }
490         try {
491           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
492         } catch (IOException ex) {
493           LOG.error("Compaction selection failed " + this, ex);
494           server.checkFileSystem();
495           return;
496         }
497         if (this.compaction == null) return; // nothing to do
498         // Now see if we are in correct pool for the size; if not, go to the correct one.
499         // We might end up waiting for a while, so cancel the selection.
500         assert this.compaction.hasSelection();
501         ThreadPoolExecutor pool = store.throttleCompaction(
502             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
503         if (this.parent != pool) {
504           this.store.cancelRequestedCompaction(this.compaction);
505           this.compaction = null;
506           this.parent = pool;
507           this.parent.execute(this);
508           return;
509         }
510       }
511       // Finally we can compact something.
512       assert this.compaction != null;
513 
514       this.compaction.getRequest().beforeExecute();
515       try {
516         // Note: please don't put single-compaction logic here;
517         //       put it into region/store/etc. This is CST logic.
518         long start = EnvironmentEdgeManager.currentTime();
519         boolean completed =
520             region.compact(compaction, store, compactionThroughputController);
521         long now = EnvironmentEdgeManager.currentTime();
522         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
523               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
524         if (completed) {
525           // degenerate case: blocked regions require recursive enqueues
526           if (store.getCompactPriority() <= 0) {
527             requestSystemCompaction(region, store, "Recursive enqueue");
528           } else {
529             // see if the compaction has caused us to exceed max region size
530             requestSplit(region);
531           }
532         }
533       } catch (IOException ex) {
534         IOException remoteEx =
535             ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
536         LOG.error("Compaction failed " + this, remoteEx);
537         if (remoteEx != ex) {
538           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
539         }
540         server.checkFileSystem();
541       } catch (Exception ex) {
542         LOG.error("Compaction failed " + this, ex);
543         server.checkFileSystem();
544       } finally {
545         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
546       }
547       this.compaction.getRequest().afterExecute();
548     }
549 
550     private String formatStackTrace(Exception ex) {
551       StringWriter sw = new StringWriter();
552       PrintWriter pw = new PrintWriter(sw);
553       ex.printStackTrace(pw);
554       pw.flush();
555       return sw.toString();
556     }
557 
558     @Override
559     public int compareTo(CompactionRunner o) {
560       // Only compare the underlying request (if any), for queue sorting purposes.
561       int compareVal = queuedPriority - o.queuedPriority; // compare priority
562       if (compareVal != 0) return compareVal;
563       CompactionContext tc = this.compaction, oc = o.compaction;
564       // Sort pre-selected (user?) compactions before system ones with equal priority.
565       return (tc == null) ? ((oc == null) ? 0 : 1)
566           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
567     }
568   }
569 
570   /**
571    * Cleanup class to use when rejecting a compaction request from the queue.
572    */
573   private static class Rejection implements RejectedExecutionHandler {
574     @Override
575     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
576       if (runnable instanceof CompactionRunner) {
577         CompactionRunner runner = (CompactionRunner)runnable;
578         LOG.debug("Compaction Rejected: " + runner);
579         runner.store.cancelRequestedCompaction(runner.compaction);
580       }
581     }
582   }
583 
584   /**
585    * {@inheritDoc}
586    */
587   @Override
588   public void onConfigurationChange(Configuration newConf) {
589     // Check if number of large / small compaction threads has changed, and then
590     // adjust the core pool size of the thread pools, by using the
591     // setCorePoolSize() method. According to the javadocs, it is safe to
592     // change the core pool size on-the-fly. We need to reset the maximum
593     // pool size, as well.
594     int largeThreads = Math.max(1, newConf.getInt(
595             LARGE_COMPACTION_THREADS,
596             LARGE_COMPACTION_THREADS_DEFAULT));
597     if (this.longCompactions.getCorePoolSize() != largeThreads) {
598       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
599               " from " + this.longCompactions.getCorePoolSize() + " to " +
600               largeThreads);
601       this.longCompactions.setMaximumPoolSize(largeThreads);
602       this.longCompactions.setCorePoolSize(largeThreads);
603     }
604 
605     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
606             SMALL_COMPACTION_THREADS_DEFAULT);
607     if (this.shortCompactions.getCorePoolSize() != smallThreads) {
608       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
609                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
610                 smallThreads);
611       this.shortCompactions.setMaximumPoolSize(smallThreads);
612       this.shortCompactions.setCorePoolSize(smallThreads);
613     }
614 
615     int splitThreads = newConf.getInt(SPLIT_THREADS,
616             SPLIT_THREADS_DEFAULT);
617     if (this.splits.getCorePoolSize() != splitThreads) {
618       LOG.info("Changing the value of " + SPLIT_THREADS +
619                 " from " + this.splits.getCorePoolSize() + " to " +
620                 splitThreads);
621       this.splits.setMaximumPoolSize(smallThreads);
622       this.splits.setCorePoolSize(smallThreads);
623     }
624 
625     int mergeThreads = newConf.getInt(MERGE_THREADS,
626             MERGE_THREADS_DEFAULT);
627     if (this.mergePool.getCorePoolSize() != mergeThreads) {
628       LOG.info("Changing the value of " + MERGE_THREADS +
629                 " from " + this.mergePool.getCorePoolSize() + " to " +
630                 mergeThreads);
631       this.mergePool.setMaximumPoolSize(smallThreads);
632       this.mergePool.setCorePoolSize(smallThreads);
633     }
634 
635     CompactionThroughputController old = this.compactionThroughputController;
636     if (old != null) {
637       old.stop("configuration change");
638     }
639     this.compactionThroughputController =
640         CompactionThroughputControllerFactory.create(server, newConf);
641 
642     // We change this atomically here instead of reloading the config in order that upstream
643     // would be the only one with the flexibility to reload the config.
644     this.conf.reloadConfiguration();
645   }
646 
647   protected int getSmallCompactionThreadNum() {
648     return this.shortCompactions.getCorePoolSize();
649   }
650 
651   public int getLargeCompactionThreadNum() {
652     return this.longCompactions.getCorePoolSize();
653   }
654 
  /**
   * {@inheritDoc}
   *
   * <p>This implementation does nothing.
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }
662 
  /**
   * {@inheritDoc}
   *
   * <p>This implementation does nothing.
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister.
  }
670 
671   @VisibleForTesting
672   public CompactionThroughputController getCompactionThroughputController() {
673     return compactionThroughputController;
674   }
675 
676 }