View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.TreeMap;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.KeyValue.KVComparator;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ConcatenatedLists;
43  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
44  
45  import com.google.common.collect.ImmutableCollection;
46  import com.google.common.collect.ImmutableList;
47  
48  /**
49   * Stripe implementation of StoreFileManager.
50   * Not thread safe - relies on external locking (in HStore). Collections that this class
51   * returns are immutable or unique to the call, so they should be safe.
52   * Stripe store splits the key space of the region into non-overlapping stripes, as well as
53   * some recent files that have all the keys (level 0). Each stripe contains a set of files.
54   * When L0 is compacted, it's split into the files corresponding to existing stripe boundaries,
55   * that can thus be added to stripes.
56   * When scan or get happens, it only has to read the files from the corresponding stripes.
57   * See StripeCompactionPolicy on how the stripes are determined; this class doesn't care.
58   *
59   * This class should work together with StripeCompactionPolicy and StripeCompactor.
60   * With regard to how they work, we make at least the following (reasonable) assumptions:
61   *  - Compaction produces one file per new stripe (if any); that is easy to change.
62   *  - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
63   */
64  @InterfaceAudience.Private
65  public class StripeStoreFileManager
66    implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
67    static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
68  
69    /**
70     * The file metadata fields that contain the stripe information.
71     */
72    public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
73    public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
74  
75    private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
76  
77    /**
78     * The key value used for range boundary, indicating that the boundary is open (i.e. +-inf).
79     */
80    public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
81    final static byte[] INVALID_KEY = null;
82  
83    /**
84     * The state class. Used solely to replace results atomically during
85     * compactions and avoid complicated error handling.
86     */
87    private static class State {
88      /**
89       * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
90       * here. It is invariant that the start row of the stripe is the end row of the previous one
91       * (and is an open boundary for the first one).
92       */
93      public byte[][] stripeEndRows = new byte[0][];
94  
95      /**
96       * Files by stripe. Each element of the list corresponds to stripeEndRow element with the
97       * same index, except the last one. Inside each list, the files are in reverse order by
98       * seqNum. Note that the length of this is one higher than that of stripeEndKeys.
99       */
100     public ArrayList<ImmutableList<StoreFile>> stripeFiles
101       = new ArrayList<ImmutableList<StoreFile>>();
102     /** Level 0. The files are in reverse order by seqNum. */
103     public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
104 
105     /** Cached list of all files in the structure, to return from some calls */
106     public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
107   }
  /**
   * Current file layout. Every writer visible in this class assigns a brand-new State object
   * here instead of mutating the existing one.
   */
  private State state = null;

  /** Cached file metadata (or overrides as the case may be) */
  private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
  private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
  /**
   * Normally invalid key is null, but in the map null is the result for "no key"; so use
   * the following constant value in these maps instead. Note that this is a constant and
   * we use it to compare by reference when we read from the map.
   */
  private static final byte[] INVALID_KEY_IN_MAP = new byte[0];

  /** Comparator used for row comparisons and handed to files when asking for a split point. */
  private final KVComparator kvComparator;
  /** Stripe configuration; read here for the maximum tolerated split imbalance. */
  private StripeStoreConfig config;

  /** Value of HStore.BLOCKING_STOREFILES_KEY, read from the configuration at construction. */
  private final int blockingFileCount;
122 
123   public StripeStoreFileManager(
124       KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
125     this.kvComparator = kvComparator;
126     this.config = config;
127     this.blockingFileCount = conf.getInt(
128         HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
129   }
130 
131   @Override
132   public void loadFiles(List<StoreFile> storeFiles) {
133     loadUnclassifiedStoreFiles(storeFiles);
134   }
135 
136   @Override
137   public Collection<StoreFile> getStorefiles() {
138     return state.allFilesCached;
139   }
140 
141   @Override
142   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
143     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
144     cmc.mergeResults(null, sfs);
145     debugDumpState("Added new files");
146   }
147 
148   @Override
149   public ImmutableCollection<StoreFile> clearFiles() {
150     ImmutableCollection<StoreFile> result = state.allFilesCached;
151     this.state = new State();
152     this.fileStarts.clear();
153     this.fileEnds.clear();
154     return result;
155   }
156 
157   @Override
158   public int getStorefileCount() {
159     return state.allFilesCached.size();
160   }
161 
162   /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)}
163    * for details on this methods. */
164   @Override
165   public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
166     KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
167     // Order matters for this call.
168     result.addSublist(state.level0Files);
169     if (!state.stripeFiles.isEmpty()) {
170       int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
171       for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
172         result.addSublist(state.stripeFiles.get(stripeIndex));
173       }
174     }
175     return result.iterator();
176   }
177 
178   /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
179    * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, Cell)}
180    * for details on this methods. */
181   @Override
182   public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
183       Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
184     KeyBeforeConcatenatedLists.Iterator original =
185         (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
186     assert original != null;
187     ArrayList<List<StoreFile>> components = original.getComponents();
188     for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
189       StoreFile sf = components.get(firstIrrelevant).get(0);
190       byte[] endKey = endOf(sf);
191       // Entries are ordered as such: L0, then stripes in reverse order. We never remove
192       // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
193       // first one that cannot possibly have better candidates.
194       if (!isInvalid(endKey) && !isOpen(endKey)
195           && (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
196         original.removeComponents(firstIrrelevant);
197         break;
198       }
199     }
200     return original;
201   }
202 
  /**
   * Override of getSplitPoint that determines the split point as the boundary between two
   * stripes, unless it causes significant imbalance between split sides' sizes. In that
   * case, the split boundary will be chosen from the middle of one of the stripes to
   * minimize imbalance.
   * @return The split point, or null if no split is possible.
   * @throws IOException If it is propagated from the file split point lookup.
   */
  @Override
  public byte[] getSplitPoint() throws IOException {
    if (this.getStorefileCount() == 0) return null;
    // With zero or one stripe there is no stripe boundary to split on.
    if (state.stripeFiles.size() <= 1) {
      return getSplitPointFromAllFiles();
    }
    // Walk inwards from both ends, always extending the currently smaller side by one stripe,
    // until the two sides meet. The last* variables remember the size of the stripe most
    // recently added to each side.
    int leftIndex = -1, rightIndex = state.stripeFiles.size();
    long leftSize = 0, rightSize = 0;
    long lastLeftSize = 0, lastRightSize = 0;
    while (rightIndex - 1 != leftIndex) {
      if (leftSize >= rightSize) {
        --rightIndex;
        lastRightSize = getStripeFilesSize(rightIndex);
        rightSize += lastRightSize;
      } else {
        ++leftIndex;
        lastLeftSize = getStripeFilesSize(leftIndex);
        leftSize += lastLeftSize;
      }
    }
    // One side being empty means no boundary separates any data; fall back to file-based split.
    if (leftSize == 0 || rightSize == 0) {
      String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
          + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
      debugDumpState(errMsg);
      LOG.warn(errMsg);
      return getSplitPointFromAllFiles();
    }
    // Normalize the imbalance ratio so that it is always >= 1.
    double ratio = (double)rightSize / leftSize;
    if (ratio < 1) {
      ratio = 1 / ratio;
    }
    // Balanced enough: split at the end row of the last stripe on the left side.
    if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];

    // If the difference between the sides is too large, we could get the proportional key on
    // the a stripe to equalize the difference, but there's no proportional key method at the
    // moment, and it's not extremely important.
    // See if we can achieve better ratio if we split the bigger side in half.
    boolean isRightLarger = rightSize >= leftSize;
    double newRatio = isRightLarger
        ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
        : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
    if (newRatio < 1) {
      newRatio = 1 / newRatio;
    }
    // Splitting inside the stripe would not improve the balance; keep the stripe boundary.
    if (newRatio >= ratio)  return state.stripeEndRows[leftIndex];
    LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
        + newRatio + " configured ratio " + config.getMaxSplitImbalance());
    // Ok, we may get better ratio, get it.
    // The split point comes from the largest file of the stripe last added to the larger side.
    return StoreUtils.getLargestFile(state.stripeFiles.get(
        isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
  }
261 
262   private byte[] getSplitPointFromAllFiles() throws IOException {
263     ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
264     sfs.addSublist(state.level0Files);
265     sfs.addAllSublists(state.stripeFiles);
266     if (sfs.isEmpty()) return null;
267     return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
268   }
269 
270   private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
271     return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
272   }
273 
274   @Override
275   public Collection<StoreFile> getFilesForScanOrGet(
276       boolean isGet, byte[] startRow, byte[] stopRow) {
277     if (state.stripeFiles.isEmpty()) {
278       return state.level0Files; // There's just L0.
279     }
280 
281     int firstStripe = findStripeForRow(startRow, true);
282     int lastStripe = findStripeForRow(stopRow, false);
283     assert firstStripe <= lastStripe;
284     if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
285       return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
286     }
287     if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
288       return state.allFilesCached; // We need to read all files.
289     }
290 
291     ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
292     result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
293     result.addSublist(state.level0Files);
294     return result;
295   }
296 
297   @Override
298   public void addCompactionResults(
299     Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
300     // See class comment for the assumptions we make here.
301     LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
302         + " files replaced by " + results.size());
303     // In order to be able to fail in the middle of the operation, we'll operate on lazy
304     // copies and apply the result at the end.
305     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
306     cmc.mergeResults(compactedFiles, results);
307     debugDumpState("Merged compaction results");
308   }
309 
310   @Override
311   public int getStoreCompactionPriority() {
312     // If there's only L0, do what the default store does.
313     // If we are in critical priority, do the same - we don't want to trump all stores all
314     // the time due to how many files we have.
315     int fc = getStorefileCount();
316     if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
317       return this.blockingFileCount - fc;
318     }
319     // If we are in good shape, we don't want to be trumped by all other stores due to how
320     // many files we have, so do an approximate mapping to normal priority range; L0 counts
321     // for all stripes.
322     int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
323     int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
324     return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
325   }
326 
327   /**
328    * Gets the total size of all files in the stripe.
329    * @param stripeIndex Stripe index.
330    * @return Size.
331    */
332   private long getStripeFilesSize(int stripeIndex) {
333     long result = 0;
334     for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
335       result += sf.getReader().length();
336     }
337     return result;
338   }
339 
  /**
   * Loads initial store files that were picked up from some physical location pertaining to
   * this store (presumably). Unlike adding files after compaction, assumes empty initial
   * sets, and is forgiving with regard to stripe constraints - at worst, many/all files will
   * go to level 0.
   * Works in four steps: group files by end row into tentative stripes, validate that the
   * stripes form a contiguous chain, check that the outer boundaries are open, and finally
   * publish a new State.
   * @param storeFiles Store files to add.
   */
  private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
    LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
    // Tentative stripes keyed (and ordered) by end row.
    TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
        new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
    ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
    // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
    // If needed, we could dynamically determine the stripes in future.
    for (StoreFile sf : storeFiles) {
      byte[] startRow = startOf(sf), endRow = endOf(sf);
      // Validate the range and put the files into place.
      if (isInvalid(startRow) || isInvalid(endRow)) {
        insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
        ensureLevel0Metadata(sf);
      } else if (!isOpen(startRow) && !isOpen(endRow) &&
          nonOpenRowCompare(startRow, endRow) >= 0) {
        // Both boundaries are closed but the start is not before the end.
        LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
          + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
        insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
        ensureLevel0Metadata(sf);
      } else {
        ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
        if (stripe == null) {
          stripe = new ArrayList<StoreFile>();
          candidateStripes.put(endRow, stripe);
        }
        insertFileIntoStripe(stripe, sf);
      }
    }
    // Possible improvement - for variable-count stripes, if all the files are in L0, we can
    // instead create single, open-ended stripe with all files.

    // Walk the tentative stripes in end row order; every file of a stripe must start where
    // the previous surviving stripe ended.
    boolean hasOverlaps = false;
    byte[] expectedStartRow = null; // first stripe can start wherever
    Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
        candidateStripes.entrySet().iterator();
    while (entryIter.hasNext()) {
      Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
      ArrayList<StoreFile> files = entry.getValue();
      // Validate the file start rows, and remove the bad ones to level 0.
      for (int i = 0; i < files.size(); ++i) {
        StoreFile sf = files.get(i);
        byte[] startRow = startOf(sf);
        if (expectedStartRow == null) {
          expectedStartRow = startRow; // ensure that first stripe is still consistent
        } else if (!rowEquals(expectedStartRow, startRow)) {
          hasOverlaps = true;
          LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
              + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
              + "], to L0 it goes");
          StoreFile badSf = files.remove(i);
          insertFileIntoStripe(level0Files, badSf);
          ensureLevel0Metadata(badSf);
          --i; // Compensate for the removal so that the next file is not skipped.
        }
      }
      // Check if any files from the candidate stripe are valid. If so, add a stripe.
      byte[] endRow = entry.getKey();
      if (!files.isEmpty()) {
        expectedStartRow = endRow; // Next stripe must start exactly at that key.
      } else {
        entryIter.remove(); // Every file was evicted to L0; drop the empty stripe.
      }
    }

    // In the end, there must be open ends on two sides. If not, and there were no errors i.e.
    // files are consistent, they might be coming from a split. We will treat the boundaries
    // as open keys anyway, and log the message.
    // If there were errors, we'll play it safe and dump everything into L0.
    if (!candidateStripes.isEmpty()) {
      StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
      boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
      if (!isOpen) {
        LOG.warn("The range of the loaded files does not cover full key space: from ["
            + Bytes.toString(startOf(firstFile)) + "], to ["
            + Bytes.toString(candidateStripes.lastKey()) + "]");
        if (!hasOverlaps) {
          // Override the cached start/end of the edge stripes' files with the open key.
          ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
          ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
        } else {
          LOG.warn("Inconsistent files, everything goes to L0.");
          for (ArrayList<StoreFile> files : candidateStripes.values()) {
            for (StoreFile sf : files) {
              insertFileIntoStripe(level0Files, sf);
              ensureLevel0Metadata(sf);
            }
          }
          candidateStripes.clear();
        }
      }
    }

    // Copy the results into the fields.
    // Note: this local deliberately shadows the "state" field until it is assigned below.
    State state = new State();
    state.level0Files = ImmutableList.copyOf(level0Files);
    state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
    state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
    ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
    // i counts down the stripes remaining after the current one; the last stripe (i == 0)
    // gets no stored end row.
    int i = candidateStripes.size() - 1;
    for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
      state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
      newAllFiles.addAll(entry.getValue());
      if (i > 0) {
        state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
      }
      --i;
    }
    state.allFilesCached = ImmutableList.copyOf(newAllFiles);
    this.state = state;
    debugDumpState("Files loaded");
  }
457 
458   private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
459     HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
460     for (StoreFile sf : stripe) {
461       targetMap.put(sf, OPEN_KEY);
462     }
463   }
464 
465   private void ensureLevel0Metadata(StoreFile sf) {
466     if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
467     if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
468   }
469 
470   private void debugDumpState(String string) {
471     if (!LOG.isDebugEnabled()) return;
472     StringBuilder sb = new StringBuilder();
473     sb.append("\n" + string + "; current stripe state is as such:");
474     sb.append("\n level 0 with ")
475         .append(state.level0Files.size())
476         .append(
477           " files: "
478               + TraditionalBinaryPrefix.long2String(
479                 StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
480     for (int i = 0; i < state.stripeFiles.size(); ++i) {
481       String endRow = (i == state.stripeEndRows.length)
482           ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
483       sb.append("\n stripe ending in ")
484           .append(endRow)
485           .append(" with ")
486           .append(state.stripeFiles.get(i).size())
487           .append(
488             " files: "
489                 + TraditionalBinaryPrefix.long2String(
490                   StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
491     }
492     sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
493     sb.append("\n").append(getStorefileCount()).append(" files total.");
494     LOG.debug(sb.toString());
495   }
496 
497   /**
498    * Checks whether the key indicates an open interval boundary (i.e. infinity).
499    */
500   private static final boolean isOpen(byte[] key) {
501     return key != null && key.length == 0;
502   }
503 
504   /**
505    * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
506    */
507   private static final boolean isInvalid(byte[] key) {
508     return key == INVALID_KEY;
509   }
510 
511   /**
512    * Compare two keys for equality.
513    */
514   private final boolean rowEquals(byte[] k1, byte[] k2) {
515     return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
516   }
517 
518   /**
519    * Compare two keys. Keys must not be open (isOpen(row) == false).
520    */
521   private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
522     assert !isOpen(k1) && !isOpen(k2);
523     return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
524   }
525 
526   /**
527    * Finds the stripe index by end row.
528    */
529   private final int findStripeIndexByEndRow(byte[] endRow) {
530     assert !isInvalid(endRow);
531     if (isOpen(endRow)) return state.stripeEndRows.length;
532     return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
533   }
534 
535   /**
536    * Finds the stripe index for the stripe containing a row provided externally for get/scan.
537    */
538   private final int findStripeForRow(byte[] row, boolean isStart) {
539     if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
540     if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
541     // If there's an exact match below, a stripe ends at "row". Stripe right boundary is
542     // exclusive, so that means the row is in the next stripe; thus, we need to add one to index.
543     // If there's no match, the return value of binarySearch is (-(insertion point) - 1), where
544     // insertion point is the index of the next greater element, or list size if none. The
545     // insertion point happens to be exactly what we need, so we need to add one to the result.
546     return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
547   }
548 
549   @Override
550   public final byte[] getStartRow(int stripeIndex) {
551     return (stripeIndex == 0  ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
552   }
553 
554   @Override
555   public final byte[] getEndRow(int stripeIndex) {
556     return (stripeIndex == state.stripeEndRows.length
557         ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
558   }
559 
560 
561   private byte[] startOf(StoreFile sf) {
562     byte[] result = this.fileStarts.get(sf);
563     return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
564         : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
565   }
566 
567   private byte[] endOf(StoreFile sf) {
568     byte[] result = this.fileEnds.get(sf);
569     return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
570         : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
571   }
572 
573   /**
574    * Inserts a file in the correct place (by seqnum) in a stripe copy.
575    * @param stripe Stripe copy to insert into.
576    * @param sf File to insert.
577    */
578   private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
579     // The only operation for which sorting of the files matters is KeyBefore. Therefore,
580     // we will store the file in reverse order by seqNum from the outset.
581     for (int insertBefore = 0; ; ++insertBefore) {
582       if (insertBefore == stripe.size()
583           || (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
584         stripe.add(insertBefore, sf);
585         break;
586       }
587     }
588   }
589 
590   /**
591    * An extension of ConcatenatedLists that has several peculiar properties.
592    * First, one can cut the tail of the logical list by removing last several sub-lists.
593    * Second, items can be removed thru iterator.
594    * Third, if the sub-lists are immutable, they are replaced with mutable copies when needed.
595    * On average KeyBefore operation will contain half the stripes as potential candidates,
596    * but will quickly cut down on them as it finds something in the more likely ones; thus,
597    * the above allow us to avoid unnecessary copying of a bunch of lists.
598    */
599   private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
600     @Override
601     public java.util.Iterator<StoreFile> iterator() {
602       return new Iterator();
603     }
604 
605     public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
606       public ArrayList<List<StoreFile>> getComponents() {
607         return components;
608       }
609 
610       public void removeComponents(int startIndex) {
611         List<List<StoreFile>> subList = components.subList(startIndex, components.size());
612         for (List<StoreFile> entry : subList) {
613           size -= entry.size();
614         }
615         assert size >= 0;
616         subList.clear();
617       }
618 
619       @Override
620       public void remove() {
621         if (!this.nextWasCalled) {
622           throw new IllegalStateException("No element to remove");
623         }
624         this.nextWasCalled = false;
625         List<StoreFile> src = components.get(currentComponent);
626         if (src instanceof ImmutableList<?>) {
627           src = new ArrayList<StoreFile>(src);
628           components.set(currentComponent, src);
629         }
630         src.remove(indexWithinComponent);
631         --size;
632         --indexWithinComponent;
633         if (src.isEmpty()) {
634           components.remove(currentComponent); // indexWithinComponent is already -1 here.
635         }
636       }
637     }
638   }
639 
640   /**
641    * Non-static helper class for merging compaction or flush results.
642    * Since we want to merge them atomically (more or less), it operates on lazy copies,
643    * then creates a new state object and puts it in place.
644    */
645   private class CompactionOrFlushMergeCopy {
    /** Working copy of the stripes; individual stripes are made mutable on demand. */
    private ArrayList<List<StoreFile>> stripeFiles = null;
    /** Working copy of level 0; stays null while level 0 is unchanged. */
    private ArrayList<StoreFile> level0Files = null;
    /** New stripe end rows; stays null while the old end rows still apply. */
    private ArrayList<byte[]> stripeEndRows = null;

    /** Inputs of the merge, set once by mergeResults. */
    private Collection<StoreFile> compactedFiles = null;
    private Collection<StoreFile> results = null;

    /** Result files that get level 0 metadata recorded after the new state is installed. */
    private List<StoreFile> l0Results = new ArrayList<StoreFile>();
    /** True for a flush (nothing is removed), false for a compaction. */
    private final boolean isFlush;
655 
656     public CompactionOrFlushMergeCopy(boolean isFlush) {
657       // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
658       this.stripeFiles = new ArrayList<List<StoreFile>>(
659           StripeStoreFileManager.this.state.stripeFiles);
660       this.isFlush = isFlush;
661     }
662 
663     public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
664         throws IOException {
665       assert this.compactedFiles == null && this.results == null;
666       this.compactedFiles = compactedFiles;
667       this.results = results;
668       // Do logical processing.
669       if (!isFlush) removeCompactedFiles();
670       TreeMap<byte[], StoreFile> newStripes = processResults();
671       if (newStripes != null) {
672         processNewCandidateStripes(newStripes);
673       }
674       // Create new state and update parent.
675       State state = createNewState();
676       StripeStoreFileManager.this.state = state;
677       updateMetadataMaps();
678     }
679 
680     private State createNewState() {
681       State oldState = StripeStoreFileManager.this.state;
682       // Stripe count should be the same unless the end rows changed.
683       assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
684       State newState = new State();
685       newState.level0Files = (this.level0Files == null) ? oldState.level0Files
686           : ImmutableList.copyOf(this.level0Files);
687       newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
688           : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
689       newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
690       for (List<StoreFile> newStripe : this.stripeFiles) {
691         newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
692             ? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
693       }
694 
695       List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
696       if (!isFlush) newAllFiles.removeAll(compactedFiles);
697       newAllFiles.addAll(results);
698       newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
699       return newState;
700     }
701 
702     private void updateMetadataMaps() {
703       StripeStoreFileManager parent = StripeStoreFileManager.this;
704       if (!isFlush) {
705         for (StoreFile sf : this.compactedFiles) {
706           parent.fileStarts.remove(sf);
707           parent.fileEnds.remove(sf);
708         }
709       }
710       if (this.l0Results != null) {
711         for (StoreFile sf : this.l0Results) {
712           parent.ensureLevel0Metadata(sf);
713         }
714       }
715     }
716 
717     /**
718      * @param index Index of the stripe we need.
719      * @return A lazy stripe copy from current stripes.
720      */
721     private final ArrayList<StoreFile> getStripeCopy(int index) {
722       List<StoreFile> stripeCopy = this.stripeFiles.get(index);
723       ArrayList<StoreFile> result = null;
724       if (stripeCopy instanceof ImmutableList<?>) {
725         result = new ArrayList<StoreFile>(stripeCopy);
726         this.stripeFiles.set(index, result);
727       } else {
728         result = (ArrayList<StoreFile>)stripeCopy;
729       }
730       return result;
731     }
732 
733     /**
734      * @return A lazy L0 copy from current state.
735      */
736     private final ArrayList<StoreFile> getLevel0Copy() {
737       if (this.level0Files == null) {
738         this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
739       }
740       return this.level0Files;
741     }
742 
743     /**
744      * Process new files, and add them either to the structure of existing stripes,
745      * or to the list of new candidate stripes.
746      * @return New candidate stripes.
747      */
748     private TreeMap<byte[], StoreFile> processResults() throws IOException {
749       TreeMap<byte[], StoreFile> newStripes = null;
750       for (StoreFile sf : this.results) {
751         byte[] startRow = startOf(sf), endRow = endOf(sf);
752         if (isInvalid(endRow) || isInvalid(startRow)) {
753           if (!isFlush) {
754             LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
755           }
756           insertFileIntoStripe(getLevel0Copy(), sf);
757           this.l0Results.add(sf);
758           continue;
759         }
760         if (!this.stripeFiles.isEmpty()) {
761           int stripeIndex = findStripeIndexByEndRow(endRow);
762           if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
763             // Simple/common case - add file to an existing stripe.
764             insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
765             continue;
766           }
767         }
768 
769         // Make a new candidate stripe.
770         if (newStripes == null) {
771           newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
772         }
773         StoreFile oldSf = newStripes.put(endRow, sf);
774         if (oldSf != null) {
775           throw new IOException("Compactor has produced multiple files for the stripe ending in ["
776               + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
777         }
778       }
779       return newStripes;
780     }
781 
782     /**
783      * Remove compacted files.
784      * @param compactedFiles Compacted files.
785      */
786     private void removeCompactedFiles() throws IOException {
787       for (StoreFile oldFile : this.compactedFiles) {
788         byte[] oldEndRow = endOf(oldFile);
789         List<StoreFile> source = null;
790         if (isInvalid(oldEndRow)) {
791           source = getLevel0Copy();
792         } else {
793           int stripeIndex = findStripeIndexByEndRow(oldEndRow);
794           if (stripeIndex < 0) {
795             throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
796                 + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
797           }
798           source = getStripeCopy(stripeIndex);
799         }
800         if (!source.remove(oldFile)) {
801           throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
802         }
803       }
804     }
805 
806     /**
807      * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
808      * new candidate stripes/removes old stripes; produces new set of stripe end rows.
809      * @param newStripes  New stripes - files by end row.
810      */
811     private void processNewCandidateStripes(
812         TreeMap<byte[], StoreFile> newStripes) throws IOException {
813       // Validate that the removed and added aggregate ranges still make for a full key space.
814       boolean hasStripes = !this.stripeFiles.isEmpty();
815       this.stripeEndRows = new ArrayList<byte[]>(
816           Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
817       int removeFrom = 0;
818       byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
819       byte[] lastEndRow = newStripes.lastKey();
820       if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
821         throw new IOException("Newly created stripes do not cover the entire key space.");
822       }
823 
824       boolean canAddNewStripes = true;
825       Collection<StoreFile> filesForL0 = null;
826       if (hasStripes) {
827         // Determine which stripes will need to be removed because they conflict with new stripes.
828         // The new boundaries should match old stripe boundaries, so we should get exact matches.
829         if (isOpen(firstStartRow)) {
830           removeFrom = 0;
831         } else {
832           removeFrom = findStripeIndexByEndRow(firstStartRow);
833           if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
834           ++removeFrom;
835         }
836         int removeTo = findStripeIndexByEndRow(lastEndRow);
837         if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
838         // See if there are files in the stripes we are trying to replace.
839         ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
840         for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
841           conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
842         }
843         if (!conflictingFiles.isEmpty()) {
844           // This can be caused by two things - concurrent flush into stripes, or a bug.
845           // Unfortunately, we cannot tell them apart without looking at timing or something
846           // like that. We will assume we are dealing with a flush and dump it into L0.
847           if (isFlush) {
848             long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
849             LOG.warn("Stripes were created by a flush, but results of size " + newSize
850                 + " cannot be added because the stripes have changed");
851             canAddNewStripes = false;
852             filesForL0 = newStripes.values();
853           } else {
854             long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
855             LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
856                 + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
857             filesForL0 = conflictingFiles;
858           }
859           if (filesForL0 != null) {
860             for (StoreFile sf : filesForL0) {
861               insertFileIntoStripe(getLevel0Copy(), sf);
862             }
863             l0Results.addAll(filesForL0);
864           }
865         }
866 
867         if (canAddNewStripes) {
868           // Remove old empty stripes.
869           int originalCount = this.stripeFiles.size();
870           for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
871             if (removeIndex != originalCount - 1) {
872               this.stripeEndRows.remove(removeIndex);
873             }
874             this.stripeFiles.remove(removeIndex);
875           }
876         }
877       }
878 
879       if (!canAddNewStripes) return; // Files were already put into L0.
880 
881       // Now, insert new stripes. The total ranges match, so we can insert where we removed.
882       byte[] previousEndRow = null;
883       int insertAt = removeFrom;
884       for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
885         if (previousEndRow != null) {
886           // Validate that the ranges are contiguous.
887           assert !isOpen(previousEndRow);
888           byte[] startRow = startOf(newStripe.getValue());
889           if (!rowEquals(previousEndRow, startRow)) {
890             throw new IOException("The new stripes produced by "
891                 + (isFlush ? "flush" : "compaction") + " are not contiguous");
892           }
893         }
894         // Add the new stripe.
895         ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
896         tmp.add(newStripe.getValue());
897         stripeFiles.add(insertAt, tmp);
898         previousEndRow = newStripe.getKey();
899         if (!isOpen(previousEndRow)) {
900           stripeEndRows.add(insertAt, previousEndRow);
901         }
902         ++insertAt;
903       }
904     }
905   }
906 
907   @Override
908   public List<StoreFile> getLevel0Files() {
909     return this.state.level0Files;
910   }
911 
912   @Override
913   public List<byte[]> getStripeBoundaries() {
914     if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
915     ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
916     result.add(OPEN_KEY);
917     Collections.addAll(result, this.state.stripeEndRows);
918     result.add(OPEN_KEY);
919     return result;
920   }
921 
922   @Override
923   public ArrayList<ImmutableList<StoreFile>> getStripes() {
924     return this.state.stripeFiles;
925   }
926 
927   @Override
928   public int getStripeCount() {
929     return this.state.stripeFiles.size();
930   }
931 
932   @Override
933   public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
934     // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
935     // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
936     State state = this.state;
937     Collection<StoreFile> expiredStoreFiles = null;
938     for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
939       expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
940     }
941     return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
942   }
943 
944   private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
945       List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
946     // Order by seqnum is reversed.
947     for (int i = 1; i < stripe.size(); ++i) {
948       StoreFile sf = stripe.get(i);
949       long fileTs = sf.getReader().getMaxTimestamp();
950       if (fileTs < maxTs && !filesCompacting.contains(sf)) {
951         LOG.info("Found an expired store file: " + sf.getPath()
952             + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
953         if (expiredStoreFiles == null) {
954           expiredStoreFiles = new ArrayList<StoreFile>();
955         }
956         expiredStoreFiles.add(sf);
957       }
958     }
959     return expiredStoreFiles;
960   }
961 
962   @Override
963   public double getCompactionPressure() {
964     State stateLocal = this.state;
965     if (stateLocal.allFilesCached.size() > blockingFileCount) {
966       // just a hit to tell others that we have reached the blocking file count.
967       return 2.0;
968     }
969     if (stateLocal.stripeFiles.isEmpty()) {
970       return 0.0;
971     }
972     int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
973     // do not calculate L0 separately because data will be moved to stripe quickly and in most cases
974     // we flush data to stripe directly.
975     int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
976     double max = 0.0;
977     for (ImmutableList<StoreFile> stripeFile : stateLocal.stripeFiles) {
978       int stripeFileCount = stripeFile.size();
979       double normCount =
980           (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
981               / (blockingFilePerStripe - config.getStripeCompactMinFiles());
982       if (normCount >= 1.0) {
983         // This could happen if stripe is not split evenly. Do not return values that larger than
984         // 1.0 because we have not reached the blocking file count actually.
985         return 1.0;
986       }
987       if (normCount > max) {
988         max = normCount;
989       }
990     }
991     return max;
992   }
993 }