1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.TreeMap;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.KeyValue.KVComparator;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.ConcatenatedLists;
43 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
44
45 import com.google.common.collect.ImmutableCollection;
46 import com.google.common.collect.ImmutableList;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class StripeStoreFileManager
66 implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
67 static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
68
69
70
71
72 public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
73 public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
74
75 private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
76
77
78
79
80 public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
81 final static byte[] INVALID_KEY = null;
82
83
84
85
86
87 private static class State {
88
89
90
91
92
93 public byte[][] stripeEndRows = new byte[0][];
94
95
96
97
98
99
100 public ArrayList<ImmutableList<StoreFile>> stripeFiles
101 = new ArrayList<ImmutableList<StoreFile>>();
102
103 public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
104
105
106 public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
107 }
108 private State state = null;
109
110
111 private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
112 private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
113
114
115
116 private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
117
118 private final KVComparator kvComparator;
119 private StripeStoreConfig config;
120
121 private final int blockingFileCount;
122
123 public StripeStoreFileManager(
124 KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
125 this.kvComparator = kvComparator;
126 this.config = config;
127 this.blockingFileCount = conf.getInt(
128 HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
129 }
130
131 @Override
132 public void loadFiles(List<StoreFile> storeFiles) {
133 loadUnclassifiedStoreFiles(storeFiles);
134 }
135
136 @Override
137 public Collection<StoreFile> getStorefiles() {
138 return state.allFilesCached;
139 }
140
141 @Override
142 public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
143 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
144 cmc.mergeResults(null, sfs);
145 debugDumpState("Added new files");
146 }
147
148 @Override
149 public ImmutableCollection<StoreFile> clearFiles() {
150 ImmutableCollection<StoreFile> result = state.allFilesCached;
151 this.state = new State();
152 this.fileStarts.clear();
153 this.fileEnds.clear();
154 return result;
155 }
156
157 @Override
158 public int getStorefileCount() {
159 return state.allFilesCached.size();
160 }
161
162
163
164 @Override
165 public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
166 KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
167
168 result.addSublist(state.level0Files);
169 if (!state.stripeFiles.isEmpty()) {
170 int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
171 for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
172 result.addSublist(state.stripeFiles.get(stripeIndex));
173 }
174 }
175 return result.iterator();
176 }
177
178
179
180
181 @Override
182 public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
183 Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
184 KeyBeforeConcatenatedLists.Iterator original =
185 (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
186 assert original != null;
187 ArrayList<List<StoreFile>> components = original.getComponents();
188 for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
189 StoreFile sf = components.get(firstIrrelevant).get(0);
190 byte[] endKey = endOf(sf);
191
192
193
194 if (!isInvalid(endKey) && !isOpen(endKey)
195 && (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
196 original.removeComponents(firstIrrelevant);
197 break;
198 }
199 }
200 return original;
201 }
202
203 @Override
204
205
206
207
208
209
210
211 public byte[] getSplitPoint() throws IOException {
212 if (this.getStorefileCount() == 0) return null;
213 if (state.stripeFiles.size() <= 1) {
214 return getSplitPointFromAllFiles();
215 }
216 int leftIndex = -1, rightIndex = state.stripeFiles.size();
217 long leftSize = 0, rightSize = 0;
218 long lastLeftSize = 0, lastRightSize = 0;
219 while (rightIndex - 1 != leftIndex) {
220 if (leftSize >= rightSize) {
221 --rightIndex;
222 lastRightSize = getStripeFilesSize(rightIndex);
223 rightSize += lastRightSize;
224 } else {
225 ++leftIndex;
226 lastLeftSize = getStripeFilesSize(leftIndex);
227 leftSize += lastLeftSize;
228 }
229 }
230 if (leftSize == 0 || rightSize == 0) {
231 String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
232 + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
233 debugDumpState(errMsg);
234 LOG.warn(errMsg);
235 return getSplitPointFromAllFiles();
236 }
237 double ratio = (double)rightSize / leftSize;
238 if (ratio < 1) {
239 ratio = 1 / ratio;
240 }
241 if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];
242
243
244
245
246
247 boolean isRightLarger = rightSize >= leftSize;
248 double newRatio = isRightLarger
249 ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
250 : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
251 if (newRatio < 1) {
252 newRatio = 1 / newRatio;
253 }
254 if (newRatio >= ratio) return state.stripeEndRows[leftIndex];
255 LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
256 + newRatio + " configured ratio " + config.getMaxSplitImbalance());
257
258 return StoreUtils.getLargestFile(state.stripeFiles.get(
259 isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
260 }
261
262 private byte[] getSplitPointFromAllFiles() throws IOException {
263 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
264 sfs.addSublist(state.level0Files);
265 sfs.addAllSublists(state.stripeFiles);
266 if (sfs.isEmpty()) return null;
267 return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
268 }
269
270 private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
271 return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
272 }
273
274 @Override
275 public Collection<StoreFile> getFilesForScanOrGet(
276 boolean isGet, byte[] startRow, byte[] stopRow) {
277 if (state.stripeFiles.isEmpty()) {
278 return state.level0Files;
279 }
280
281 int firstStripe = findStripeForRow(startRow, true);
282 int lastStripe = findStripeForRow(stopRow, false);
283 assert firstStripe <= lastStripe;
284 if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
285 return state.stripeFiles.get(firstStripe);
286 }
287 if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
288 return state.allFilesCached;
289 }
290
291 ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
292 result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
293 result.addSublist(state.level0Files);
294 return result;
295 }
296
297 @Override
298 public void addCompactionResults(
299 Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
300
301 LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
302 + " files replaced by " + results.size());
303
304
305 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
306 cmc.mergeResults(compactedFiles, results);
307 debugDumpState("Merged compaction results");
308 }
309
310 @Override
311 public int getStoreCompactionPriority() {
312
313
314
315 int fc = getStorefileCount();
316 if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
317 return this.blockingFileCount - fc;
318 }
319
320
321
322 int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
323 int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
324 return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
325 }
326
327
328
329
330
331
332 private long getStripeFilesSize(int stripeIndex) {
333 long result = 0;
334 for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
335 result += sf.getReader().length();
336 }
337 return result;
338 }
339
340
341
342
343
344
345
346
347 private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
348 LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
349 TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
350 new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
351 ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
352
353
354 for (StoreFile sf : storeFiles) {
355 byte[] startRow = startOf(sf), endRow = endOf(sf);
356
357 if (isInvalid(startRow) || isInvalid(endRow)) {
358 insertFileIntoStripe(level0Files, sf);
359 ensureLevel0Metadata(sf);
360 } else if (!isOpen(startRow) && !isOpen(endRow) &&
361 nonOpenRowCompare(startRow, endRow) >= 0) {
362 LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
363 + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
364 insertFileIntoStripe(level0Files, sf);
365 ensureLevel0Metadata(sf);
366 } else {
367 ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
368 if (stripe == null) {
369 stripe = new ArrayList<StoreFile>();
370 candidateStripes.put(endRow, stripe);
371 }
372 insertFileIntoStripe(stripe, sf);
373 }
374 }
375
376
377
378 boolean hasOverlaps = false;
379 byte[] expectedStartRow = null;
380 Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
381 candidateStripes.entrySet().iterator();
382 while (entryIter.hasNext()) {
383 Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
384 ArrayList<StoreFile> files = entry.getValue();
385
386 for (int i = 0; i < files.size(); ++i) {
387 StoreFile sf = files.get(i);
388 byte[] startRow = startOf(sf);
389 if (expectedStartRow == null) {
390 expectedStartRow = startRow;
391 } else if (!rowEquals(expectedStartRow, startRow)) {
392 hasOverlaps = true;
393 LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
394 + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
395 + "], to L0 it goes");
396 StoreFile badSf = files.remove(i);
397 insertFileIntoStripe(level0Files, badSf);
398 ensureLevel0Metadata(badSf);
399 --i;
400 }
401 }
402
403 byte[] endRow = entry.getKey();
404 if (!files.isEmpty()) {
405 expectedStartRow = endRow;
406 } else {
407 entryIter.remove();
408 }
409 }
410
411
412
413
414
415 if (!candidateStripes.isEmpty()) {
416 StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
417 boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
418 if (!isOpen) {
419 LOG.warn("The range of the loaded files does not cover full key space: from ["
420 + Bytes.toString(startOf(firstFile)) + "], to ["
421 + Bytes.toString(candidateStripes.lastKey()) + "]");
422 if (!hasOverlaps) {
423 ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
424 ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
425 } else {
426 LOG.warn("Inconsistent files, everything goes to L0.");
427 for (ArrayList<StoreFile> files : candidateStripes.values()) {
428 for (StoreFile sf : files) {
429 insertFileIntoStripe(level0Files, sf);
430 ensureLevel0Metadata(sf);
431 }
432 }
433 candidateStripes.clear();
434 }
435 }
436 }
437
438
439 State state = new State();
440 state.level0Files = ImmutableList.copyOf(level0Files);
441 state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
442 state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
443 ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
444 int i = candidateStripes.size() - 1;
445 for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
446 state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
447 newAllFiles.addAll(entry.getValue());
448 if (i > 0) {
449 state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
450 }
451 --i;
452 }
453 state.allFilesCached = ImmutableList.copyOf(newAllFiles);
454 this.state = state;
455 debugDumpState("Files loaded");
456 }
457
458 private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
459 HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
460 for (StoreFile sf : stripe) {
461 targetMap.put(sf, OPEN_KEY);
462 }
463 }
464
465 private void ensureLevel0Metadata(StoreFile sf) {
466 if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
467 if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
468 }
469
470 private void debugDumpState(String string) {
471 if (!LOG.isDebugEnabled()) return;
472 StringBuilder sb = new StringBuilder();
473 sb.append("\n" + string + "; current stripe state is as such:");
474 sb.append("\n level 0 with ")
475 .append(state.level0Files.size())
476 .append(
477 " files: "
478 + TraditionalBinaryPrefix.long2String(
479 StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
480 for (int i = 0; i < state.stripeFiles.size(); ++i) {
481 String endRow = (i == state.stripeEndRows.length)
482 ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
483 sb.append("\n stripe ending in ")
484 .append(endRow)
485 .append(" with ")
486 .append(state.stripeFiles.get(i).size())
487 .append(
488 " files: "
489 + TraditionalBinaryPrefix.long2String(
490 StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
491 }
492 sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
493 sb.append("\n").append(getStorefileCount()).append(" files total.");
494 LOG.debug(sb.toString());
495 }
496
497
498
499
500 private static final boolean isOpen(byte[] key) {
501 return key != null && key.length == 0;
502 }
503
504
505
506
507 private static final boolean isInvalid(byte[] key) {
508 return key == INVALID_KEY;
509 }
510
511
512
513
514 private final boolean rowEquals(byte[] k1, byte[] k2) {
515 return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
516 }
517
518
519
520
521 private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
522 assert !isOpen(k1) && !isOpen(k2);
523 return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
524 }
525
526
527
528
529 private final int findStripeIndexByEndRow(byte[] endRow) {
530 assert !isInvalid(endRow);
531 if (isOpen(endRow)) return state.stripeEndRows.length;
532 return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
533 }
534
535
536
537
538 private final int findStripeForRow(byte[] row, boolean isStart) {
539 if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
540 if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
541
542
543
544
545
546 return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
547 }
548
549 @Override
550 public final byte[] getStartRow(int stripeIndex) {
551 return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
552 }
553
554 @Override
555 public final byte[] getEndRow(int stripeIndex) {
556 return (stripeIndex == state.stripeEndRows.length
557 ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
558 }
559
560
561 private byte[] startOf(StoreFile sf) {
562 byte[] result = this.fileStarts.get(sf);
563 return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
564 : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
565 }
566
567 private byte[] endOf(StoreFile sf) {
568 byte[] result = this.fileEnds.get(sf);
569 return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
570 : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
571 }
572
573
574
575
576
577
578 private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
579
580
581 for (int insertBefore = 0; ; ++insertBefore) {
582 if (insertBefore == stripe.size()
583 || (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
584 stripe.add(insertBefore, sf);
585 break;
586 }
587 }
588 }
589
590
591
592
593
594
595
596
597
598
599 private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
600 @Override
601 public java.util.Iterator<StoreFile> iterator() {
602 return new Iterator();
603 }
604
605 public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
606 public ArrayList<List<StoreFile>> getComponents() {
607 return components;
608 }
609
610 public void removeComponents(int startIndex) {
611 List<List<StoreFile>> subList = components.subList(startIndex, components.size());
612 for (List<StoreFile> entry : subList) {
613 size -= entry.size();
614 }
615 assert size >= 0;
616 subList.clear();
617 }
618
619 @Override
620 public void remove() {
621 if (!this.nextWasCalled) {
622 throw new IllegalStateException("No element to remove");
623 }
624 this.nextWasCalled = false;
625 List<StoreFile> src = components.get(currentComponent);
626 if (src instanceof ImmutableList<?>) {
627 src = new ArrayList<StoreFile>(src);
628 components.set(currentComponent, src);
629 }
630 src.remove(indexWithinComponent);
631 --size;
632 --indexWithinComponent;
633 if (src.isEmpty()) {
634 components.remove(currentComponent);
635 }
636 }
637 }
638 }
639
640
641
642
643
644
645 private class CompactionOrFlushMergeCopy {
646 private ArrayList<List<StoreFile>> stripeFiles = null;
647 private ArrayList<StoreFile> level0Files = null;
648 private ArrayList<byte[]> stripeEndRows = null;
649
650 private Collection<StoreFile> compactedFiles = null;
651 private Collection<StoreFile> results = null;
652
653 private List<StoreFile> l0Results = new ArrayList<StoreFile>();
654 private final boolean isFlush;
655
656 public CompactionOrFlushMergeCopy(boolean isFlush) {
657
658 this.stripeFiles = new ArrayList<List<StoreFile>>(
659 StripeStoreFileManager.this.state.stripeFiles);
660 this.isFlush = isFlush;
661 }
662
663 public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
664 throws IOException {
665 assert this.compactedFiles == null && this.results == null;
666 this.compactedFiles = compactedFiles;
667 this.results = results;
668
669 if (!isFlush) removeCompactedFiles();
670 TreeMap<byte[], StoreFile> newStripes = processResults();
671 if (newStripes != null) {
672 processNewCandidateStripes(newStripes);
673 }
674
675 State state = createNewState();
676 StripeStoreFileManager.this.state = state;
677 updateMetadataMaps();
678 }
679
680 private State createNewState() {
681 State oldState = StripeStoreFileManager.this.state;
682
683 assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
684 State newState = new State();
685 newState.level0Files = (this.level0Files == null) ? oldState.level0Files
686 : ImmutableList.copyOf(this.level0Files);
687 newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
688 : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
689 newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
690 for (List<StoreFile> newStripe : this.stripeFiles) {
691 newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
692 ? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
693 }
694
695 List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
696 if (!isFlush) newAllFiles.removeAll(compactedFiles);
697 newAllFiles.addAll(results);
698 newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
699 return newState;
700 }
701
702 private void updateMetadataMaps() {
703 StripeStoreFileManager parent = StripeStoreFileManager.this;
704 if (!isFlush) {
705 for (StoreFile sf : this.compactedFiles) {
706 parent.fileStarts.remove(sf);
707 parent.fileEnds.remove(sf);
708 }
709 }
710 if (this.l0Results != null) {
711 for (StoreFile sf : this.l0Results) {
712 parent.ensureLevel0Metadata(sf);
713 }
714 }
715 }
716
717
718
719
720
721 private final ArrayList<StoreFile> getStripeCopy(int index) {
722 List<StoreFile> stripeCopy = this.stripeFiles.get(index);
723 ArrayList<StoreFile> result = null;
724 if (stripeCopy instanceof ImmutableList<?>) {
725 result = new ArrayList<StoreFile>(stripeCopy);
726 this.stripeFiles.set(index, result);
727 } else {
728 result = (ArrayList<StoreFile>)stripeCopy;
729 }
730 return result;
731 }
732
733
734
735
736 private final ArrayList<StoreFile> getLevel0Copy() {
737 if (this.level0Files == null) {
738 this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
739 }
740 return this.level0Files;
741 }
742
743
744
745
746
747
748 private TreeMap<byte[], StoreFile> processResults() throws IOException {
749 TreeMap<byte[], StoreFile> newStripes = null;
750 for (StoreFile sf : this.results) {
751 byte[] startRow = startOf(sf), endRow = endOf(sf);
752 if (isInvalid(endRow) || isInvalid(startRow)) {
753 if (!isFlush) {
754 LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
755 }
756 insertFileIntoStripe(getLevel0Copy(), sf);
757 this.l0Results.add(sf);
758 continue;
759 }
760 if (!this.stripeFiles.isEmpty()) {
761 int stripeIndex = findStripeIndexByEndRow(endRow);
762 if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
763
764 insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
765 continue;
766 }
767 }
768
769
770 if (newStripes == null) {
771 newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
772 }
773 StoreFile oldSf = newStripes.put(endRow, sf);
774 if (oldSf != null) {
775 throw new IOException("Compactor has produced multiple files for the stripe ending in ["
776 + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
777 }
778 }
779 return newStripes;
780 }
781
782
783
784
785
786 private void removeCompactedFiles() throws IOException {
787 for (StoreFile oldFile : this.compactedFiles) {
788 byte[] oldEndRow = endOf(oldFile);
789 List<StoreFile> source = null;
790 if (isInvalid(oldEndRow)) {
791 source = getLevel0Copy();
792 } else {
793 int stripeIndex = findStripeIndexByEndRow(oldEndRow);
794 if (stripeIndex < 0) {
795 throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
796 + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
797 }
798 source = getStripeCopy(stripeIndex);
799 }
800 if (!source.remove(oldFile)) {
801 throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
802 }
803 }
804 }
805
806
807
808
809
810
811 private void processNewCandidateStripes(
812 TreeMap<byte[], StoreFile> newStripes) throws IOException {
813
814 boolean hasStripes = !this.stripeFiles.isEmpty();
815 this.stripeEndRows = new ArrayList<byte[]>(
816 Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
817 int removeFrom = 0;
818 byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
819 byte[] lastEndRow = newStripes.lastKey();
820 if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
821 throw new IOException("Newly created stripes do not cover the entire key space.");
822 }
823
824 boolean canAddNewStripes = true;
825 Collection<StoreFile> filesForL0 = null;
826 if (hasStripes) {
827
828
829 if (isOpen(firstStartRow)) {
830 removeFrom = 0;
831 } else {
832 removeFrom = findStripeIndexByEndRow(firstStartRow);
833 if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
834 ++removeFrom;
835 }
836 int removeTo = findStripeIndexByEndRow(lastEndRow);
837 if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
838
839 ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
840 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
841 conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
842 }
843 if (!conflictingFiles.isEmpty()) {
844
845
846
847 if (isFlush) {
848 long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
849 LOG.warn("Stripes were created by a flush, but results of size " + newSize
850 + " cannot be added because the stripes have changed");
851 canAddNewStripes = false;
852 filesForL0 = newStripes.values();
853 } else {
854 long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
855 LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
856 + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
857 filesForL0 = conflictingFiles;
858 }
859 if (filesForL0 != null) {
860 for (StoreFile sf : filesForL0) {
861 insertFileIntoStripe(getLevel0Copy(), sf);
862 }
863 l0Results.addAll(filesForL0);
864 }
865 }
866
867 if (canAddNewStripes) {
868
869 int originalCount = this.stripeFiles.size();
870 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
871 if (removeIndex != originalCount - 1) {
872 this.stripeEndRows.remove(removeIndex);
873 }
874 this.stripeFiles.remove(removeIndex);
875 }
876 }
877 }
878
879 if (!canAddNewStripes) return;
880
881
882 byte[] previousEndRow = null;
883 int insertAt = removeFrom;
884 for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
885 if (previousEndRow != null) {
886
887 assert !isOpen(previousEndRow);
888 byte[] startRow = startOf(newStripe.getValue());
889 if (!rowEquals(previousEndRow, startRow)) {
890 throw new IOException("The new stripes produced by "
891 + (isFlush ? "flush" : "compaction") + " are not contiguous");
892 }
893 }
894
895 ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
896 tmp.add(newStripe.getValue());
897 stripeFiles.add(insertAt, tmp);
898 previousEndRow = newStripe.getKey();
899 if (!isOpen(previousEndRow)) {
900 stripeEndRows.add(insertAt, previousEndRow);
901 }
902 ++insertAt;
903 }
904 }
905 }
906
907 @Override
908 public List<StoreFile> getLevel0Files() {
909 return this.state.level0Files;
910 }
911
912 @Override
913 public List<byte[]> getStripeBoundaries() {
914 if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
915 ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
916 result.add(OPEN_KEY);
917 Collections.addAll(result, this.state.stripeEndRows);
918 result.add(OPEN_KEY);
919 return result;
920 }
921
922 @Override
923 public ArrayList<ImmutableList<StoreFile>> getStripes() {
924 return this.state.stripeFiles;
925 }
926
927 @Override
928 public int getStripeCount() {
929 return this.state.stripeFiles.size();
930 }
931
932 @Override
933 public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
934
935
936 State state = this.state;
937 Collection<StoreFile> expiredStoreFiles = null;
938 for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
939 expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
940 }
941 return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
942 }
943
944 private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
945 List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
946
947 for (int i = 1; i < stripe.size(); ++i) {
948 StoreFile sf = stripe.get(i);
949 long fileTs = sf.getReader().getMaxTimestamp();
950 if (fileTs < maxTs && !filesCompacting.contains(sf)) {
951 LOG.info("Found an expired store file: " + sf.getPath()
952 + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
953 if (expiredStoreFiles == null) {
954 expiredStoreFiles = new ArrayList<StoreFile>();
955 }
956 expiredStoreFiles.add(sf);
957 }
958 }
959 return expiredStoreFiles;
960 }
961
962 @Override
963 public double getCompactionPressure() {
964 State stateLocal = this.state;
965 if (stateLocal.allFilesCached.size() > blockingFileCount) {
966
967 return 2.0;
968 }
969 if (stateLocal.stripeFiles.isEmpty()) {
970 return 0.0;
971 }
972 int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
973
974
975 int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
976 double max = 0.0;
977 for (ImmutableList<StoreFile> stripeFile : stateLocal.stripeFiles) {
978 int stripeFileCount = stripeFile.size();
979 double normCount =
980 (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
981 / (blockingFilePerStripe - config.getStripeCompactMinFiles());
982 if (normCount >= 1.0) {
983
984
985 return 1.0;
986 }
987 if (normCount > max) {
988 max = normCount;
989 }
990 }
991 return max;
992 }
993 }