1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.compactions;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
34 import org.apache.hadoop.hbase.regionserver.StoreFile;
35 import org.apache.hadoop.hbase.regionserver.StoreUtils;
36 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
37 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
38 import org.apache.hadoop.hbase.security.User;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.ConcatenatedLists;
41 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
42 import org.apache.hadoop.hbase.util.Pair;
43
44 import com.google.common.collect.ImmutableList;
45
46
47
48
49 @InterfaceAudience.Private
50 public class StripeCompactionPolicy extends CompactionPolicy {
private static final Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);

/**
 * Policy applied to the files of one stripe at a time by {@code selectSimpleCompaction}.
 * Created once in the constructor and never replaced.
 */
private final ExploringCompactionPolicy stripePolicy;

/**
 * Stripe-specific settings read by this policy (initial stripe count, split size and
 * count, per-stripe min/max files, level-0 min files, L0-flush flag).
 */
private final StripeStoreConfig config;

/**
 * Creates a stripe compaction policy.
 * @param conf configuration handed to the superclass and to the per-stripe
 *   {@link ExploringCompactionPolicy}
 * @param storeConfigInfo store information handed to the superclass and to the
 *   per-stripe {@link ExploringCompactionPolicy}
 * @param config stripe-specific settings kept by this policy
 */
public StripeCompactionPolicy(
    Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
  super(conf, storeConfigInfo);
  this.config = config;
  this.stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
}
63
64 public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
65 List<StoreFile> filesCompacting) {
66
67
68
69 ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
70 candidateFiles.removeAll(filesCompacting);
71 return candidateFiles;
72 }
73
74 public StripeCompactionRequest createEmptyRequest(
75 StripeInformationProvider si, CompactionRequest request) {
76
77 if (si.getStripeCount() > 0) {
78 return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
79 }
80 Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
81 request.getFiles(), this.config.getInitialCount());
82 return new SplitStripeCompactionRequest(
83 request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
84 }
85
86 public StripeStoreFlusher.StripeFlushRequest selectFlush(
87 StripeInformationProvider si, int kvCount) {
88 if (this.config.isUsingL0Flush()) {
89 return new StripeStoreFlusher.StripeFlushRequest();
90 }
91 if (si.getStripeCount() == 0) {
92
93 int initialCount = this.config.getInitialCount();
94 return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
95 }
96
97 return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
98 }
99
100 public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
101 List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
102
103
104 if (!filesCompacting.isEmpty()) {
105 LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
106 return null;
107 }
108
109
110
111
112
113
114
115 Collection<StoreFile> allFiles = si.getStorefiles();
116 if (StoreUtils.hasReferences(allFiles)) {
117 LOG.debug("There are references in the store; compacting all files");
118 long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
119 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
120 allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
121 request.setMajorRangeFull();
122 return request;
123 }
124
125 int stripeCount = si.getStripeCount();
126 List<StoreFile> l0Files = si.getLevel0Files();
127
128
129 boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
130 if (stripeCount == 0) {
131 if (!shouldCompactL0) return null;
132 return selectNewStripesCompaction(si);
133 }
134
135 boolean canDropDeletesNoL0 = l0Files.size() == 0;
136 if (shouldCompactL0) {
137 if (!canDropDeletesNoL0) {
138
139 StripeCompactionRequest result = selectSingleStripeCompaction(
140 si, true, canDropDeletesNoL0, isOffpeak);
141 if (result != null) return result;
142 }
143 LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
144 return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
145 }
146
147
148 StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
149 if (result != null) return result;
150
151
152
153 return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
154 }
155
156 public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
157
158 return filesCompacting.isEmpty()
159 && (StoreUtils.hasReferences(si.getStorefiles())
160 || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
161 || needsSingleStripeCompaction(si));
162 }
163
164 @Override
165 public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
166 return false;
167 }
168
169 @Override
170 public boolean throttleCompaction(long compactionSize) {
171 return compactionSize > comConf.getThrottlePoint();
172 }
173
174
175
176
177
178 protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
179 int minFiles = this.config.getStripeCompactMinFiles();
180 for (List<StoreFile> stripe : si.getStripes()) {
181 if (stripe.size() >= minFiles) return true;
182 }
183 return false;
184 }
185
186 protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
187 boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
188 ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
189
190 int bqIndex = -1;
191 List<StoreFile> bqSelection = null;
192 int stripeCount = stripes.size();
193 long bqTotalSize = -1;
194 for (int i = 0; i < stripeCount; ++i) {
195
196
197 List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
198 !canDropDeletesWithoutL0 && includeL0, isOffpeak);
199 if (selection.isEmpty()) continue;
200 long size = 0;
201 for (StoreFile sf : selection) {
202 size += sf.getReader().length();
203 }
204 if (bqSelection == null || selection.size() > bqSelection.size() ||
205 (selection.size() == bqSelection.size() && size < bqTotalSize)) {
206 bqSelection = selection;
207 bqIndex = i;
208 bqTotalSize = size;
209 }
210 }
211 if (bqSelection == null) {
212 LOG.debug("No good compaction is possible in any stripe");
213 return null;
214 }
215 List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
216
217 int targetCount = 1;
218 long targetKvs = Long.MAX_VALUE;
219 boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
220 String splitString = "";
221 if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
222 if (includeL0) {
223
224
225 return null;
226 }
227 Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
228 targetKvs = kvsAndCount.getFirst();
229 targetCount = kvsAndCount.getSecond();
230 splitString = "; the stripe will be split into at most "
231 + targetCount + " stripes with " + targetKvs + " target KVs";
232 }
233
234 LOG.debug("Found compaction in a stripe with end key ["
235 + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
236 + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);
237
238
239 StripeCompactionRequest req;
240 if (includeL0) {
241 assert hasAllFiles;
242 List<StoreFile> l0Files = si.getLevel0Files();
243 LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
244 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
245 sfs.addSublist(filesToCompact);
246 sfs.addSublist(l0Files);
247 req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
248 } else {
249 req = new SplitStripeCompactionRequest(
250 filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
251 }
252 if (hasAllFiles && (canDropDeletesWithoutL0 || includeL0)) {
253 req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
254 }
255 req.getRequest().setOffPeak(isOffpeak);
256 return req;
257 }
258
259
260
261
262
263
264
265 private List<StoreFile> selectSimpleCompaction(
266 List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
267 int minFilesLocal = Math.max(
268 allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
269 int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
270 return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
271 }
272
273
274
275
276
277
278
279
280 private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
281 int targetStripeCount, long targetSize) {
282 Collection<StoreFile> allFiles = si.getStorefiles();
283 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
284 allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
285 request.setMajorRangeFull();
286 LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
287 return request;
288 }
289
290 private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
291 List<StoreFile> l0Files = si.getLevel0Files();
292 Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
293 LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
294 + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
295 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
296 si.getLevel0Files(), OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
297 request.setMajorRangeFull();
298 return request;
299 }
300
301 private StripeCompactionRequest selectExpiredMergeCompaction(
302 StripeInformationProvider si, boolean canDropDeletesNoL0) {
303 long cfTtl = this.storeConfigInfo.getStoreFileTtl();
304 if (cfTtl == Long.MAX_VALUE) {
305 return null;
306 }
307 long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
308
309 int start = -1, bestStart = -1, length = 0, bestLength = 0;
310 ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
311 OUTER: for (int i = 0; i < stripes.size(); ++i) {
312 for (StoreFile storeFile : stripes.get(i)) {
313 if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
314
315 if (length > bestLength) {
316 bestStart = start;
317 bestLength = length;
318 }
319 start = -1;
320 length = 0;
321 continue OUTER;
322 }
323 if (start == -1) {
324 start = i;
325 }
326 ++length;
327 }
328 if (length > bestLength) {
329 bestStart = start;
330 bestLength = length;
331 }
332 if (bestLength == 0) return null;
333 if (bestLength == 1) {
334
335
336
337
338 if (bestStart == (stripes.size() - 1)) return null;
339 ++bestLength;
340 }
341 LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
342 int endIndex = bestStart + bestLength - 1;
343 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
344 sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
345 SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
346 si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
347 if (canDropDeletesNoL0) {
348 result.setMajorRangeFull();
349 }
350 return result;
351 }
352
353 private static long getTotalKvCount(final Collection<StoreFile> candidates) {
354 long totalSize = 0;
355 for (StoreFile storeFile : candidates) {
356 totalSize += storeFile.getReader().getEntries();
357 }
358 return totalSize;
359 }
360
361 public static long getTotalFileSize(final Collection<StoreFile> candidates) {
362 long totalSize = 0;
363 for (StoreFile storeFile : candidates) {
364 totalSize += storeFile.getReader().length();
365 }
366 return totalSize;
367 }
368
369 private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
370
371
372
373
374 long totalSize = getTotalFileSize(files);
375 long targetPartSize = config.getSplitPartSize();
376 assert targetPartSize > 0 && splitCount > 0;
377 double ratio = totalSize / (splitCount * targetPartSize);
378 while (ratio > 1.0) {
379
380 double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
381 if ((1.0 / newRatio) >= ratio) break;
382 ratio = newRatio;
383 splitCount += 1.0;
384 }
385 long kvCount = (long)(getTotalKvCount(files) / splitCount);
386 return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
387 }
388
389
390 public abstract static class StripeCompactionRequest {
391 protected CompactionRequest request;
392 protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
393
394 public List<Path> execute(StripeCompactor compactor,
395 CompactionThroughputController throughputController) throws IOException {
396 return execute(compactor, throughputController, null);
397 }
398
399
400
401
402
403
404 public abstract List<Path> execute(StripeCompactor compactor,
405 CompactionThroughputController throughputController, User user) throws IOException;
406
407 public StripeCompactionRequest(CompactionRequest request) {
408 this.request = request;
409 }
410
411
412
413
414
415
416
417 public void setMajorRange(byte[] startRow, byte[] endRow) {
418 this.majorRangeFromRow = startRow;
419 this.majorRangeToRow = endRow;
420 }
421
422 public CompactionRequest getRequest() {
423 return this.request;
424 }
425
426 public void setRequest(CompactionRequest request) {
427 assert request != null;
428 this.request = request;
429 this.majorRangeFromRow = this.majorRangeToRow = null;
430 }
431 }
432
433
434
435
436
437 private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
438 private final List<byte[]> targetBoundaries;
439
440
441
442
443
444 public BoundaryStripeCompactionRequest(CompactionRequest request,
445 List<byte[]> targetBoundaries) {
446 super(request);
447 this.targetBoundaries = targetBoundaries;
448 }
449
450 public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
451 List<byte[]> targetBoundaries) {
452 this(new CompactionRequest(files), targetBoundaries);
453 }
454
455 @Override
456 public List<Path> execute(StripeCompactor compactor,
457 CompactionThroughputController throughputController, User user) throws IOException {
458 return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
459 this.majorRangeToRow, throughputController, user);
460 }
461 }
462
463
464
465
466
467
468
469 private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
470 private final byte[] startRow, endRow;
471 private final int targetCount;
472 private final long targetKvs;
473
474
475
476
477
478
479
480
481
482 public SplitStripeCompactionRequest(CompactionRequest request,
483 byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
484 super(request);
485 this.startRow = startRow;
486 this.endRow = endRow;
487 this.targetCount = targetCount;
488 this.targetKvs = targetKvs;
489 }
490
491 public SplitStripeCompactionRequest(
492 CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
493 this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
494 }
495
496 public SplitStripeCompactionRequest(
497 Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
498 this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
499 }
500
501 public SplitStripeCompactionRequest(Collection<StoreFile> files,
502 byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
503 this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
504 }
505
506 @Override
507 public List<Path> execute(StripeCompactor compactor,
508 CompactionThroughputController throughputController, User user) throws IOException {
509 return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
510 this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
511 }
512
513
514
515 public void setMajorRangeFull() {
516 setMajorRange(this.startRow, this.endRow);
517 }
518 }
519
520
521 public static interface StripeInformationProvider {
522 public Collection<StoreFile> getStorefiles();
523
524
525
526
527
528
529 public byte[] getStartRow(int stripeIndex);
530
531
532
533
534
535
536 public byte[] getEndRow(int stripeIndex);
537
538
539
540
541 public List<StoreFile> getLevel0Files();
542
543
544
545
546 public List<byte[]> getStripeBoundaries();
547
548
549
550
551 public ArrayList<ImmutableList<StoreFile>> getStripes();
552
553
554
555
556 public int getStripeCount();
557 }
558 }