1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.RejectedExecutionHandler;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.RemoteExceptionHandler;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.conf.ConfigurationManager;
41 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
42 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
43 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
44 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
45 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
46 import org.apache.hadoop.hbase.security.User;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.util.StealJobQueue;
50 import org.apache.hadoop.util.StringUtils;
51
52 import com.google.common.annotations.VisibleForTesting;
53 import com.google.common.base.Preconditions;
54
55
56
57
58 @InterfaceAudience.Private
59 public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
60 private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
61
62
63 public final static String LARGE_COMPACTION_THREADS =
64 "hbase.regionserver.thread.compaction.large";
65 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
66
67
68 public final static String SMALL_COMPACTION_THREADS =
69 "hbase.regionserver.thread.compaction.small";
70 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
71
72
73 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
74 public final static int SPLIT_THREADS_DEFAULT = 1;
75
76
77 public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
78 public final static int MERGE_THREADS_DEFAULT = 1;
79
80 public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
81 "hbase.regionserver.regionSplitLimit";
82 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
83
84 private final HRegionServer server;
85 private final Configuration conf;
86
87 private final ThreadPoolExecutor longCompactions;
88 private final ThreadPoolExecutor shortCompactions;
89 private final ThreadPoolExecutor splits;
90 private final ThreadPoolExecutor mergePool;
91
92 private volatile CompactionThroughputController compactionThroughputController;
93
94
95
96
97
98
99 private int regionSplitLimit;
100
101
/**
 * Creates the compaction, split and merge executors for the given region server.
 * <p>
 * The large-compaction thread count is clamped to at least 1; a non-positive
 * small-compaction thread count fails the precondition check. Worker threads are named
 * after the constructing thread plus a pool-specific suffix and a creation timestamp.
 *
 * @param server the region server whose configuration supplies the pool sizes
 */
CompactSplitThread(HRegionServer server) {
  super();
  this.server = server;
  this.conf = server.getConfiguration();
  this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
      DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

  final int configuredLarge =
      conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT);
  final int largeThreads = Math.max(1, configuredLarge);
  final int smallThreads =
      conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
  final int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);

  // if we have throttle threads, make sure the user also specified size
  Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);

  final String n = Thread.currentThread().getName();

  // The long pool works on the StealJobQueue itself, the short pool on the queue it
  // exposes through getStealFromQueue().
  final StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
  this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
      60, TimeUnit.SECONDS, stealJobQueue,
      namedThreadFactory(n + "-longCompactions-"), new Rejection());
  this.longCompactions.prestartAllCoreThreads();

  this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
      60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
      namedThreadFactory(n + "-shortCompactions-"), new Rejection());

  this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(
      splitThreads, namedThreadFactory(n + "-splits-"));

  final int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
  this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
      mergeThreads, namedThreadFactory(n + "-merges-"));

  // compaction throughput controller
  this.compactionThroughputController =
      CompactionThroughputControllerFactory.create(server, conf);
}

/**
 * Returns a factory whose threads are named {@code prefix} followed by the wall-clock
 * time (in milliseconds) at which each thread is created.
 */
private static ThreadFactory namedThreadFactory(final String prefix) {
  return new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, prefix + System.currentTimeMillis());
    }
  };
}
167
168 @Override
169 public String toString() {
170 return "compaction_queue=("
171 + longCompactions.getQueue().size() + ":"
172 + shortCompactions.getQueue().size() + ")"
173 + ", split_queue=" + splits.getQueue().size()
174 + ", merge_queue=" + mergePool.getQueue().size();
175 }
176
177 public String dumpQueue() {
178 StringBuffer queueLists = new StringBuffer();
179 queueLists.append("Compaction/Split Queue dump:\n");
180 queueLists.append(" LargeCompation Queue:\n");
181 BlockingQueue<Runnable> lq = longCompactions.getQueue();
182 Iterator<Runnable> it = lq.iterator();
183 while (it.hasNext()) {
184 queueLists.append(" " + it.next().toString());
185 queueLists.append("\n");
186 }
187
188 if (shortCompactions != null) {
189 queueLists.append("\n");
190 queueLists.append(" SmallCompation Queue:\n");
191 lq = shortCompactions.getQueue();
192 it = lq.iterator();
193 while (it.hasNext()) {
194 queueLists.append(" " + it.next().toString());
195 queueLists.append("\n");
196 }
197 }
198
199 queueLists.append("\n");
200 queueLists.append(" Split Queue:\n");
201 lq = splits.getQueue();
202 it = lq.iterator();
203 while (it.hasNext()) {
204 queueLists.append(" " + it.next().toString());
205 queueLists.append("\n");
206 }
207
208 queueLists.append("\n");
209 queueLists.append(" Region Merge Queue:\n");
210 lq = mergePool.getQueue();
211 it = lq.iterator();
212 while (it.hasNext()) {
213 queueLists.append(" " + it.next().toString());
214 queueLists.append("\n");
215 }
216
217 return queueLists.toString();
218 }
219
220 public synchronized void requestRegionsMerge(final Region a,
221 final Region b, final boolean forcible, long masterSystemTime, User user) {
222 try {
223 mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
224 if (LOG.isDebugEnabled()) {
225 LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
226 + forcible + ". " + this);
227 }
228 } catch (RejectedExecutionException ree) {
229 LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
230 + forcible, ree);
231 }
232 }
233
234 public synchronized boolean requestSplit(final Region r) {
235
236 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
237 byte[] midKey = ((HRegion)r).checkSplit();
238 if (midKey != null) {
239 requestSplit(r, midKey);
240 return true;
241 }
242 }
243 return false;
244 }
245
246 public synchronized void requestSplit(final Region r, byte[] midKey) {
247 requestSplit(r, midKey, null);
248 }
249
250
251
252
253 public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
254 if (midKey == null) {
255 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
256 " not splittable because midkey=null");
257 if (((HRegion)r).shouldForceSplit()) {
258 ((HRegion)r).clearSplit();
259 }
260 return;
261 }
262 try {
263 this.splits.execute(new SplitRequest(r, midKey, this.server, user));
264 if (LOG.isDebugEnabled()) {
265 LOG.debug("Split requested for " + r + ". " + this);
266 }
267 } catch (RejectedExecutionException ree) {
268 LOG.info("Could not execute split for " + r, ree);
269 }
270 }
271
272 @Override
273 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
274 throws IOException {
275 return requestCompaction(r, why, null);
276 }
277
278 @Override
279 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
280 List<Pair<CompactionRequest, Store>> requests) throws IOException {
281 return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
282 }
283
284 @Override
285 public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
286 final String why, CompactionRequest request) throws IOException {
287 return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
288 }
289
290 @Override
291 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
292 int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
293 return requestCompactionInternal(r, why, p, requests, true, user);
294 }
295
296 private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
297 int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
298 throws IOException {
299
300 List<CompactionRequest> ret = null;
301 if (requests == null) {
302 ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
303 for (Store s : r.getStores()) {
304 CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
305 if (selectNow) ret.add(cr);
306 }
307 } else {
308 Preconditions.checkArgument(selectNow);
309 ret = new ArrayList<CompactionRequest>(requests.size());
310 for (Pair<CompactionRequest, Store> pair : requests) {
311 ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
312 }
313 }
314 return ret;
315 }
316
317 public CompactionRequest requestCompaction(final Region r, final Store s,
318 final String why, int priority, CompactionRequest request, User user) throws IOException {
319 return requestCompactionInternal(r, s, why, priority, request, true, user);
320 }
321
322 public synchronized void requestSystemCompaction(
323 final Region r, final String why) throws IOException {
324 requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
325 }
326
327 public void requestSystemCompaction(
328 final Region r, final Store s, final String why) throws IOException {
329 requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
330 }
331
332
333
334
335
336
337
338
339
340 private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
341 final String why, int priority, CompactionRequest request, boolean selectNow, User user)
342 throws IOException {
343 if (this.server.isStopped()
344 || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
345 return null;
346 }
347
348 CompactionContext compaction = null;
349 if (selectNow) {
350 compaction = selectCompaction(r, s, priority, request, user);
351 if (compaction == null) return null;
352 }
353
354
355
356 ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
357 ? longCompactions : shortCompactions;
358 pool.execute(new CompactionRunner(s, r, compaction, pool, user));
359 if (LOG.isDebugEnabled()) {
360 String type = (pool == shortCompactions) ? "Small " : "Large ";
361 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
362 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
363 }
364 return selectNow ? compaction.getRequest() : null;
365 }
366
367 private CompactionContext selectCompaction(final Region r, final Store s,
368 int priority, CompactionRequest request, User user) throws IOException {
369 CompactionContext compaction = s.requestCompaction(priority, request, user);
370 if (compaction == null) {
371 if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
372 LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
373 " because compaction request was cancelled");
374 }
375 return null;
376 }
377 assert compaction.hasSelection();
378 if (priority != Store.NO_PRIORITY) {
379 compaction.getRequest().setPriority(priority);
380 }
381 return compaction;
382 }
383
384
385
386
387 void interruptIfNecessary() {
388 splits.shutdown();
389 mergePool.shutdown();
390 longCompactions.shutdown();
391 shortCompactions.shutdown();
392 }
393
394 private void waitFor(ThreadPoolExecutor t, String name) {
395 boolean done = false;
396 while (!done) {
397 try {
398 done = t.awaitTermination(60, TimeUnit.SECONDS);
399 LOG.info("Waiting for " + name + " to finish...");
400 if (!done) {
401 t.shutdownNow();
402 }
403 } catch (InterruptedException ie) {
404 LOG.warn("Interrupted waiting for " + name + " to finish...");
405 }
406 }
407 }
408
409 void join() {
410 waitFor(splits, "Split Thread");
411 waitFor(mergePool, "Merge Thread");
412 waitFor(longCompactions, "Large Compaction Thread");
413 waitFor(shortCompactions, "Small Compaction Thread");
414 }
415
416
417
418
419
420
421
422 public int getCompactionQueueSize() {
423 return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
424 }
425
426 public int getLargeCompactionQueueSize() {
427 return longCompactions.getQueue().size();
428 }
429
430
431 public int getSmallCompactionQueueSize() {
432 return shortCompactions.getQueue().size();
433 }
434
435 public int getSplitQueueSize() {
436 return splits.getQueue().size();
437 }
438
439 private boolean shouldSplitRegion() {
440 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
441 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
442 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
443 }
444 return (regionSplitLimit > server.getNumberOfOnlineRegions());
445 }
446
447
448
449
450 public int getRegionSplitLimit() {
451 return this.regionSplitLimit;
452 }
453
454 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
455 justification="Contrived use of compareTo")
456 private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
457 private final Store store;
458 private final HRegion region;
459 private CompactionContext compaction;
460 private int queuedPriority;
461 private ThreadPoolExecutor parent;
462 private User user;
463
464 public CompactionRunner(Store store, Region region,
465 CompactionContext compaction, ThreadPoolExecutor parent, User user) {
466 super();
467 this.store = store;
468 this.region = (HRegion)region;
469 this.compaction = compaction;
470 this.queuedPriority = (this.compaction == null)
471 ? store.getCompactPriority() : compaction.getRequest().getPriority();
472 this.parent = parent;
473 this.user = user;
474 }
475
476 @Override
477 public String toString() {
478 return (this.compaction != null) ? ("Request = " + compaction.getRequest())
479 : ("Store = " + store.toString() + ", pri = " + queuedPriority);
480 }
481
482 private void doCompaction(User user) {
483
484 if (this.compaction == null) {
485 int oldPriority = this.queuedPriority;
486 this.queuedPriority = this.store.getCompactPriority();
487 if (this.queuedPriority > oldPriority) {
488
489
490 this.parent.execute(this);
491 return;
492 }
493 try {
494 this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
495 } catch (IOException ex) {
496 LOG.error("Compaction selection failed " + this, ex);
497 server.checkFileSystem();
498 return;
499 }
500 if (this.compaction == null) return;
501
502
503 assert this.compaction.hasSelection();
504 ThreadPoolExecutor pool = store.throttleCompaction(
505 compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
506
507
508
509 if (this.parent == shortCompactions && pool == longCompactions) {
510 this.store.cancelRequestedCompaction(this.compaction);
511 this.compaction = null;
512 this.parent = pool;
513 this.parent.execute(this);
514 return;
515 }
516 }
517
518 assert this.compaction != null;
519
520 this.compaction.getRequest().beforeExecute();
521 try {
522
523
524 long start = EnvironmentEdgeManager.currentTime();
525 boolean completed =
526 region.compact(compaction, store, compactionThroughputController, user);
527 long now = EnvironmentEdgeManager.currentTime();
528 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
529 this + "; duration=" + StringUtils.formatTimeDiff(now, start));
530 if (completed) {
531
532 if (store.getCompactPriority() <= 0) {
533 requestSystemCompaction(region, store, "Recursive enqueue");
534 } else {
535
536 requestSplit(region);
537 }
538 }
539 } catch (IOException ex) {
540 IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
541 LOG.error("Compaction failed " + this, remoteEx);
542 if (remoteEx != ex) {
543 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
544 }
545 server.checkFileSystem();
546 } catch (Exception ex) {
547 LOG.error("Compaction failed " + this, ex);
548 server.checkFileSystem();
549 } finally {
550 LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
551 }
552 this.compaction.getRequest().afterExecute();
553 }
554
555 @Override
556 public void run() {
557 Preconditions.checkNotNull(server);
558 if (server.isStopped()
559 || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
560 return;
561 }
562 doCompaction(user);
563 }
564
565 private String formatStackTrace(Exception ex) {
566 StringWriter sw = new StringWriter();
567 PrintWriter pw = new PrintWriter(sw);
568 ex.printStackTrace(pw);
569 pw.flush();
570 return sw.toString();
571 }
572
573 @Override
574 public int compareTo(CompactionRunner o) {
575
576 int compareVal = queuedPriority - o.queuedPriority;
577 if (compareVal != 0) return compareVal;
578 CompactionContext tc = this.compaction, oc = o.compaction;
579
580 return (tc == null) ? ((oc == null) ? 0 : 1)
581 : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
582 }
583 }
584
585
586
587
588 private static class Rejection implements RejectedExecutionHandler {
589 @Override
590 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
591 if (runnable instanceof CompactionRunner) {
592 CompactionRunner runner = (CompactionRunner)runnable;
593 LOG.debug("Compaction Rejected: " + runner);
594 runner.store.cancelRequestedCompaction(runner.compaction);
595 }
596 }
597 }
598
599
600
601
602 @Override
603 public void onConfigurationChange(Configuration newConf) {
604
605
606
607
608
609 int largeThreads = Math.max(1, newConf.getInt(
610 LARGE_COMPACTION_THREADS,
611 LARGE_COMPACTION_THREADS_DEFAULT));
612 if (this.longCompactions.getCorePoolSize() != largeThreads) {
613 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
614 " from " + this.longCompactions.getCorePoolSize() + " to " +
615 largeThreads);
616 if(this.longCompactions.getCorePoolSize() < largeThreads) {
617 this.longCompactions.setMaximumPoolSize(largeThreads);
618 this.longCompactions.setCorePoolSize(largeThreads);
619 } else {
620 this.longCompactions.setCorePoolSize(largeThreads);
621 this.longCompactions.setMaximumPoolSize(largeThreads);
622 }
623 }
624
625 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
626 SMALL_COMPACTION_THREADS_DEFAULT);
627 if (this.shortCompactions.getCorePoolSize() != smallThreads) {
628 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
629 " from " + this.shortCompactions.getCorePoolSize() + " to " +
630 smallThreads);
631 if(this.shortCompactions.getCorePoolSize() < smallThreads) {
632 this.shortCompactions.setMaximumPoolSize(smallThreads);
633 this.shortCompactions.setCorePoolSize(smallThreads);
634 } else {
635 this.shortCompactions.setCorePoolSize(smallThreads);
636 this.shortCompactions.setMaximumPoolSize(smallThreads);
637 }
638 }
639
640 int splitThreads = newConf.getInt(SPLIT_THREADS,
641 SPLIT_THREADS_DEFAULT);
642 if (this.splits.getCorePoolSize() != splitThreads) {
643 LOG.info("Changing the value of " + SPLIT_THREADS +
644 " from " + this.splits.getCorePoolSize() + " to " +
645 splitThreads);
646 if(this.splits.getCorePoolSize() < splitThreads) {
647 this.splits.setMaximumPoolSize(splitThreads);
648 this.splits.setCorePoolSize(splitThreads);
649 } else {
650 this.splits.setCorePoolSize(splitThreads);
651 this.splits.setMaximumPoolSize(splitThreads);
652 }
653 }
654
655 int mergeThreads = newConf.getInt(MERGE_THREADS,
656 MERGE_THREADS_DEFAULT);
657 if (this.mergePool.getCorePoolSize() != mergeThreads) {
658 LOG.info("Changing the value of " + MERGE_THREADS +
659 " from " + this.mergePool.getCorePoolSize() + " to " +
660 mergeThreads);
661 if(this.mergePool.getCorePoolSize() < mergeThreads) {
662 this.mergePool.setMaximumPoolSize(mergeThreads);
663 this.mergePool.setCorePoolSize(mergeThreads);
664 } else {
665 this.mergePool.setCorePoolSize(mergeThreads);
666 this.mergePool.setMaximumPoolSize(mergeThreads);
667 }
668 }
669
670 CompactionThroughputController old = this.compactionThroughputController;
671 if (old != null) {
672 old.stop("configuration change");
673 }
674 this.compactionThroughputController =
675 CompactionThroughputControllerFactory.create(server, newConf);
676
677
678
679 this.conf.reloadConfiguration();
680 }
681
682 protected int getSmallCompactionThreadNum() {
683 return this.shortCompactions.getCorePoolSize();
684 }
685
686 protected int getLargeCompactionThreadNum() {
687 return this.longCompactions.getCorePoolSize();
688 }
689
690 protected int getSplitThreadNum() {
691 return this.splits.getCorePoolSize();
692 }
693
694 protected int getMergeThreadNum() {
695 return this.mergePool.getCorePoolSize();
696 }
697
698
699
700
701 @Override
702 public void registerChildren(ConfigurationManager manager) {
703
704 }
705
706
707
708
709 @Override
710 public void deregisterChildren(ConfigurationManager manager) {
711
712 }
713
714 @VisibleForTesting
715 public CompactionThroughputController getCompactionThroughputController() {
716 return compactionThroughputController;
717 }
718
719 @VisibleForTesting
720
721
722
723
724
725 void shutdownLongCompactions(){
726 this.longCompactions.shutdown();
727 }
728 }