1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.PriorityBlockingQueue;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.RejectedExecutionHandler;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.RemoteExceptionHandler;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.conf.ConfigurationManager;
42 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
43 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
44 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
45 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
46 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
47 import org.apache.hadoop.hbase.security.User;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49 import org.apache.hadoop.hbase.util.Pair;
50 import org.apache.hadoop.util.StringUtils;
51
52 import com.google.common.annotations.VisibleForTesting;
53 import com.google.common.base.Preconditions;
54
55
56
57
58 @InterfaceAudience.Private
59 public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
60 static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
61
62
63 public final static String LARGE_COMPACTION_THREADS =
64 "hbase.regionserver.thread.compaction.large";
65 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
66
67
68 public final static String SMALL_COMPACTION_THREADS =
69 "hbase.regionserver.thread.compaction.small";
70 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
71
72
73 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
74 public final static int SPLIT_THREADS_DEFAULT = 1;
75
76
77 public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
78 public final static int MERGE_THREADS_DEFAULT = 1;
79
80 public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
81 "hbase.regionserver.regionSplitLimit";
82 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
83
84 private final HRegionServer server;
85 private final Configuration conf;
86
87 private final ThreadPoolExecutor longCompactions;
88 private final ThreadPoolExecutor shortCompactions;
89 private final ThreadPoolExecutor splits;
90 private final ThreadPoolExecutor mergePool;
91
92 private volatile CompactionThroughputController compactionThroughputController;
93
94
95
96
97
98
99 private int regionSplitLimit;
100
101
102 CompactSplitThread(HRegionServer server) {
103 super();
104 this.server = server;
105 this.conf = server.getConfiguration();
106 this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
107 DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
108
109 int largeThreads = Math.max(1, conf.getInt(
110 LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
111 int smallThreads = conf.getInt(
112 SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
113
114 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
115
116
117 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
118
119 final String n = Thread.currentThread().getName();
120
121 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
122 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
123 new ThreadFactory() {
124 @Override
125 public Thread newThread(Runnable r) {
126 Thread t = new Thread(r);
127 t.setName(n + "-longCompactions-" + System.currentTimeMillis());
128 return t;
129 }
130 });
131 this.longCompactions.setRejectedExecutionHandler(new Rejection());
132 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
133 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
134 new ThreadFactory() {
135 @Override
136 public Thread newThread(Runnable r) {
137 Thread t = new Thread(r);
138 t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
139 return t;
140 }
141 });
142 this.shortCompactions
143 .setRejectedExecutionHandler(new Rejection());
144 this.splits = (ThreadPoolExecutor)
145 Executors.newFixedThreadPool(splitThreads,
146 new ThreadFactory() {
147 @Override
148 public Thread newThread(Runnable r) {
149 Thread t = new Thread(r);
150 t.setName(n + "-splits-" + System.currentTimeMillis());
151 return t;
152 }
153 });
154 int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
155 this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
156 mergeThreads, new ThreadFactory() {
157 @Override
158 public Thread newThread(Runnable r) {
159 Thread t = new Thread(r);
160 t.setName(n + "-merges-" + System.currentTimeMillis());
161 return t;
162 }
163 });
164
165
166 this.compactionThroughputController =
167 CompactionThroughputControllerFactory.create(server, conf);
168 }
169
170 @Override
171 public String toString() {
172 return "compaction_queue=("
173 + longCompactions.getQueue().size() + ":"
174 + shortCompactions.getQueue().size() + ")"
175 + ", split_queue=" + splits.getQueue().size()
176 + ", merge_queue=" + mergePool.getQueue().size();
177 }
178
179 public String dumpQueue() {
180 StringBuffer queueLists = new StringBuffer();
181 queueLists.append("Compaction/Split Queue dump:\n");
182 queueLists.append(" LargeCompation Queue:\n");
183 BlockingQueue<Runnable> lq = longCompactions.getQueue();
184 Iterator<Runnable> it = lq.iterator();
185 while (it.hasNext()) {
186 queueLists.append(" " + it.next().toString());
187 queueLists.append("\n");
188 }
189
190 if (shortCompactions != null) {
191 queueLists.append("\n");
192 queueLists.append(" SmallCompation Queue:\n");
193 lq = shortCompactions.getQueue();
194 it = lq.iterator();
195 while (it.hasNext()) {
196 queueLists.append(" " + it.next().toString());
197 queueLists.append("\n");
198 }
199 }
200
201 queueLists.append("\n");
202 queueLists.append(" Split Queue:\n");
203 lq = splits.getQueue();
204 it = lq.iterator();
205 while (it.hasNext()) {
206 queueLists.append(" " + it.next().toString());
207 queueLists.append("\n");
208 }
209
210 queueLists.append("\n");
211 queueLists.append(" Region Merge Queue:\n");
212 lq = mergePool.getQueue();
213 it = lq.iterator();
214 while (it.hasNext()) {
215 queueLists.append(" " + it.next().toString());
216 queueLists.append("\n");
217 }
218
219 return queueLists.toString();
220 }
221
222 public synchronized void requestRegionsMerge(final Region a,
223 final Region b, final boolean forcible, long masterSystemTime, User user) {
224 try {
225 mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
228 + forcible + ". " + this);
229 }
230 } catch (RejectedExecutionException ree) {
231 LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
232 + forcible, ree);
233 }
234 }
235
236 public synchronized boolean requestSplit(final Region r) {
237
238 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
239 byte[] midKey = ((HRegion)r).checkSplit();
240 if (midKey != null) {
241 requestSplit(r, midKey);
242 return true;
243 }
244 }
245 return false;
246 }
247
248 public synchronized void requestSplit(final Region r, byte[] midKey) {
249 requestSplit(r, midKey, null);
250 }
251
252
253
254
255 public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
256 if (midKey == null) {
257 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
258 " not splittable because midkey=null");
259 if (((HRegion)r).shouldForceSplit()) {
260 ((HRegion)r).clearSplit();
261 }
262 return;
263 }
264 try {
265 this.splits.execute(new SplitRequest(r, midKey, this.server, user));
266 if (LOG.isDebugEnabled()) {
267 LOG.debug("Split requested for " + r + ". " + this);
268 }
269 } catch (RejectedExecutionException ree) {
270 LOG.info("Could not execute split for " + r, ree);
271 }
272 }
273
274 @Override
275 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
276 throws IOException {
277 return requestCompaction(r, why, null);
278 }
279
280 @Override
281 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
282 List<Pair<CompactionRequest, Store>> requests) throws IOException {
283 return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
284 }
285
286 @Override
287 public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
288 final String why, CompactionRequest request) throws IOException {
289 return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
290 }
291
292 @Override
293 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
294 int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
295 return requestCompactionInternal(r, why, p, requests, true, user);
296 }
297
298 private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
299 int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
300 throws IOException {
301
302 List<CompactionRequest> ret = null;
303 if (requests == null) {
304 ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
305 for (Store s : r.getStores()) {
306 CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
307 if (selectNow) ret.add(cr);
308 }
309 } else {
310 Preconditions.checkArgument(selectNow);
311 ret = new ArrayList<CompactionRequest>(requests.size());
312 for (Pair<CompactionRequest, Store> pair : requests) {
313 ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
314 }
315 }
316 return ret;
317 }
318
319 public CompactionRequest requestCompaction(final Region r, final Store s,
320 final String why, int priority, CompactionRequest request, User user) throws IOException {
321 return requestCompactionInternal(r, s, why, priority, request, true, user);
322 }
323
324 public synchronized void requestSystemCompaction(
325 final Region r, final String why) throws IOException {
326 requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
327 }
328
329 public void requestSystemCompaction(
330 final Region r, final Store s, final String why) throws IOException {
331 requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
332 }
333
334
335
336
337
338
339
340
341
342 private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
343 final String why, int priority, CompactionRequest request, boolean selectNow, User user)
344 throws IOException {
345 if (this.server.isStopped()
346 || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
347 return null;
348 }
349
350 CompactionContext compaction = null;
351 if (selectNow) {
352 compaction = selectCompaction(r, s, priority, request, user);
353 if (compaction == null) return null;
354 }
355
356
357
358 ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
359 ? longCompactions : shortCompactions;
360 pool.execute(new CompactionRunner(s, r, compaction, pool, user));
361 if (LOG.isDebugEnabled()) {
362 String type = (pool == shortCompactions) ? "Small " : "Large ";
363 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
364 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
365 }
366 return selectNow ? compaction.getRequest() : null;
367 }
368
369 private CompactionContext selectCompaction(final Region r, final Store s,
370 int priority, CompactionRequest request, User user) throws IOException {
371 CompactionContext compaction = s.requestCompaction(priority, request, user);
372 if (compaction == null) {
373 if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
374 LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
375 " because compaction request was cancelled");
376 }
377 return null;
378 }
379 assert compaction.hasSelection();
380 if (priority != Store.NO_PRIORITY) {
381 compaction.getRequest().setPriority(priority);
382 }
383 return compaction;
384 }
385
386
387
388
389 void interruptIfNecessary() {
390 splits.shutdown();
391 mergePool.shutdown();
392 longCompactions.shutdown();
393 shortCompactions.shutdown();
394 }
395
396 private void waitFor(ThreadPoolExecutor t, String name) {
397 boolean done = false;
398 while (!done) {
399 try {
400 done = t.awaitTermination(60, TimeUnit.SECONDS);
401 LOG.info("Waiting for " + name + " to finish...");
402 if (!done) {
403 t.shutdownNow();
404 }
405 } catch (InterruptedException ie) {
406 LOG.warn("Interrupted waiting for " + name + " to finish...");
407 }
408 }
409 }
410
411 void join() {
412 waitFor(splits, "Split Thread");
413 waitFor(mergePool, "Merge Thread");
414 waitFor(longCompactions, "Large Compaction Thread");
415 waitFor(shortCompactions, "Small Compaction Thread");
416 }
417
418
419
420
421
422
423
424 public int getCompactionQueueSize() {
425 return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
426 }
427
428 public int getLargeCompactionQueueSize() {
429 return longCompactions.getQueue().size();
430 }
431
432
433 public int getSmallCompactionQueueSize() {
434 return shortCompactions.getQueue().size();
435 }
436
437 public int getSplitQueueSize() {
438 return splits.getQueue().size();
439 }
440
441 private boolean shouldSplitRegion() {
442 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
443 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
444 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
445 }
446 return (regionSplitLimit > server.getNumberOfOnlineRegions());
447 }
448
449
450
451
452 public int getRegionSplitLimit() {
453 return this.regionSplitLimit;
454 }
455
456 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
457 justification="Contrived use of compareTo")
458 private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
459 private final Store store;
460 private final HRegion region;
461 private CompactionContext compaction;
462 private int queuedPriority;
463 private ThreadPoolExecutor parent;
464 private User user;
465
466 public CompactionRunner(Store store, Region region,
467 CompactionContext compaction, ThreadPoolExecutor parent, User user) {
468 super();
469 this.store = store;
470 this.region = (HRegion)region;
471 this.compaction = compaction;
472 this.queuedPriority = (this.compaction == null)
473 ? store.getCompactPriority() : compaction.getRequest().getPriority();
474 this.parent = parent;
475 this.user = user;
476 }
477
478 @Override
479 public String toString() {
480 return (this.compaction != null) ? ("Request = " + compaction.getRequest())
481 : ("Store = " + store.toString() + ", pri = " + queuedPriority);
482 }
483
484 private void doCompaction(User user) {
485
486 if (this.compaction == null) {
487 int oldPriority = this.queuedPriority;
488 this.queuedPriority = this.store.getCompactPriority();
489 if (this.queuedPriority > oldPriority) {
490
491
492 this.parent.execute(this);
493 return;
494 }
495 try {
496 this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
497 } catch (IOException ex) {
498 LOG.error("Compaction selection failed " + this, ex);
499 server.checkFileSystem();
500 return;
501 }
502 if (this.compaction == null) return;
503
504
505 assert this.compaction.hasSelection();
506 ThreadPoolExecutor pool = store.throttleCompaction(
507 compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
508 if (this.parent != pool) {
509 this.store.cancelRequestedCompaction(this.compaction);
510 this.compaction = null;
511 this.parent = pool;
512 this.parent.execute(this);
513 return;
514 }
515 }
516
517 assert this.compaction != null;
518
519 this.compaction.getRequest().beforeExecute();
520 try {
521
522
523 long start = EnvironmentEdgeManager.currentTime();
524 boolean completed =
525 region.compact(compaction, store, compactionThroughputController, user);
526 long now = EnvironmentEdgeManager.currentTime();
527 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
528 this + "; duration=" + StringUtils.formatTimeDiff(now, start));
529 if (completed) {
530
531 if (store.getCompactPriority() <= 0) {
532 requestSystemCompaction(region, store, "Recursive enqueue");
533 } else {
534
535 requestSplit(region);
536 }
537 }
538 } catch (IOException ex) {
539 IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
540 LOG.error("Compaction failed " + this, remoteEx);
541 if (remoteEx != ex) {
542 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
543 }
544 server.checkFileSystem();
545 } catch (Exception ex) {
546 LOG.error("Compaction failed " + this, ex);
547 server.checkFileSystem();
548 } finally {
549 LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
550 }
551 this.compaction.getRequest().afterExecute();
552 }
553
554 @Override
555 public void run() {
556 Preconditions.checkNotNull(server);
557 if (server.isStopped()
558 || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
559 return;
560 }
561 doCompaction(user);
562 }
563
564 private String formatStackTrace(Exception ex) {
565 StringWriter sw = new StringWriter();
566 PrintWriter pw = new PrintWriter(sw);
567 ex.printStackTrace(pw);
568 pw.flush();
569 return sw.toString();
570 }
571
572 @Override
573 public int compareTo(CompactionRunner o) {
574
575 int compareVal = queuedPriority - o.queuedPriority;
576 if (compareVal != 0) return compareVal;
577 CompactionContext tc = this.compaction, oc = o.compaction;
578
579 return (tc == null) ? ((oc == null) ? 0 : 1)
580 : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
581 }
582 }
583
584
585
586
587 private static class Rejection implements RejectedExecutionHandler {
588 @Override
589 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
590 if (runnable instanceof CompactionRunner) {
591 CompactionRunner runner = (CompactionRunner)runnable;
592 LOG.debug("Compaction Rejected: " + runner);
593 runner.store.cancelRequestedCompaction(runner.compaction);
594 }
595 }
596 }
597
598
599
600
601 @Override
602 public void onConfigurationChange(Configuration newConf) {
603
604
605
606
607
608 int largeThreads = Math.max(1, newConf.getInt(
609 LARGE_COMPACTION_THREADS,
610 LARGE_COMPACTION_THREADS_DEFAULT));
611 if (this.longCompactions.getCorePoolSize() != largeThreads) {
612 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
613 " from " + this.longCompactions.getCorePoolSize() + " to " +
614 largeThreads);
615 if(this.longCompactions.getCorePoolSize() < largeThreads) {
616 this.longCompactions.setMaximumPoolSize(largeThreads);
617 this.longCompactions.setCorePoolSize(largeThreads);
618 } else {
619 this.longCompactions.setCorePoolSize(largeThreads);
620 this.longCompactions.setMaximumPoolSize(largeThreads);
621 }
622 }
623
624 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
625 SMALL_COMPACTION_THREADS_DEFAULT);
626 if (this.shortCompactions.getCorePoolSize() != smallThreads) {
627 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
628 " from " + this.shortCompactions.getCorePoolSize() + " to " +
629 smallThreads);
630 if(this.shortCompactions.getCorePoolSize() < smallThreads) {
631 this.shortCompactions.setMaximumPoolSize(smallThreads);
632 this.shortCompactions.setCorePoolSize(smallThreads);
633 } else {
634 this.shortCompactions.setCorePoolSize(smallThreads);
635 this.shortCompactions.setMaximumPoolSize(smallThreads);
636 }
637 }
638
639 int splitThreads = newConf.getInt(SPLIT_THREADS,
640 SPLIT_THREADS_DEFAULT);
641 if (this.splits.getCorePoolSize() != splitThreads) {
642 LOG.info("Changing the value of " + SPLIT_THREADS +
643 " from " + this.splits.getCorePoolSize() + " to " +
644 splitThreads);
645 if(this.splits.getCorePoolSize() < splitThreads) {
646 this.splits.setMaximumPoolSize(splitThreads);
647 this.splits.setCorePoolSize(splitThreads);
648 } else {
649 this.splits.setCorePoolSize(splitThreads);
650 this.splits.setMaximumPoolSize(splitThreads);
651 }
652 }
653
654 int mergeThreads = newConf.getInt(MERGE_THREADS,
655 MERGE_THREADS_DEFAULT);
656 if (this.mergePool.getCorePoolSize() != mergeThreads) {
657 LOG.info("Changing the value of " + MERGE_THREADS +
658 " from " + this.mergePool.getCorePoolSize() + " to " +
659 mergeThreads);
660 if(this.mergePool.getCorePoolSize() < mergeThreads) {
661 this.mergePool.setMaximumPoolSize(mergeThreads);
662 this.mergePool.setCorePoolSize(mergeThreads);
663 } else {
664 this.mergePool.setCorePoolSize(mergeThreads);
665 this.mergePool.setMaximumPoolSize(mergeThreads);
666 }
667 }
668
669 CompactionThroughputController old = this.compactionThroughputController;
670 if (old != null) {
671 old.stop("configuration change");
672 }
673 this.compactionThroughputController =
674 CompactionThroughputControllerFactory.create(server, newConf);
675
676
677
678 this.conf.reloadConfiguration();
679 }
680
681 protected int getSmallCompactionThreadNum() {
682 return this.shortCompactions.getCorePoolSize();
683 }
684
685 protected int getLargeCompactionThreadNum() {
686 return this.longCompactions.getCorePoolSize();
687 }
688
689 protected int getSplitThreadNum() {
690 return this.splits.getCorePoolSize();
691 }
692
693 protected int getMergeThreadNum() {
694 return this.mergePool.getCorePoolSize();
695 }
696
697
698
699
700 @Override
701 public void registerChildren(ConfigurationManager manager) {
702
703 }
704
705
706
707
708 @Override
709 public void deregisterChildren(ConfigurationManager manager) {
710
711 }
712
713 @VisibleForTesting
714 public CompactionThroughputController getCompactionThroughputController() {
715 return compactionThroughputController;
716 }
717
718 }