View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.RemoteExceptionHandler;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.conf.ConfigurationManager;
41  import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
42  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
43  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
44  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
45  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
46  import org.apache.hadoop.hbase.security.User;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.hbase.util.StealJobQueue;
50  import org.apache.hadoop.util.StringUtils;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  import com.google.common.base.Preconditions;
54  
55  /**
56   * Compact region on request and then run split if appropriate
57   */
58  @InterfaceAudience.Private
59  public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
60    private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
61  
62    // Configuration key for the large compaction threads.
63    public final static String LARGE_COMPACTION_THREADS =
64        "hbase.regionserver.thread.compaction.large";
65    public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
66    
67    // Configuration key for the small compaction threads.
68    public final static String SMALL_COMPACTION_THREADS =
69        "hbase.regionserver.thread.compaction.small";
70    public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
71    
72    // Configuration key for split threads
73    public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
74    public final static int SPLIT_THREADS_DEFAULT = 1;
75    
76    // Configuration keys for merge threads
77    public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
78    public final static int MERGE_THREADS_DEFAULT = 1;
79  
80    public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
81        "hbase.regionserver.regionSplitLimit";
82    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
83    
84    private final HRegionServer server;
85    private final Configuration conf;
86  
87    private final ThreadPoolExecutor longCompactions;
88    private final ThreadPoolExecutor shortCompactions;
89    private final ThreadPoolExecutor splits;
90    private final ThreadPoolExecutor mergePool;
91  
92    private volatile CompactionThroughputController compactionThroughputController;
93  
94    /**
95     * Splitting should not take place if the total number of regions exceed this.
96     * This is not a hard limit to the number of regions but it is a guideline to
97     * stop splitting after number of online regions is greater than this.
98     */
99    private int regionSplitLimit;
100 
101   /** @param server */
102   CompactSplitThread(HRegionServer server) {
103     super();
104     this.server = server;
105     this.conf = server.getConfiguration();
106     this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
107         DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
108 
109     int largeThreads = Math.max(1, conf.getInt(
110         LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
111     int smallThreads = conf.getInt(
112         SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
113 
114     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
115 
116     // if we have throttle threads, make sure the user also specified size
117     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
118 
119     final String n = Thread.currentThread().getName();
120 
121     StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
122     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
123         60, TimeUnit.SECONDS, stealJobQueue,
124         new ThreadFactory() {
125           @Override
126           public Thread newThread(Runnable r) {
127             String name = n + "-longCompactions-" + System.currentTimeMillis();
128             return new Thread(r, name);
129           }
130       });
131     this.longCompactions.setRejectedExecutionHandler(new Rejection());
132     this.longCompactions.prestartAllCoreThreads();
133     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
134         60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
135         new ThreadFactory() {
136           @Override
137           public Thread newThread(Runnable r) {
138             String name = n + "-shortCompactions-" + System.currentTimeMillis();
139             return new Thread(r, name);
140           }
141       });
142     this.shortCompactions
143         .setRejectedExecutionHandler(new Rejection());
144     this.splits = (ThreadPoolExecutor)
145         Executors.newFixedThreadPool(splitThreads,
146             new ThreadFactory() {
147           @Override
148           public Thread newThread(Runnable r) {
149             String name = n + "-splits-" + System.currentTimeMillis();
150             return new Thread(r, name);
151           }
152       });
153     int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
154     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
155         mergeThreads, new ThreadFactory() {
156           @Override
157           public Thread newThread(Runnable r) {
158             String name = n + "-merges-" + System.currentTimeMillis();
159             return new Thread(r, name);
160           }
161         });
162 
163     // compaction throughput controller
164     this.compactionThroughputController =
165         CompactionThroughputControllerFactory.create(server, conf);
166   }
167 
168   @Override
169   public String toString() {
170     return "compaction_queue=("
171         + longCompactions.getQueue().size() + ":"
172         + shortCompactions.getQueue().size() + ")"
173         + ", split_queue=" + splits.getQueue().size()
174         + ", merge_queue=" + mergePool.getQueue().size();
175   }
176   
177   public String dumpQueue() {
178     StringBuffer queueLists = new StringBuffer();
179     queueLists.append("Compaction/Split Queue dump:\n");
180     queueLists.append("  LargeCompation Queue:\n");
181     BlockingQueue<Runnable> lq = longCompactions.getQueue();
182     Iterator<Runnable> it = lq.iterator();
183     while (it.hasNext()) {
184       queueLists.append("    " + it.next().toString());
185       queueLists.append("\n");
186     }
187 
188     if (shortCompactions != null) {
189       queueLists.append("\n");
190       queueLists.append("  SmallCompation Queue:\n");
191       lq = shortCompactions.getQueue();
192       it = lq.iterator();
193       while (it.hasNext()) {
194         queueLists.append("    " + it.next().toString());
195         queueLists.append("\n");
196       }
197     }
198 
199     queueLists.append("\n");
200     queueLists.append("  Split Queue:\n");
201     lq = splits.getQueue();
202     it = lq.iterator();
203     while (it.hasNext()) {
204       queueLists.append("    " + it.next().toString());
205       queueLists.append("\n");
206     }
207 
208     queueLists.append("\n");
209     queueLists.append("  Region Merge Queue:\n");
210     lq = mergePool.getQueue();
211     it = lq.iterator();
212     while (it.hasNext()) {
213       queueLists.append("    " + it.next().toString());
214       queueLists.append("\n");
215     }
216 
217     return queueLists.toString();
218   }
219 
220   public synchronized void requestRegionsMerge(final Region a,
221       final Region b, final boolean forcible, long masterSystemTime, User user) {
222     try {
223       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
224       if (LOG.isDebugEnabled()) {
225         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
226             + forcible + ".  " + this);
227       }
228     } catch (RejectedExecutionException ree) {
229       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
230           + forcible, ree);
231     }
232   }
233 
234   public synchronized boolean requestSplit(final Region r) {
235     // don't split regions that are blocking
236     if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
237       byte[] midKey = ((HRegion)r).checkSplit();
238       if (midKey != null) {
239         requestSplit(r, midKey);
240         return true;
241       }
242     }
243     return false;
244   }
245 
246   public synchronized void requestSplit(final Region r, byte[] midKey) {
247     requestSplit(r, midKey, null);
248   }
249 
250   /*
251    * The User parameter allows the split thread to assume the correct user identity
252    */
253   public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
254     if (midKey == null) {
255       LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
256         " not splittable because midkey=null");
257       if (((HRegion)r).shouldForceSplit()) {
258         ((HRegion)r).clearSplit();
259       }
260       return;
261     }
262     try {
263       this.splits.execute(new SplitRequest(r, midKey, this.server, user));
264       if (LOG.isDebugEnabled()) {
265         LOG.debug("Split requested for " + r + ".  " + this);
266       }
267     } catch (RejectedExecutionException ree) {
268       LOG.info("Could not execute split for " + r, ree);
269     }
270   }
271 
272   @Override
273   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
274       throws IOException {
275     return requestCompaction(r, why, null);
276   }
277 
278   @Override
279   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
280       List<Pair<CompactionRequest, Store>> requests) throws IOException {
281     return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
282   }
283 
284   @Override
285   public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
286       final String why, CompactionRequest request) throws IOException {
287     return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
288   }
289 
290   @Override
291   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
292       int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
293     return requestCompactionInternal(r, why, p, requests, true, user);
294   }
295 
296   private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
297       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
298           throws IOException {
299     // not a special compaction request, so make our own list
300     List<CompactionRequest> ret = null;
301     if (requests == null) {
302       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
303       for (Store s : r.getStores()) {
304         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
305         if (selectNow) ret.add(cr);
306       }
307     } else {
308       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
309       ret = new ArrayList<CompactionRequest>(requests.size());
310       for (Pair<CompactionRequest, Store> pair : requests) {
311         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
312       }
313     }
314     return ret;
315   }
316 
317   public CompactionRequest requestCompaction(final Region r, final Store s,
318       final String why, int priority, CompactionRequest request, User user) throws IOException {
319     return requestCompactionInternal(r, s, why, priority, request, true, user);
320   }
321 
322   public synchronized void requestSystemCompaction(
323       final Region r, final String why) throws IOException {
324     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
325   }
326 
327   public void requestSystemCompaction(
328       final Region r, final Store s, final String why) throws IOException {
329     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
330   }
331 
332   /**
333    * @param r region store belongs to
334    * @param s Store to request compaction on
335    * @param why Why compaction requested -- used in debug messages
336    * @param priority override the default priority (NO_PRIORITY == decide)
337    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
338    *          compaction will be used.
339    */
340   private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
341       final String why, int priority, CompactionRequest request, boolean selectNow, User user)
342           throws IOException {
343     if (this.server.isStopped()
344         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
345       return null;
346     }
347 
348     CompactionContext compaction = null;
349     if (selectNow) {
350       compaction = selectCompaction(r, s, priority, request, user);
351       if (compaction == null) return null; // message logged inside
352     }
353 
354     // We assume that most compactions are small. So, put system compactions into small
355     // pool; we will do selection there, and move to large pool if necessary.
356     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
357       ? longCompactions : shortCompactions;
358     pool.execute(new CompactionRunner(s, r, compaction, pool, user));
359     if (LOG.isDebugEnabled()) {
360       String type = (pool == shortCompactions) ? "Small " : "Large ";
361       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
362           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
363     }
364     return selectNow ? compaction.getRequest() : null;
365   }
366 
367   private CompactionContext selectCompaction(final Region r, final Store s,
368       int priority, CompactionRequest request, User user) throws IOException {
369     CompactionContext compaction = s.requestCompaction(priority, request, user);
370     if (compaction == null) {
371       if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
372         LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
373             " because compaction request was cancelled");
374       }
375       return null;
376     }
377     assert compaction.hasSelection();
378     if (priority != Store.NO_PRIORITY) {
379       compaction.getRequest().setPriority(priority);
380     }
381     return compaction;
382   }
383 
384   /**
385    * Only interrupt once it's done with a run through the work loop.
386    */
387   void interruptIfNecessary() {
388     splits.shutdown();
389     mergePool.shutdown();
390     longCompactions.shutdown();
391     shortCompactions.shutdown();
392   }
393 
394   private void waitFor(ThreadPoolExecutor t, String name) {
395     boolean done = false;
396     while (!done) {
397       try {
398         done = t.awaitTermination(60, TimeUnit.SECONDS);
399         LOG.info("Waiting for " + name + " to finish...");
400         if (!done) {
401           t.shutdownNow();
402         }
403       } catch (InterruptedException ie) {
404         LOG.warn("Interrupted waiting for " + name + " to finish...");
405       }
406     }
407   }
408 
409   void join() {
410     waitFor(splits, "Split Thread");
411     waitFor(mergePool, "Merge Thread");
412     waitFor(longCompactions, "Large Compaction Thread");
413     waitFor(shortCompactions, "Small Compaction Thread");
414   }
415 
416   /**
417    * Returns the current size of the queue containing regions that are
418    * processed.
419    *
420    * @return The current size of the regions queue.
421    */
422   public int getCompactionQueueSize() {
423     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
424   }
425 
426   public int getLargeCompactionQueueSize() {
427     return longCompactions.getQueue().size();
428   }
429 
430 
431   public int getSmallCompactionQueueSize() {
432     return shortCompactions.getQueue().size();
433   }
434 
435   public int getSplitQueueSize() {
436     return splits.getQueue().size();
437   }
438 
439   private boolean shouldSplitRegion() {
440     if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
441       LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
442           + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
443     }
444     return (regionSplitLimit > server.getNumberOfOnlineRegions());
445   }
446 
447   /**
448    * @return the regionSplitLimit
449    */
450   public int getRegionSplitLimit() {
451     return this.regionSplitLimit;
452   }
453 
454   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
455       justification="Contrived use of compareTo")
456   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
457     private final Store store;
458     private final HRegion region;
459     private CompactionContext compaction;
460     private int queuedPriority;
461     private ThreadPoolExecutor parent;
462     private User user;
463 
464     public CompactionRunner(Store store, Region region,
465         CompactionContext compaction, ThreadPoolExecutor parent, User user) {
466       super();
467       this.store = store;
468       this.region = (HRegion)region;
469       this.compaction = compaction;
470       this.queuedPriority = (this.compaction == null)
471           ? store.getCompactPriority() : compaction.getRequest().getPriority();
472       this.parent = parent;
473       this.user = user;
474     }
475 
476     @Override
477     public String toString() {
478       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
479           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
480     }
481 
482     private void doCompaction(User user) {
483       // Common case - system compaction without a file selection. Select now.
484       if (this.compaction == null) {
485         int oldPriority = this.queuedPriority;
486         this.queuedPriority = this.store.getCompactPriority();
487         if (this.queuedPriority > oldPriority) {
488           // Store priority decreased while we were in queue (due to some other compaction?),
489           // requeue with new priority to avoid blocking potential higher priorities.
490           this.parent.execute(this);
491           return;
492         }
493         try {
494           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
495         } catch (IOException ex) {
496           LOG.error("Compaction selection failed " + this, ex);
497           server.checkFileSystem();
498           return;
499         }
500         if (this.compaction == null) return; // nothing to do
501         // Now see if we are in correct pool for the size; if not, go to the correct one.
502         // We might end up waiting for a while, so cancel the selection.
503         assert this.compaction.hasSelection();
504         ThreadPoolExecutor pool = store.throttleCompaction(
505             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
506 
507         // Long compaction pool can process small job
508         // Short compaction pool should not process large job
509         if (this.parent == shortCompactions && pool == longCompactions) {
510           this.store.cancelRequestedCompaction(this.compaction);
511           this.compaction = null;
512           this.parent = pool;
513           this.parent.execute(this);
514           return;
515         }
516       }
517       // Finally we can compact something.
518       assert this.compaction != null;
519 
520       this.compaction.getRequest().beforeExecute();
521       try {
522         // Note: please don't put single-compaction logic here;
523         //       put it into region/store/etc. This is CST logic.
524         long start = EnvironmentEdgeManager.currentTime();
525         boolean completed =
526             region.compact(compaction, store, compactionThroughputController, user);
527         long now = EnvironmentEdgeManager.currentTime();
528         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
529               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
530         if (completed) {
531           // degenerate case: blocked regions require recursive enqueues
532           if (store.getCompactPriority() <= 0) {
533             requestSystemCompaction(region, store, "Recursive enqueue");
534           } else {
535             // see if the compaction has caused us to exceed max region size
536             requestSplit(region);
537           }
538         }
539       } catch (IOException ex) {
540         IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
541         LOG.error("Compaction failed " + this, remoteEx);
542         if (remoteEx != ex) {
543           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
544         }
545         server.checkFileSystem();
546       } catch (Exception ex) {
547         LOG.error("Compaction failed " + this, ex);
548         server.checkFileSystem();
549       } finally {
550         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
551       }
552       this.compaction.getRequest().afterExecute();
553     }
554 
555     @Override
556     public void run() {
557       Preconditions.checkNotNull(server);
558       if (server.isStopped()
559           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
560         return;
561       }
562       doCompaction(user);
563     }
564 
565     private String formatStackTrace(Exception ex) {
566       StringWriter sw = new StringWriter();
567       PrintWriter pw = new PrintWriter(sw);
568       ex.printStackTrace(pw);
569       pw.flush();
570       return sw.toString();
571     }
572 
573     @Override
574     public int compareTo(CompactionRunner o) {
575       // Only compare the underlying request (if any), for queue sorting purposes.
576       int compareVal = queuedPriority - o.queuedPriority; // compare priority
577       if (compareVal != 0) return compareVal;
578       CompactionContext tc = this.compaction, oc = o.compaction;
579       // Sort pre-selected (user?) compactions before system ones with equal priority.
580       return (tc == null) ? ((oc == null) ? 0 : 1)
581           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
582     }
583   }
584 
585   /**
586    * Cleanup class to use when rejecting a compaction request from the queue.
587    */
588   private static class Rejection implements RejectedExecutionHandler {
589     @Override
590     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
591       if (runnable instanceof CompactionRunner) {
592         CompactionRunner runner = (CompactionRunner)runnable;
593         LOG.debug("Compaction Rejected: " + runner);
594         runner.store.cancelRequestedCompaction(runner.compaction);
595       }
596     }
597   }
598 
599   /**
600    * {@inheritDoc}
601    */
602   @Override
603   public void onConfigurationChange(Configuration newConf) {
604     // Check if number of large / small compaction threads has changed, and then
605     // adjust the core pool size of the thread pools, by using the
606     // setCorePoolSize() method. According to the javadocs, it is safe to
607     // change the core pool size on-the-fly. We need to reset the maximum
608     // pool size, as well.
609     int largeThreads = Math.max(1, newConf.getInt(
610             LARGE_COMPACTION_THREADS,
611             LARGE_COMPACTION_THREADS_DEFAULT));
612     if (this.longCompactions.getCorePoolSize() != largeThreads) {
613       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
614               " from " + this.longCompactions.getCorePoolSize() + " to " +
615               largeThreads);
616       if(this.longCompactions.getCorePoolSize() < largeThreads) {
617         this.longCompactions.setMaximumPoolSize(largeThreads);
618         this.longCompactions.setCorePoolSize(largeThreads);
619       } else {
620         this.longCompactions.setCorePoolSize(largeThreads);
621         this.longCompactions.setMaximumPoolSize(largeThreads);
622       }
623     }
624 
625     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
626             SMALL_COMPACTION_THREADS_DEFAULT);
627     if (this.shortCompactions.getCorePoolSize() != smallThreads) {
628       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
629                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
630                 smallThreads);
631       if(this.shortCompactions.getCorePoolSize() < smallThreads) {
632         this.shortCompactions.setMaximumPoolSize(smallThreads);
633         this.shortCompactions.setCorePoolSize(smallThreads);
634       } else {
635         this.shortCompactions.setCorePoolSize(smallThreads);
636         this.shortCompactions.setMaximumPoolSize(smallThreads);
637       }
638     }
639 
640     int splitThreads = newConf.getInt(SPLIT_THREADS,
641             SPLIT_THREADS_DEFAULT);
642     if (this.splits.getCorePoolSize() != splitThreads) {
643       LOG.info("Changing the value of " + SPLIT_THREADS +
644                 " from " + this.splits.getCorePoolSize() + " to " +
645                 splitThreads);
646       if(this.splits.getCorePoolSize() < splitThreads) {
647         this.splits.setMaximumPoolSize(splitThreads);
648         this.splits.setCorePoolSize(splitThreads);
649       } else {
650         this.splits.setCorePoolSize(splitThreads);
651         this.splits.setMaximumPoolSize(splitThreads);
652       }
653     }
654 
655     int mergeThreads = newConf.getInt(MERGE_THREADS,
656             MERGE_THREADS_DEFAULT);
657     if (this.mergePool.getCorePoolSize() != mergeThreads) {
658       LOG.info("Changing the value of " + MERGE_THREADS +
659                 " from " + this.mergePool.getCorePoolSize() + " to " +
660                 mergeThreads);
661       if(this.mergePool.getCorePoolSize() < mergeThreads) {
662         this.mergePool.setMaximumPoolSize(mergeThreads);
663         this.mergePool.setCorePoolSize(mergeThreads);
664       } else {
665         this.mergePool.setCorePoolSize(mergeThreads);
666         this.mergePool.setMaximumPoolSize(mergeThreads);
667       }
668     }
669 
670     CompactionThroughputController old = this.compactionThroughputController;
671     if (old != null) {
672       old.stop("configuration change");
673     }
674     this.compactionThroughputController =
675         CompactionThroughputControllerFactory.create(server, newConf);
676 
677     // We change this atomically here instead of reloading the config in order that upstream
678     // would be the only one with the flexibility to reload the config.
679     this.conf.reloadConfiguration();
680   }
681 
682   protected int getSmallCompactionThreadNum() {
683     return this.shortCompactions.getCorePoolSize();
684   }
685 
686   protected int getLargeCompactionThreadNum() {
687     return this.longCompactions.getCorePoolSize();
688   }
689 
690   protected int getSplitThreadNum() {
691     return this.splits.getCorePoolSize();
692   }
693 
694   protected int getMergeThreadNum() {
695     return this.mergePool.getCorePoolSize();
696   }
697 
698   /**
699    * {@inheritDoc}
700    */
701   @Override
702   public void registerChildren(ConfigurationManager manager) {
703     // No children to register.
704   }
705 
706   /**
707    * {@inheritDoc}
708    */
709   @Override
710   public void deregisterChildren(ConfigurationManager manager) {
711     // No children to register
712   }
713 
714   @VisibleForTesting
715   public CompactionThroughputController getCompactionThroughputController() {
716     return compactionThroughputController;
717   }
718 
719   @VisibleForTesting
720   /**
721    * Shutdown the long compaction thread pool.
722    * Should only be used in unit test to prevent long compaction thread pool from stealing job
723    * from short compaction queue
724    */
725   void shutdownLongCompactions(){
726     this.longCompactions.shutdown();
727   }
728 }