View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.PriorityBlockingQueue;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.RemoteExceptionHandler;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.conf.ConfigurationManager;
42  import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
43  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
44  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
45  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
46  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
47  import org.apache.hadoop.hbase.security.User;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  import org.apache.hadoop.hbase.util.Pair;
50  import org.apache.hadoop.util.StringUtils;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  import com.google.common.base.Preconditions;
54  
55  /**
56   * Compact region on request and then run split if appropriate
57   */
58  @InterfaceAudience.Private
59  public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
60    static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
61  
62    // Configuration key for the large compaction threads.
63    public final static String LARGE_COMPACTION_THREADS =
64        "hbase.regionserver.thread.compaction.large";
65    public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
66    
67    // Configuration key for the small compaction threads.
68    public final static String SMALL_COMPACTION_THREADS =
69        "hbase.regionserver.thread.compaction.small";
70    public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
71    
72    // Configuration key for split threads
73    public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
74    public final static int SPLIT_THREADS_DEFAULT = 1;
75    
76    // Configuration keys for merge threads
77    public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
78    public final static int MERGE_THREADS_DEFAULT = 1;
79  
80    public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
81        "hbase.regionserver.regionSplitLimit";
82    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
83    
84    private final HRegionServer server;
85    private final Configuration conf;
86  
87    private final ThreadPoolExecutor longCompactions;
88    private final ThreadPoolExecutor shortCompactions;
89    private final ThreadPoolExecutor splits;
90    private final ThreadPoolExecutor mergePool;
91  
92    private volatile CompactionThroughputController compactionThroughputController;
93  
94    /**
95     * Splitting should not take place if the total number of regions exceed this.
96     * This is not a hard limit to the number of regions but it is a guideline to
97     * stop splitting after number of online regions is greater than this.
98     */
99    private int regionSplitLimit;
100 
101   /** @param server */
102   CompactSplitThread(HRegionServer server) {
103     super();
104     this.server = server;
105     this.conf = server.getConfiguration();
106     this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
107         DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
108 
109     int largeThreads = Math.max(1, conf.getInt(
110         LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
111     int smallThreads = conf.getInt(
112         SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
113 
114     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
115 
116     // if we have throttle threads, make sure the user also specified size
117     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
118 
119     final String n = Thread.currentThread().getName();
120 
121     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
122         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
123         new ThreadFactory() {
124           @Override
125           public Thread newThread(Runnable r) {
126             Thread t = new Thread(r);
127             t.setName(n + "-longCompactions-" + System.currentTimeMillis());
128             return t;
129           }
130       });
131     this.longCompactions.setRejectedExecutionHandler(new Rejection());
132     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
133         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
134         new ThreadFactory() {
135           @Override
136           public Thread newThread(Runnable r) {
137             Thread t = new Thread(r);
138             t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
139             return t;
140           }
141       });
142     this.shortCompactions
143         .setRejectedExecutionHandler(new Rejection());
144     this.splits = (ThreadPoolExecutor)
145         Executors.newFixedThreadPool(splitThreads,
146             new ThreadFactory() {
147           @Override
148           public Thread newThread(Runnable r) {
149             Thread t = new Thread(r);
150             t.setName(n + "-splits-" + System.currentTimeMillis());
151             return t;
152           }
153       });
154     int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
155     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
156         mergeThreads, new ThreadFactory() {
157           @Override
158           public Thread newThread(Runnable r) {
159             Thread t = new Thread(r);
160             t.setName(n + "-merges-" + System.currentTimeMillis());
161             return t;
162           }
163         });
164 
165     // compaction throughput controller
166     this.compactionThroughputController =
167         CompactionThroughputControllerFactory.create(server, conf);
168   }
169 
170   @Override
171   public String toString() {
172     return "compaction_queue=("
173         + longCompactions.getQueue().size() + ":"
174         + shortCompactions.getQueue().size() + ")"
175         + ", split_queue=" + splits.getQueue().size()
176         + ", merge_queue=" + mergePool.getQueue().size();
177   }
178   
179   public String dumpQueue() {
180     StringBuffer queueLists = new StringBuffer();
181     queueLists.append("Compaction/Split Queue dump:\n");
182     queueLists.append("  LargeCompation Queue:\n");
183     BlockingQueue<Runnable> lq = longCompactions.getQueue();
184     Iterator<Runnable> it = lq.iterator();
185     while (it.hasNext()) {
186       queueLists.append("    " + it.next().toString());
187       queueLists.append("\n");
188     }
189 
190     if (shortCompactions != null) {
191       queueLists.append("\n");
192       queueLists.append("  SmallCompation Queue:\n");
193       lq = shortCompactions.getQueue();
194       it = lq.iterator();
195       while (it.hasNext()) {
196         queueLists.append("    " + it.next().toString());
197         queueLists.append("\n");
198       }
199     }
200 
201     queueLists.append("\n");
202     queueLists.append("  Split Queue:\n");
203     lq = splits.getQueue();
204     it = lq.iterator();
205     while (it.hasNext()) {
206       queueLists.append("    " + it.next().toString());
207       queueLists.append("\n");
208     }
209 
210     queueLists.append("\n");
211     queueLists.append("  Region Merge Queue:\n");
212     lq = mergePool.getQueue();
213     it = lq.iterator();
214     while (it.hasNext()) {
215       queueLists.append("    " + it.next().toString());
216       queueLists.append("\n");
217     }
218 
219     return queueLists.toString();
220   }
221 
222   public synchronized void requestRegionsMerge(final Region a,
223       final Region b, final boolean forcible, long masterSystemTime, User user) {
224     try {
225       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
226       if (LOG.isDebugEnabled()) {
227         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
228             + forcible + ".  " + this);
229       }
230     } catch (RejectedExecutionException ree) {
231       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
232           + forcible, ree);
233     }
234   }
235 
236   public synchronized boolean requestSplit(final Region r) {
237     // don't split regions that are blocking
238     if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
239       byte[] midKey = ((HRegion)r).checkSplit();
240       if (midKey != null) {
241         requestSplit(r, midKey);
242         return true;
243       }
244     }
245     return false;
246   }
247 
248   public synchronized void requestSplit(final Region r, byte[] midKey) {
249     requestSplit(r, midKey, null);
250   }
251 
252   /*
253    * The User parameter allows the split thread to assume the correct user identity
254    */
255   public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
256     if (midKey == null) {
257       LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
258         " not splittable because midkey=null");
259       if (((HRegion)r).shouldForceSplit()) {
260         ((HRegion)r).clearSplit();
261       }
262       return;
263     }
264     try {
265       this.splits.execute(new SplitRequest(r, midKey, this.server, user));
266       if (LOG.isDebugEnabled()) {
267         LOG.debug("Split requested for " + r + ".  " + this);
268       }
269     } catch (RejectedExecutionException ree) {
270       LOG.info("Could not execute split for " + r, ree);
271     }
272   }
273 
274   @Override
275   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
276       throws IOException {
277     return requestCompaction(r, why, null);
278   }
279 
280   @Override
281   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
282       List<Pair<CompactionRequest, Store>> requests) throws IOException {
283     return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
284   }
285 
286   @Override
287   public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
288       final String why, CompactionRequest request) throws IOException {
289     return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
290   }
291 
292   @Override
293   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
294       int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
295     return requestCompactionInternal(r, why, p, requests, true, user);
296   }
297 
298   private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
299       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
300           throws IOException {
301     // not a special compaction request, so make our own list
302     List<CompactionRequest> ret = null;
303     if (requests == null) {
304       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
305       for (Store s : r.getStores()) {
306         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
307         if (selectNow) ret.add(cr);
308       }
309     } else {
310       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
311       ret = new ArrayList<CompactionRequest>(requests.size());
312       for (Pair<CompactionRequest, Store> pair : requests) {
313         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
314       }
315     }
316     return ret;
317   }
318 
319   public CompactionRequest requestCompaction(final Region r, final Store s,
320       final String why, int priority, CompactionRequest request, User user) throws IOException {
321     return requestCompactionInternal(r, s, why, priority, request, true, user);
322   }
323 
324   public synchronized void requestSystemCompaction(
325       final Region r, final String why) throws IOException {
326     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
327   }
328 
329   public void requestSystemCompaction(
330       final Region r, final Store s, final String why) throws IOException {
331     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
332   }
333 
334   /**
335    * @param r region store belongs to
336    * @param s Store to request compaction on
337    * @param why Why compaction requested -- used in debug messages
338    * @param priority override the default priority (NO_PRIORITY == decide)
339    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
340    *          compaction will be used.
341    */
342   private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
343       final String why, int priority, CompactionRequest request, boolean selectNow, User user)
344           throws IOException {
345     if (this.server.isStopped()
346         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
347       return null;
348     }
349 
350     CompactionContext compaction = null;
351     if (selectNow) {
352       compaction = selectCompaction(r, s, priority, request, user);
353       if (compaction == null) return null; // message logged inside
354     }
355 
356     // We assume that most compactions are small. So, put system compactions into small
357     // pool; we will do selection there, and move to large pool if necessary.
358     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
359       ? longCompactions : shortCompactions;
360     pool.execute(new CompactionRunner(s, r, compaction, pool, user));
361     if (LOG.isDebugEnabled()) {
362       String type = (pool == shortCompactions) ? "Small " : "Large ";
363       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
364           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
365     }
366     return selectNow ? compaction.getRequest() : null;
367   }
368 
369   private CompactionContext selectCompaction(final Region r, final Store s,
370       int priority, CompactionRequest request, User user) throws IOException {
371     CompactionContext compaction = s.requestCompaction(priority, request, user);
372     if (compaction == null) {
373       if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
374         LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
375             " because compaction request was cancelled");
376       }
377       return null;
378     }
379     assert compaction.hasSelection();
380     if (priority != Store.NO_PRIORITY) {
381       compaction.getRequest().setPriority(priority);
382     }
383     return compaction;
384   }
385 
386   /**
387    * Only interrupt once it's done with a run through the work loop.
388    */
389   void interruptIfNecessary() {
390     splits.shutdown();
391     mergePool.shutdown();
392     longCompactions.shutdown();
393     shortCompactions.shutdown();
394   }
395 
396   private void waitFor(ThreadPoolExecutor t, String name) {
397     boolean done = false;
398     while (!done) {
399       try {
400         done = t.awaitTermination(60, TimeUnit.SECONDS);
401         LOG.info("Waiting for " + name + " to finish...");
402         if (!done) {
403           t.shutdownNow();
404         }
405       } catch (InterruptedException ie) {
406         LOG.warn("Interrupted waiting for " + name + " to finish...");
407       }
408     }
409   }
410 
411   void join() {
412     waitFor(splits, "Split Thread");
413     waitFor(mergePool, "Merge Thread");
414     waitFor(longCompactions, "Large Compaction Thread");
415     waitFor(shortCompactions, "Small Compaction Thread");
416   }
417 
418   /**
419    * Returns the current size of the queue containing regions that are
420    * processed.
421    *
422    * @return The current size of the regions queue.
423    */
424   public int getCompactionQueueSize() {
425     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
426   }
427 
428   public int getLargeCompactionQueueSize() {
429     return longCompactions.getQueue().size();
430   }
431 
432 
433   public int getSmallCompactionQueueSize() {
434     return shortCompactions.getQueue().size();
435   }
436 
437   public int getSplitQueueSize() {
438     return splits.getQueue().size();
439   }
440 
441   private boolean shouldSplitRegion() {
442     if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
443       LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
444           + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
445     }
446     return (regionSplitLimit > server.getNumberOfOnlineRegions());
447   }
448 
449   /**
450    * @return the regionSplitLimit
451    */
452   public int getRegionSplitLimit() {
453     return this.regionSplitLimit;
454   }
455 
456   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
457       justification="Contrived use of compareTo")
458   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
459     private final Store store;
460     private final HRegion region;
461     private CompactionContext compaction;
462     private int queuedPriority;
463     private ThreadPoolExecutor parent;
464     private User user;
465 
466     public CompactionRunner(Store store, Region region,
467         CompactionContext compaction, ThreadPoolExecutor parent, User user) {
468       super();
469       this.store = store;
470       this.region = (HRegion)region;
471       this.compaction = compaction;
472       this.queuedPriority = (this.compaction == null)
473           ? store.getCompactPriority() : compaction.getRequest().getPriority();
474       this.parent = parent;
475       this.user = user;
476     }
477 
478     @Override
479     public String toString() {
480       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
481           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
482     }
483 
484     private void doCompaction(User user) {
485       // Common case - system compaction without a file selection. Select now.
486       if (this.compaction == null) {
487         int oldPriority = this.queuedPriority;
488         this.queuedPriority = this.store.getCompactPriority();
489         if (this.queuedPriority > oldPriority) {
490           // Store priority decreased while we were in queue (due to some other compaction?),
491           // requeue with new priority to avoid blocking potential higher priorities.
492           this.parent.execute(this);
493           return;
494         }
495         try {
496           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
497         } catch (IOException ex) {
498           LOG.error("Compaction selection failed " + this, ex);
499           server.checkFileSystem();
500           return;
501         }
502         if (this.compaction == null) return; // nothing to do
503         // Now see if we are in correct pool for the size; if not, go to the correct one.
504         // We might end up waiting for a while, so cancel the selection.
505         assert this.compaction.hasSelection();
506         ThreadPoolExecutor pool = store.throttleCompaction(
507             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
508         if (this.parent != pool) {
509           this.store.cancelRequestedCompaction(this.compaction);
510           this.compaction = null;
511           this.parent = pool;
512           this.parent.execute(this);
513           return;
514         }
515       }
516       // Finally we can compact something.
517       assert this.compaction != null;
518 
519       this.compaction.getRequest().beforeExecute();
520       try {
521         // Note: please don't put single-compaction logic here;
522         //       put it into region/store/etc. This is CST logic.
523         long start = EnvironmentEdgeManager.currentTime();
524         boolean completed =
525             region.compact(compaction, store, compactionThroughputController, user);
526         long now = EnvironmentEdgeManager.currentTime();
527         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
528               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
529         if (completed) {
530           // degenerate case: blocked regions require recursive enqueues
531           if (store.getCompactPriority() <= 0) {
532             requestSystemCompaction(region, store, "Recursive enqueue");
533           } else {
534             // see if the compaction has caused us to exceed max region size
535             requestSplit(region);
536           }
537         }
538       } catch (IOException ex) {
539         IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
540         LOG.error("Compaction failed " + this, remoteEx);
541         if (remoteEx != ex) {
542           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
543         }
544         server.checkFileSystem();
545       } catch (Exception ex) {
546         LOG.error("Compaction failed " + this, ex);
547         server.checkFileSystem();
548       } finally {
549         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
550       }
551       this.compaction.getRequest().afterExecute();
552     }
553 
554     @Override
555     public void run() {
556       Preconditions.checkNotNull(server);
557       if (server.isStopped()
558           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
559         return;
560       }
561       doCompaction(user);
562     }
563 
564     private String formatStackTrace(Exception ex) {
565       StringWriter sw = new StringWriter();
566       PrintWriter pw = new PrintWriter(sw);
567       ex.printStackTrace(pw);
568       pw.flush();
569       return sw.toString();
570     }
571 
572     @Override
573     public int compareTo(CompactionRunner o) {
574       // Only compare the underlying request (if any), for queue sorting purposes.
575       int compareVal = queuedPriority - o.queuedPriority; // compare priority
576       if (compareVal != 0) return compareVal;
577       CompactionContext tc = this.compaction, oc = o.compaction;
578       // Sort pre-selected (user?) compactions before system ones with equal priority.
579       return (tc == null) ? ((oc == null) ? 0 : 1)
580           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
581     }
582   }
583 
584   /**
585    * Cleanup class to use when rejecting a compaction request from the queue.
586    */
587   private static class Rejection implements RejectedExecutionHandler {
588     @Override
589     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
590       if (runnable instanceof CompactionRunner) {
591         CompactionRunner runner = (CompactionRunner)runnable;
592         LOG.debug("Compaction Rejected: " + runner);
593         runner.store.cancelRequestedCompaction(runner.compaction);
594       }
595     }
596   }
597 
598   /**
599    * {@inheritDoc}
600    */
601   @Override
602   public void onConfigurationChange(Configuration newConf) {
603     // Check if number of large / small compaction threads has changed, and then
604     // adjust the core pool size of the thread pools, by using the
605     // setCorePoolSize() method. According to the javadocs, it is safe to
606     // change the core pool size on-the-fly. We need to reset the maximum
607     // pool size, as well.
608     int largeThreads = Math.max(1, newConf.getInt(
609             LARGE_COMPACTION_THREADS,
610             LARGE_COMPACTION_THREADS_DEFAULT));
611     if (this.longCompactions.getCorePoolSize() != largeThreads) {
612       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
613               " from " + this.longCompactions.getCorePoolSize() + " to " +
614               largeThreads);
615       if(this.longCompactions.getCorePoolSize() < largeThreads) {
616         this.longCompactions.setMaximumPoolSize(largeThreads);
617         this.longCompactions.setCorePoolSize(largeThreads);
618       } else {
619         this.longCompactions.setCorePoolSize(largeThreads);
620         this.longCompactions.setMaximumPoolSize(largeThreads);
621       }
622     }
623 
624     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
625             SMALL_COMPACTION_THREADS_DEFAULT);
626     if (this.shortCompactions.getCorePoolSize() != smallThreads) {
627       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
628                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
629                 smallThreads);
630       if(this.shortCompactions.getCorePoolSize() < smallThreads) {
631         this.shortCompactions.setMaximumPoolSize(smallThreads);
632         this.shortCompactions.setCorePoolSize(smallThreads);
633       } else {
634         this.shortCompactions.setCorePoolSize(smallThreads);
635         this.shortCompactions.setMaximumPoolSize(smallThreads);
636       }
637     }
638 
639     int splitThreads = newConf.getInt(SPLIT_THREADS,
640             SPLIT_THREADS_DEFAULT);
641     if (this.splits.getCorePoolSize() != splitThreads) {
642       LOG.info("Changing the value of " + SPLIT_THREADS +
643                 " from " + this.splits.getCorePoolSize() + " to " +
644                 splitThreads);
645       if(this.splits.getCorePoolSize() < splitThreads) {
646         this.splits.setMaximumPoolSize(splitThreads);
647         this.splits.setCorePoolSize(splitThreads);
648       } else {
649         this.splits.setCorePoolSize(splitThreads);
650         this.splits.setMaximumPoolSize(splitThreads);
651       }
652     }
653 
654     int mergeThreads = newConf.getInt(MERGE_THREADS,
655             MERGE_THREADS_DEFAULT);
656     if (this.mergePool.getCorePoolSize() != mergeThreads) {
657       LOG.info("Changing the value of " + MERGE_THREADS +
658                 " from " + this.mergePool.getCorePoolSize() + " to " +
659                 mergeThreads);
660       if(this.mergePool.getCorePoolSize() < mergeThreads) {
661         this.mergePool.setMaximumPoolSize(mergeThreads);
662         this.mergePool.setCorePoolSize(mergeThreads);
663       } else {
664         this.mergePool.setCorePoolSize(mergeThreads);
665         this.mergePool.setMaximumPoolSize(mergeThreads);
666       }
667     }
668 
669     CompactionThroughputController old = this.compactionThroughputController;
670     if (old != null) {
671       old.stop("configuration change");
672     }
673     this.compactionThroughputController =
674         CompactionThroughputControllerFactory.create(server, newConf);
675 
676     // We change this atomically here instead of reloading the config in order that upstream
677     // would be the only one with the flexibility to reload the config.
678     this.conf.reloadConfiguration();
679   }
680 
681   protected int getSmallCompactionThreadNum() {
682     return this.shortCompactions.getCorePoolSize();
683   }
684 
685   protected int getLargeCompactionThreadNum() {
686     return this.longCompactions.getCorePoolSize();
687   }
688 
689   protected int getSplitThreadNum() {
690     return this.splits.getCorePoolSize();
691   }
692 
693   protected int getMergeThreadNum() {
694     return this.mergePool.getCorePoolSize();
695   }
696 
697   /**
698    * {@inheritDoc}
699    */
700   @Override
701   public void registerChildren(ConfigurationManager manager) {
702     // No children to register.
703   }
704 
705   /**
706    * {@inheritDoc}
707    */
708   @Override
709   public void deregisterChildren(ConfigurationManager manager) {
710     // No children to register
711   }
712 
713   @VisibleForTesting
714   public CompactionThroughputController getCompactionThroughputController() {
715     return compactionThroughputController;
716   }
717 
718 }