View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.conf.ConfigurationManager;
40  import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
41  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
42  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
43  import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
44  import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
45  import org.apache.hadoop.hbase.security.User;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.Pair;
48  import org.apache.hadoop.hbase.util.StealJobQueue;
49  import org.apache.hadoop.ipc.RemoteException;
50  import org.apache.hadoop.util.StringUtils;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  import com.google.common.base.Preconditions;
54  
55  /**
56   * Compact region on request and then run split if appropriate
57   */
58  @InterfaceAudience.Private
59  public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
60    private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
61  
62    // Configuration key for the large compaction threads.
63    public final static String LARGE_COMPACTION_THREADS =
64        "hbase.regionserver.thread.compaction.large";
65    public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
66    
67    // Configuration key for the small compaction threads.
68    public final static String SMALL_COMPACTION_THREADS =
69        "hbase.regionserver.thread.compaction.small";
70    public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
71    
72    // Configuration key for split threads
73    public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
74    public final static int SPLIT_THREADS_DEFAULT = 1;
75    
76    // Configuration keys for merge threads
77    public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
78    public final static int MERGE_THREADS_DEFAULT = 1;
79  
80    public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
81        "hbase.regionserver.regionSplitLimit";
82    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
83    
84    private final HRegionServer server;
85    private final Configuration conf;
86  
87    private final ThreadPoolExecutor longCompactions;
88    private final ThreadPoolExecutor shortCompactions;
89    private final ThreadPoolExecutor splits;
90    private final ThreadPoolExecutor mergePool;
91  
92    private volatile ThroughputController compactionThroughputController;
93  
94    /**
95     * Splitting should not take place if the total number of regions exceed this.
96     * This is not a hard limit to the number of regions but it is a guideline to
97     * stop splitting after number of online regions is greater than this.
98     */
99    private int regionSplitLimit;
100 
101   /** @param server */
102   CompactSplitThread(HRegionServer server) {
103     super();
104     this.server = server;
105     this.conf = server.getConfiguration();
106     this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
107         DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
108 
109     int largeThreads = Math.max(1, conf.getInt(
110         LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
111     int smallThreads = conf.getInt(
112         SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
113 
114     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
115 
116     // if we have throttle threads, make sure the user also specified size
117     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
118 
119     final String n = Thread.currentThread().getName();
120 
121     StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
122     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
123         60, TimeUnit.SECONDS, stealJobQueue,
124         new ThreadFactory() {
125           @Override
126           public Thread newThread(Runnable r) {
127             Thread t = new Thread(r);
128             t.setName(n + "-longCompactions-" + System.currentTimeMillis());
129             return t;
130           }
131       });
132     this.longCompactions.setRejectedExecutionHandler(new Rejection());
133     this.longCompactions.prestartAllCoreThreads();
134     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
135         60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
136         new ThreadFactory() {
137           @Override
138           public Thread newThread(Runnable r) {
139             Thread t = new Thread(r);
140             t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
141             return t;
142           }
143       });
144     this.shortCompactions
145         .setRejectedExecutionHandler(new Rejection());
146     this.splits = (ThreadPoolExecutor)
147         Executors.newFixedThreadPool(splitThreads,
148             new ThreadFactory() {
149           @Override
150           public Thread newThread(Runnable r) {
151             Thread t = new Thread(r);
152             t.setName(n + "-splits-" + System.currentTimeMillis());
153             return t;
154           }
155       });
156     int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
157     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
158         mergeThreads, new ThreadFactory() {
159           @Override
160           public Thread newThread(Runnable r) {
161             Thread t = new Thread(r);
162             t.setName(n + "-merges-" + System.currentTimeMillis());
163             return t;
164           }
165         });
166 
167     // compaction throughput controller
168     this.compactionThroughputController =
169         CompactionThroughputControllerFactory.create(server, conf);
170   }
171 
172   @Override
173   public String toString() {
174     return "compaction_queue=("
175         + longCompactions.getQueue().size() + ":"
176         + shortCompactions.getQueue().size() + ")"
177         + ", split_queue=" + splits.getQueue().size()
178         + ", merge_queue=" + mergePool.getQueue().size();
179   }
180   
181   public String dumpQueue() {
182     StringBuffer queueLists = new StringBuffer();
183     queueLists.append("Compaction/Split Queue dump:\n");
184     queueLists.append("  LargeCompation Queue:\n");
185     BlockingQueue<Runnable> lq = longCompactions.getQueue();
186     Iterator<Runnable> it = lq.iterator();
187     while (it.hasNext()) {
188       queueLists.append("    " + it.next().toString());
189       queueLists.append("\n");
190     }
191 
192     if (shortCompactions != null) {
193       queueLists.append("\n");
194       queueLists.append("  SmallCompation Queue:\n");
195       lq = shortCompactions.getQueue();
196       it = lq.iterator();
197       while (it.hasNext()) {
198         queueLists.append("    " + it.next().toString());
199         queueLists.append("\n");
200       }
201     }
202 
203     queueLists.append("\n");
204     queueLists.append("  Split Queue:\n");
205     lq = splits.getQueue();
206     it = lq.iterator();
207     while (it.hasNext()) {
208       queueLists.append("    " + it.next().toString());
209       queueLists.append("\n");
210     }
211 
212     queueLists.append("\n");
213     queueLists.append("  Region Merge Queue:\n");
214     lq = mergePool.getQueue();
215     it = lq.iterator();
216     while (it.hasNext()) {
217       queueLists.append("    " + it.next().toString());
218       queueLists.append("\n");
219     }
220 
221     return queueLists.toString();
222   }
223 
224   public synchronized void requestRegionsMerge(final Region a,
225       final Region b, final boolean forcible, long masterSystemTime, User user) {
226     try {
227       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
228       if (LOG.isDebugEnabled()) {
229         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
230             + forcible + ".  " + this);
231       }
232     } catch (RejectedExecutionException ree) {
233       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
234           + forcible, ree);
235     }
236   }
237 
238   public synchronized boolean requestSplit(final Region r) {
239     // don't split regions that are blocking
240     if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
241       byte[] midKey = ((HRegion)r).checkSplit();
242       if (midKey != null) {
243         requestSplit(r, midKey);
244         return true;
245       }
246     }
247     return false;
248   }
249 
250   public synchronized void requestSplit(final Region r, byte[] midKey) {
251     requestSplit(r, midKey, null);
252   }
253 
254   /*
255    * The User parameter allows the split thread to assume the correct user identity
256    */
257   public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
258     if (midKey == null) {
259       LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
260         " not splittable because midkey=null");
261       if (((HRegion)r).shouldForceSplit()) {
262         ((HRegion)r).clearSplit();
263       }
264       return;
265     }
266     try {
267       this.splits.execute(new SplitRequest(r, midKey, this.server, user));
268       if (LOG.isDebugEnabled()) {
269         LOG.debug("Split requested for " + r + ".  " + this);
270       }
271     } catch (RejectedExecutionException ree) {
272       LOG.info("Could not execute split for " + r, ree);
273     }
274   }
275 
276   @Override
277   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
278       throws IOException {
279     return requestCompaction(r, why, null);
280   }
281 
282   @Override
283   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
284       List<Pair<CompactionRequest, Store>> requests) throws IOException {
285     return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
286   }
287 
288   @Override
289   public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
290       final String why, CompactionRequest request) throws IOException {
291     return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
292   }
293 
294   @Override
295   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
296       int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
297     return requestCompactionInternal(r, why, p, requests, true, user);
298   }
299 
300   private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
301       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
302           throws IOException {
303     // not a special compaction request, so make our own list
304     List<CompactionRequest> ret = null;
305     if (requests == null) {
306       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
307       for (Store s : r.getStores()) {
308         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
309         if (selectNow) ret.add(cr);
310       }
311     } else {
312       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
313       ret = new ArrayList<CompactionRequest>(requests.size());
314       for (Pair<CompactionRequest, Store> pair : requests) {
315         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
316       }
317     }
318     return ret;
319   }
320 
321   public CompactionRequest requestCompaction(final Region r, final Store s,
322       final String why, int priority, CompactionRequest request, User user) throws IOException {
323     return requestCompactionInternal(r, s, why, priority, request, true, user);
324   }
325 
326   public synchronized void requestSystemCompaction(
327       final Region r, final String why) throws IOException {
328     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
329   }
330 
331   public void requestSystemCompaction(
332       final Region r, final Store s, final String why) throws IOException {
333     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
334   }
335 
336   /**
337    * @param r region store belongs to
338    * @param s Store to request compaction on
339    * @param why Why compaction requested -- used in debug messages
340    * @param priority override the default priority (NO_PRIORITY == decide)
341    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
342    *          compaction will be used.
343    */
344   private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
345       final String why, int priority, CompactionRequest request, boolean selectNow, User user)
346           throws IOException {
347     if (this.server.isStopped()
348         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
349       return null;
350     }
351 
352     CompactionContext compaction = null;
353     if (selectNow) {
354       compaction = selectCompaction(r, s, priority, request, user);
355       if (compaction == null) return null; // message logged inside
356     }
357 
358     // We assume that most compactions are small. So, put system compactions into small
359     // pool; we will do selection there, and move to large pool if necessary.
360     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
361       ? longCompactions : shortCompactions;
362     pool.execute(new CompactionRunner(s, r, compaction, pool, user));
363     if (LOG.isDebugEnabled()) {
364       String type = (pool == shortCompactions) ? "Small " : "Large ";
365       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
366           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
367     }
368     return selectNow ? compaction.getRequest() : null;
369   }
370 
371   private CompactionContext selectCompaction(final Region r, final Store s,
372       int priority, CompactionRequest request, User user) throws IOException {
373     CompactionContext compaction = s.requestCompaction(priority, request, user);
374     if (compaction == null) {
375       if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
376         LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
377             " because compaction request was cancelled");
378       }
379       return null;
380     }
381     assert compaction.hasSelection();
382     if (priority != Store.NO_PRIORITY) {
383       compaction.getRequest().setPriority(priority);
384     }
385     return compaction;
386   }
387 
388   /**
389    * Only interrupt once it's done with a run through the work loop.
390    */
391   void interruptIfNecessary() {
392     splits.shutdown();
393     mergePool.shutdown();
394     longCompactions.shutdown();
395     shortCompactions.shutdown();
396   }
397 
398   private void waitFor(ThreadPoolExecutor t, String name) {
399     boolean done = false;
400     while (!done) {
401       try {
402         done = t.awaitTermination(60, TimeUnit.SECONDS);
403         LOG.info("Waiting for " + name + " to finish...");
404         if (!done) {
405           t.shutdownNow();
406         }
407       } catch (InterruptedException ie) {
408         LOG.warn("Interrupted waiting for " + name + " to finish...");
409       }
410     }
411   }
412 
413   void join() {
414     waitFor(splits, "Split Thread");
415     waitFor(mergePool, "Merge Thread");
416     waitFor(longCompactions, "Large Compaction Thread");
417     waitFor(shortCompactions, "Small Compaction Thread");
418   }
419 
420   /**
421    * Returns the current size of the queue containing regions that are
422    * processed.
423    *
424    * @return The current size of the regions queue.
425    */
426   public int getCompactionQueueSize() {
427     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
428   }
429 
430   public int getLargeCompactionQueueSize() {
431     return longCompactions.getQueue().size();
432   }
433 
434 
435   public int getSmallCompactionQueueSize() {
436     return shortCompactions.getQueue().size();
437   }
438 
439   public int getSplitQueueSize() {
440     return splits.getQueue().size();
441   }
442 
443   private boolean shouldSplitRegion() {
444     if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
445       LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
446           + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
447     }
448     return (regionSplitLimit > server.getNumberOfOnlineRegions());
449   }
450 
451   /**
452    * @return the regionSplitLimit
453    */
454   public int getRegionSplitLimit() {
455     return this.regionSplitLimit;
456   }
457 
458   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
459       justification="Contrived use of compareTo")
460   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
461     private final Store store;
462     private final HRegion region;
463     private CompactionContext compaction;
464     private int queuedPriority;
465     private ThreadPoolExecutor parent;
466     private User user;
467 
468     public CompactionRunner(Store store, Region region,
469         CompactionContext compaction, ThreadPoolExecutor parent, User user) {
470       super();
471       this.store = store;
472       this.region = (HRegion)region;
473       this.compaction = compaction;
474       this.queuedPriority = (this.compaction == null)
475           ? store.getCompactPriority() : compaction.getRequest().getPriority();
476       this.parent = parent;
477       this.user = user;
478     }
479 
480     @Override
481     public String toString() {
482       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
483           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
484     }
485 
486     private void doCompaction(User user) {
487       // Common case - system compaction without a file selection. Select now.
488       if (this.compaction == null) {
489         int oldPriority = this.queuedPriority;
490         this.queuedPriority = this.store.getCompactPriority();
491         if (this.queuedPriority > oldPriority) {
492           // Store priority decreased while we were in queue (due to some other compaction?),
493           // requeue with new priority to avoid blocking potential higher priorities.
494           this.parent.execute(this);
495           return;
496         }
497         try {
498           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
499         } catch (IOException ex) {
500           LOG.error("Compaction selection failed " + this, ex);
501           server.checkFileSystem();
502           return;
503         }
504         if (this.compaction == null) return; // nothing to do
505         // Now see if we are in correct pool for the size; if not, go to the correct one.
506         // We might end up waiting for a while, so cancel the selection.
507         assert this.compaction.hasSelection();
508         ThreadPoolExecutor pool = store.throttleCompaction(
509             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
510 
511         // Long compaction pool can process small job
512         // Short compaction pool should not process large job
513         if (this.parent == shortCompactions && pool == longCompactions) {
514           this.store.cancelRequestedCompaction(this.compaction);
515           this.compaction = null;
516           this.parent = pool;
517           this.parent.execute(this);
518           return;
519         }
520       }
521       // Finally we can compact something.
522       assert this.compaction != null;
523 
524       this.compaction.getRequest().beforeExecute();
525       try {
526         // Note: please don't put single-compaction logic here;
527         //       put it into region/store/etc. This is CST logic.
528         long start = EnvironmentEdgeManager.currentTime();
529         boolean completed =
530             region.compact(compaction, store, compactionThroughputController, user);
531         long now = EnvironmentEdgeManager.currentTime();
532         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
533               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
534         if (completed) {
535           // degenerate case: blocked regions require recursive enqueues
536           if (store.getCompactPriority() <= 0) {
537             requestSystemCompaction(region, store, "Recursive enqueue");
538           } else {
539             // see if the compaction has caused us to exceed max region size
540             requestSplit(region);
541           }
542         }
543       } catch (IOException ex) {
544         IOException remoteEx =
545             ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
546         LOG.error("Compaction failed " + this, remoteEx);
547         if (remoteEx != ex) {
548           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
549         }
550         server.checkFileSystem();
551       } catch (Exception ex) {
552         LOG.error("Compaction failed " + this, ex);
553         server.checkFileSystem();
554       } finally {
555         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
556       }
557       this.compaction.getRequest().afterExecute();
558     }
559 
560     @Override
561     public void run() {
562       Preconditions.checkNotNull(server);
563       if (server.isStopped()
564           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
565         return;
566       }
567       doCompaction(user);
568     }
569 
570     private String formatStackTrace(Exception ex) {
571       StringWriter sw = new StringWriter();
572       PrintWriter pw = new PrintWriter(sw);
573       ex.printStackTrace(pw);
574       pw.flush();
575       return sw.toString();
576     }
577 
578     @Override
579     public int compareTo(CompactionRunner o) {
580       // Only compare the underlying request (if any), for queue sorting purposes.
581       int compareVal = queuedPriority - o.queuedPriority; // compare priority
582       if (compareVal != 0) return compareVal;
583       CompactionContext tc = this.compaction, oc = o.compaction;
584       // Sort pre-selected (user?) compactions before system ones with equal priority.
585       return (tc == null) ? ((oc == null) ? 0 : 1)
586           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
587     }
588   }
589 
590   /**
591    * Cleanup class to use when rejecting a compaction request from the queue.
592    */
593   private static class Rejection implements RejectedExecutionHandler {
594     @Override
595     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
596       if (runnable instanceof CompactionRunner) {
597         CompactionRunner runner = (CompactionRunner)runnable;
598         LOG.debug("Compaction Rejected: " + runner);
599         runner.store.cancelRequestedCompaction(runner.compaction);
600       }
601     }
602   }
603 
604   /**
605    * {@inheritDoc}
606    */
607   @Override
608   public void onConfigurationChange(Configuration newConf) {
609     // Check if number of large / small compaction threads has changed, and then
610     // adjust the core pool size of the thread pools, by using the
611     // setCorePoolSize() method. According to the javadocs, it is safe to
612     // change the core pool size on-the-fly. We need to reset the maximum
613     // pool size, as well.
614     int largeThreads = Math.max(1, newConf.getInt(
615             LARGE_COMPACTION_THREADS,
616             LARGE_COMPACTION_THREADS_DEFAULT));
617     if (this.longCompactions.getCorePoolSize() != largeThreads) {
618       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
619               " from " + this.longCompactions.getCorePoolSize() + " to " +
620               largeThreads);
621       if(this.longCompactions.getCorePoolSize() < largeThreads) {
622         this.longCompactions.setMaximumPoolSize(largeThreads);
623         this.longCompactions.setCorePoolSize(largeThreads);
624       } else {
625         this.longCompactions.setCorePoolSize(largeThreads);
626         this.longCompactions.setMaximumPoolSize(largeThreads);
627       }
628     }
629 
630     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
631             SMALL_COMPACTION_THREADS_DEFAULT);
632     if (this.shortCompactions.getCorePoolSize() != smallThreads) {
633       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
634                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
635                 smallThreads);
636       if(this.shortCompactions.getCorePoolSize() < smallThreads) {
637         this.shortCompactions.setMaximumPoolSize(smallThreads);
638         this.shortCompactions.setCorePoolSize(smallThreads);
639       } else {
640         this.shortCompactions.setCorePoolSize(smallThreads);
641         this.shortCompactions.setMaximumPoolSize(smallThreads);
642       }
643     }
644 
645     int splitThreads = newConf.getInt(SPLIT_THREADS,
646             SPLIT_THREADS_DEFAULT);
647     if (this.splits.getCorePoolSize() != splitThreads) {
648       LOG.info("Changing the value of " + SPLIT_THREADS +
649                 " from " + this.splits.getCorePoolSize() + " to " +
650                 splitThreads);
651       if(this.splits.getCorePoolSize() < splitThreads) {
652         this.splits.setMaximumPoolSize(splitThreads);
653         this.splits.setCorePoolSize(splitThreads);
654       } else {
655         this.splits.setCorePoolSize(splitThreads);
656         this.splits.setMaximumPoolSize(splitThreads);
657       }
658     }
659 
660     int mergeThreads = newConf.getInt(MERGE_THREADS,
661             MERGE_THREADS_DEFAULT);
662     if (this.mergePool.getCorePoolSize() != mergeThreads) {
663       LOG.info("Changing the value of " + MERGE_THREADS +
664                 " from " + this.mergePool.getCorePoolSize() + " to " +
665                 mergeThreads);
666       if(this.mergePool.getCorePoolSize() < mergeThreads) {
667         this.mergePool.setMaximumPoolSize(mergeThreads);
668         this.mergePool.setCorePoolSize(mergeThreads);
669       } else {
670         this.mergePool.setCorePoolSize(mergeThreads);
671         this.mergePool.setMaximumPoolSize(mergeThreads);
672       }
673     }
674 
675     ThroughputController old = this.compactionThroughputController;
676     if (old != null) {
677       old.stop("configuration change");
678     }
679     this.compactionThroughputController =
680         CompactionThroughputControllerFactory.create(server, newConf);
681 
682     // We change this atomically here instead of reloading the config in order that upstream
683     // would be the only one with the flexibility to reload the config.
684     this.conf.reloadConfiguration();
685   }
686 
687   protected int getSmallCompactionThreadNum() {
688     return this.shortCompactions.getCorePoolSize();
689   }
690 
691   protected int getLargeCompactionThreadNum() {
692     return this.longCompactions.getCorePoolSize();
693   }
694 
695   protected int getSplitThreadNum() {
696     return this.splits.getCorePoolSize();
697   }
698 
699   protected int getMergeThreadNum() {
700     return this.mergePool.getCorePoolSize();
701   }
702 
  /**
   * {@inheritDoc}
   * <p>
   * This implementation does nothing.
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }
710 
  /**
   * {@inheritDoc}
   * <p>
   * This implementation does nothing.
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to deregister.
  }
718 
719   @VisibleForTesting
720   public ThroughputController getCompactionThroughputController() {
721     return compactionThroughputController;
722   }
723 
724   @VisibleForTesting
725   /**
726    * Shutdown the long compaction thread pool.
727    * Should only be used in unit test to prevent long compaction thread pool from stealing job
728    * from short compaction queue
729    */
730   void shutdownLongCompactions(){
731     this.longCompactions.shutdown();
732   }
733 }