View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.PriorityBlockingQueue;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.conf.ConfigurationManager;
41  import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
42  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
43  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
44  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
45  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.Pair;
48  import org.apache.hadoop.ipc.RemoteException;
49  import org.apache.hadoop.util.StringUtils;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.base.Preconditions;
53  
54  /**
55   * Compact region on request and then run split if appropriate
56   */
57  @InterfaceAudience.Private
58  public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
59    static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
60  
61    // Configuration key for the large compaction threads.
62    public final static String LARGE_COMPACTION_THREADS =
63        "hbase.regionserver.thread.compaction.large";
64    public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
65    
66    // Configuration key for the small compaction threads.
67    public final static String SMALL_COMPACTION_THREADS =
68        "hbase.regionserver.thread.compaction.small";
69    public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
70    
71    // Configuration key for split threads
72    public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
73    public final static int SPLIT_THREADS_DEFAULT = 1;
74    
75    // Configuration keys for merge threads
76    public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
77    public final static int MERGE_THREADS_DEFAULT = 1;
78    
79    private final HRegionServer server;
80    private final Configuration conf;
81  
82    private final ThreadPoolExecutor longCompactions;
83    private final ThreadPoolExecutor shortCompactions;
84    private final ThreadPoolExecutor splits;
85    private final ThreadPoolExecutor mergePool;
86  
87    private volatile CompactionThroughputController compactionThroughputController;
88  
89    /**
90     * Splitting should not take place if the total number of regions exceed this.
91     * This is not a hard limit to the number of regions but it is a guideline to
92     * stop splitting after number of online regions is greater than this.
93     */
94    private int regionSplitLimit;
95  
96    /** @param server */
97    CompactSplitThread(HRegionServer server) {
98      super();
99      this.server = server;
100     this.conf = server.getConfiguration();
101     this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
102         Integer.MAX_VALUE);
103 
104     int largeThreads = Math.max(1, conf.getInt(
105         LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
106     int smallThreads = conf.getInt(
107         SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
108 
109     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
110 
111     // if we have throttle threads, make sure the user also specified size
112     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
113 
114     final String n = Thread.currentThread().getName();
115 
116     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
117         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
118         new ThreadFactory() {
119           @Override
120           public Thread newThread(Runnable r) {
121             Thread t = new Thread(r);
122             t.setName(n + "-longCompactions-" + System.currentTimeMillis());
123             return t;
124           }
125       });
126     this.longCompactions.setRejectedExecutionHandler(new Rejection());
127     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
128         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
129         new ThreadFactory() {
130           @Override
131           public Thread newThread(Runnable r) {
132             Thread t = new Thread(r);
133             t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
134             return t;
135           }
136       });
137     this.shortCompactions
138         .setRejectedExecutionHandler(new Rejection());
139     this.splits = (ThreadPoolExecutor)
140         Executors.newFixedThreadPool(splitThreads,
141             new ThreadFactory() {
142           @Override
143           public Thread newThread(Runnable r) {
144             Thread t = new Thread(r);
145             t.setName(n + "-splits-" + System.currentTimeMillis());
146             return t;
147           }
148       });
149     int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
150     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
151         mergeThreads, new ThreadFactory() {
152           @Override
153           public Thread newThread(Runnable r) {
154             Thread t = new Thread(r);
155             t.setName(n + "-merges-" + System.currentTimeMillis());
156             return t;
157           }
158         });
159 
160     // compaction throughput controller
161     this.compactionThroughputController =
162         CompactionThroughputControllerFactory.create(server, conf);
163   }
164 
165   @Override
166   public String toString() {
167     return "compaction_queue=("
168         + longCompactions.getQueue().size() + ":"
169         + shortCompactions.getQueue().size() + ")"
170         + ", split_queue=" + splits.getQueue().size()
171         + ", merge_queue=" + mergePool.getQueue().size();
172   }
173   
174   public String dumpQueue() {
175     StringBuffer queueLists = new StringBuffer();
176     queueLists.append("Compaction/Split Queue dump:\n");
177     queueLists.append("  LargeCompation Queue:\n");
178     BlockingQueue<Runnable> lq = longCompactions.getQueue();
179     Iterator<Runnable> it = lq.iterator();
180     while (it.hasNext()) {
181       queueLists.append("    " + it.next().toString());
182       queueLists.append("\n");
183     }
184 
185     if (shortCompactions != null) {
186       queueLists.append("\n");
187       queueLists.append("  SmallCompation Queue:\n");
188       lq = shortCompactions.getQueue();
189       it = lq.iterator();
190       while (it.hasNext()) {
191         queueLists.append("    " + it.next().toString());
192         queueLists.append("\n");
193       }
194     }
195 
196     queueLists.append("\n");
197     queueLists.append("  Split Queue:\n");
198     lq = splits.getQueue();
199     it = lq.iterator();
200     while (it.hasNext()) {
201       queueLists.append("    " + it.next().toString());
202       queueLists.append("\n");
203     }
204 
205     queueLists.append("\n");
206     queueLists.append("  Region Merge Queue:\n");
207     lq = mergePool.getQueue();
208     it = lq.iterator();
209     while (it.hasNext()) {
210       queueLists.append("    " + it.next().toString());
211       queueLists.append("\n");
212     }
213 
214     return queueLists.toString();
215   }
216 
217   public synchronized void requestRegionsMerge(final HRegion a,
218       final HRegion b, final boolean forcible) {
219     try {
220       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
221       if (LOG.isDebugEnabled()) {
222         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
223             + forcible + ".  " + this);
224       }
225     } catch (RejectedExecutionException ree) {
226       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
227           + forcible, ree);
228     }
229   }
230 
231   public synchronized boolean requestSplit(final HRegion r) {
232     // don't split regions that are blocking
233     if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
234       byte[] midKey = r.checkSplit();
235       if (midKey != null) {
236         requestSplit(r, midKey);
237         return true;
238       }
239     }
240     return false;
241   }
242 
243   public synchronized void requestSplit(final HRegion r, byte[] midKey) {
244     if (midKey == null) {
245       LOG.debug("Region " + r.getRegionNameAsString() +
246         " not splittable because midkey=null");
247       if (r.shouldForceSplit()) {
248         r.clearSplit();
249       }
250       return;
251     }
252     try {
253       this.splits.execute(new SplitRequest(r, midKey, this.server));
254       if (LOG.isDebugEnabled()) {
255         LOG.debug("Split requested for " + r + ".  " + this);
256       }
257     } catch (RejectedExecutionException ree) {
258       LOG.info("Could not execute split for " + r, ree);
259     }
260   }
261 
262   @Override
263   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
264       throws IOException {
265     return requestCompaction(r, why, null);
266   }
267 
268   @Override
269   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
270       List<Pair<CompactionRequest, Store>> requests) throws IOException {
271     return requestCompaction(r, why, Store.NO_PRIORITY, requests);
272   }
273 
274   @Override
275   public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
276       final String why, CompactionRequest request) throws IOException {
277     return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
278   }
279 
280   @Override
281   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
282       int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
283     return requestCompactionInternal(r, why, p, requests, true);
284   }
285 
286   private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
287       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
288     // not a special compaction request, so make our own list
289     List<CompactionRequest> ret = null;
290     if (requests == null) {
291       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
292       for (Store s : r.getStores().values()) {
293         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
294         if (selectNow) ret.add(cr);
295       }
296     } else {
297       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
298       ret = new ArrayList<CompactionRequest>(requests.size());
299       for (Pair<CompactionRequest, Store> pair : requests) {
300         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
301       }
302     }
303     return ret;
304   }
305 
306   public CompactionRequest requestCompaction(final HRegion r, final Store s,
307       final String why, int priority, CompactionRequest request) throws IOException {
308     return requestCompactionInternal(r, s, why, priority, request, true);
309   }
310 
311   public synchronized void requestSystemCompaction(
312       final HRegion r, final String why) throws IOException {
313     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
314   }
315 
316   public void requestSystemCompaction(
317       final HRegion r, final Store s, final String why) throws IOException {
318     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
319   }
320 
321   /**
322    * @param r HRegion store belongs to
323    * @param s Store to request compaction on
324    * @param why Why compaction requested -- used in debug messages
325    * @param priority override the default priority (NO_PRIORITY == decide)
326    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
327    *          compaction will be used.
328    */
329   private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
330       final String why, int priority, CompactionRequest request, boolean selectNow)
331           throws IOException {
332     if (this.server.isStopped()
333         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
334       return null;
335     }
336 
337     CompactionContext compaction = null;
338     if (selectNow) {
339       compaction = selectCompaction(r, s, priority, request);
340       if (compaction == null) return null; // message logged inside
341     }
342 
343     // We assume that most compactions are small. So, put system compactions into small
344     // pool; we will do selection there, and move to large pool if necessary.
345     long size = selectNow ? compaction.getRequest().getSize() : 0;
346     ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
347       ? longCompactions : shortCompactions;
348     pool.execute(new CompactionRunner(s, r, compaction, pool));
349     if (LOG.isDebugEnabled()) {
350       String type = (pool == shortCompactions) ? "Small " : "Large ";
351       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
352           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
353     }
354     return selectNow ? compaction.getRequest() : null;
355   }
356 
357   private CompactionContext selectCompaction(final HRegion r, final Store s,
358       int priority, CompactionRequest request) throws IOException {
359     CompactionContext compaction = s.requestCompaction(priority, request);
360     if (compaction == null) {
361       if(LOG.isDebugEnabled()) {
362         LOG.debug("Not compacting " + r.getRegionNameAsString() +
363             " because compaction request was cancelled");
364       }
365       return null;
366     }
367     assert compaction.hasSelection();
368     if (priority != Store.NO_PRIORITY) {
369       compaction.getRequest().setPriority(priority);
370     }
371     return compaction;
372   }
373 
374   /**
375    * Only interrupt once it's done with a run through the work loop.
376    */
377   void interruptIfNecessary() {
378     splits.shutdown();
379     mergePool.shutdown();
380     longCompactions.shutdown();
381     shortCompactions.shutdown();
382   }
383 
384   private void waitFor(ThreadPoolExecutor t, String name) {
385     boolean done = false;
386     while (!done) {
387       try {
388         done = t.awaitTermination(60, TimeUnit.SECONDS);
389         LOG.info("Waiting for " + name + " to finish...");
390         if (!done) {
391           t.shutdownNow();
392         }
393       } catch (InterruptedException ie) {
394         LOG.warn("Interrupted waiting for " + name + " to finish...");
395       }
396     }
397   }
398 
399   void join() {
400     waitFor(splits, "Split Thread");
401     waitFor(mergePool, "Merge Thread");
402     waitFor(longCompactions, "Large Compaction Thread");
403     waitFor(shortCompactions, "Small Compaction Thread");
404   }
405 
406   /**
407    * Returns the current size of the queue containing regions that are
408    * processed.
409    *
410    * @return The current size of the regions queue.
411    */
412   public int getCompactionQueueSize() {
413     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
414   }
415 
416   public int getLargeCompactionQueueSize() {
417     return longCompactions.getQueue().size();
418   }
419 
420 
421   public int getSmallCompactionQueueSize() {
422     return shortCompactions.getQueue().size();
423   }
424 
425   public int getSplitQueueSize() {
426     return splits.getQueue().size();
427   }
428 
429   private boolean shouldSplitRegion() {
430     return (regionSplitLimit > server.getNumberOfOnlineRegions());
431   }
432 
433   /**
434    * @return the regionSplitLimit
435    */
436   public int getRegionSplitLimit() {
437     return this.regionSplitLimit;
438   }
439 
440   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
441       justification="Contrived use of compareTo")
442   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
443     private final Store store;
444     private final HRegion region;
445     private CompactionContext compaction;
446     private int queuedPriority;
447     private ThreadPoolExecutor parent;
448 
449     public CompactionRunner(Store store, HRegion region,
450         CompactionContext compaction, ThreadPoolExecutor parent) {
451       super();
452       this.store = store;
453       this.region = region;
454       this.compaction = compaction;
455       this.queuedPriority = (this.compaction == null)
456           ? store.getCompactPriority() : compaction.getRequest().getPriority();
457       this.parent = parent;
458     }
459 
460     @Override
461     public String toString() {
462       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
463           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
464     }
465 
466     @Override
467     public void run() {
468       Preconditions.checkNotNull(server);
469       if (server.isStopped()
470           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
471         return;
472       }
473       // Common case - system compaction without a file selection. Select now.
474       if (this.compaction == null) {
475         int oldPriority = this.queuedPriority;
476         this.queuedPriority = this.store.getCompactPriority();
477         if (this.queuedPriority > oldPriority) {
478           // Store priority decreased while we were in queue (due to some other compaction?),
479           // requeue with new priority to avoid blocking potential higher priorities.
480           this.parent.execute(this);
481           return;
482         }
483         try {
484           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
485         } catch (IOException ex) {
486           LOG.error("Compaction selection failed " + this, ex);
487           server.checkFileSystem();
488           return;
489         }
490         if (this.compaction == null) return; // nothing to do
491         // Now see if we are in correct pool for the size; if not, go to the correct one.
492         // We might end up waiting for a while, so cancel the selection.
493         assert this.compaction.hasSelection();
494         ThreadPoolExecutor pool = store.throttleCompaction(
495             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
496         if (this.parent != pool) {
497           this.store.cancelRequestedCompaction(this.compaction);
498           this.compaction = null;
499           this.parent = pool;
500           this.parent.execute(this);
501           return;
502         }
503       }
504       // Finally we can compact something.
505       assert this.compaction != null;
506 
507       this.compaction.getRequest().beforeExecute();
508       try {
509         // Note: please don't put single-compaction logic here;
510         //       put it into region/store/etc. This is CST logic.
511         long start = EnvironmentEdgeManager.currentTime();
512         boolean completed =
513             region.compact(compaction, store, compactionThroughputController);
514         long now = EnvironmentEdgeManager.currentTime();
515         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
516               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
517         if (completed) {
518           // degenerate case: blocked regions require recursive enqueues
519           if (store.getCompactPriority() <= 0) {
520             requestSystemCompaction(region, store, "Recursive enqueue");
521           } else {
522             // see if the compaction has caused us to exceed max region size
523             requestSplit(region);
524           }
525         }
526       } catch (IOException ex) {
527         IOException remoteEx =
528             ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
529         LOG.error("Compaction failed " + this, remoteEx);
530         if (remoteEx != ex) {
531           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
532         }
533         server.checkFileSystem();
534       } catch (Exception ex) {
535         LOG.error("Compaction failed " + this, ex);
536         server.checkFileSystem();
537       } finally {
538         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
539       }
540       this.compaction.getRequest().afterExecute();
541     }
542 
543     private String formatStackTrace(Exception ex) {
544       StringWriter sw = new StringWriter();
545       PrintWriter pw = new PrintWriter(sw);
546       ex.printStackTrace(pw);
547       pw.flush();
548       return sw.toString();
549     }
550 
551     @Override
552     public int compareTo(CompactionRunner o) {
553       // Only compare the underlying request (if any), for queue sorting purposes.
554       int compareVal = queuedPriority - o.queuedPriority; // compare priority
555       if (compareVal != 0) return compareVal;
556       CompactionContext tc = this.compaction, oc = o.compaction;
557       // Sort pre-selected (user?) compactions before system ones with equal priority.
558       return (tc == null) ? ((oc == null) ? 0 : 1)
559           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
560     }
561   }
562 
563   /**
564    * Cleanup class to use when rejecting a compaction request from the queue.
565    */
566   private static class Rejection implements RejectedExecutionHandler {
567     @Override
568     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
569       if (runnable instanceof CompactionRunner) {
570         CompactionRunner runner = (CompactionRunner)runnable;
571         LOG.debug("Compaction Rejected: " + runner);
572         runner.store.cancelRequestedCompaction(runner.compaction);
573       }
574     }
575   }
576 
577   /**
578    * {@inheritDoc}
579    */
580   @Override
581   public void onConfigurationChange(Configuration newConf) {
582     // Check if number of large / small compaction threads has changed, and then
583     // adjust the core pool size of the thread pools, by using the
584     // setCorePoolSize() method. According to the javadocs, it is safe to
585     // change the core pool size on-the-fly. We need to reset the maximum
586     // pool size, as well.
587     int largeThreads = Math.max(1, newConf.getInt(
588             LARGE_COMPACTION_THREADS,
589             LARGE_COMPACTION_THREADS_DEFAULT));
590     if (this.longCompactions.getCorePoolSize() != largeThreads) {
591       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
592               " from " + this.longCompactions.getCorePoolSize() + " to " +
593               largeThreads);
594       this.longCompactions.setMaximumPoolSize(largeThreads);
595       this.longCompactions.setCorePoolSize(largeThreads);
596     }
597 
598     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
599             SMALL_COMPACTION_THREADS_DEFAULT);
600     if (this.shortCompactions.getCorePoolSize() != smallThreads) {
601       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
602                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
603                 smallThreads);
604       this.shortCompactions.setMaximumPoolSize(smallThreads);
605       this.shortCompactions.setCorePoolSize(smallThreads);
606     }
607 
608     int splitThreads = newConf.getInt(SPLIT_THREADS,
609             SPLIT_THREADS_DEFAULT);
610     if (this.splits.getCorePoolSize() != splitThreads) {
611       LOG.info("Changing the value of " + SPLIT_THREADS +
612                 " from " + this.splits.getCorePoolSize() + " to " +
613                 splitThreads);
614       this.splits.setMaximumPoolSize(smallThreads);
615       this.splits.setCorePoolSize(smallThreads);
616     }
617 
618     int mergeThreads = newConf.getInt(MERGE_THREADS,
619             MERGE_THREADS_DEFAULT);
620     if (this.mergePool.getCorePoolSize() != mergeThreads) {
621       LOG.info("Changing the value of " + MERGE_THREADS +
622                 " from " + this.mergePool.getCorePoolSize() + " to " +
623                 mergeThreads);
624       this.mergePool.setMaximumPoolSize(smallThreads);
625       this.mergePool.setCorePoolSize(smallThreads);
626     }
627 
628     CompactionThroughputController old = this.compactionThroughputController;
629     if (old != null) {
630       old.stop("configuration change");
631     }
632     this.compactionThroughputController =
633         CompactionThroughputControllerFactory.create(server, newConf);
634 
635     // We change this atomically here instead of reloading the config in order that upstream
636     // would be the only one with the flexibility to reload the config.
637     this.conf.reloadConfiguration();
638   }
639 
640   protected int getSmallCompactionThreadNum() {
641     return this.shortCompactions.getCorePoolSize();
642   }
643 
644   public int getLargeCompactionThreadNum() {
645     return this.longCompactions.getCorePoolSize();
646   }
647 
648   /**
649    * {@inheritDoc}
650    */
651   @Override
652   public void registerChildren(ConfigurationManager manager) {
653     // No children to register.
654   }
655 
656   /**
657    * {@inheritDoc}
658    */
659   @Override
660   public void deregisterChildren(ConfigurationManager manager) {
661     // No children to register
662   }
663 
664   @VisibleForTesting
665   public CompactionThroughputController getCompactionThroughputController() {
666     return compactionThroughputController;
667   }
668 
669 }