001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.Comparator;
027import java.util.HashMap;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Optional;
032import java.util.TreeMap;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellComparator;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.ConcatenatedLists;
043import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
048import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
049
050/**
051 * Stripe implementation of StoreFileManager.
052 * Not thread safe - relies on external locking (in HStore). Collections that this class
053 * returns are immutable or unique to the call, so they should be safe.
054 * Stripe store splits the key space of the region into non-overlapping stripes, as well as
055 * some recent files that have all the keys (level 0). Each stripe contains a set of files.
056 * When L0 is compacted, it's split into the files corresponding to existing stripe boundaries,
057 * that can thus be added to stripes.
058 * When scan or get happens, it only has to read the files from the corresponding stripes.
059 * See StripeCompationPolicy on how the stripes are determined; this class doesn't care.
060 *
061 * This class should work together with StripeCompactionPolicy and StripeCompactor.
062 * With regard to how they work, we make at least the following (reasonable) assumptions:
063 *  - Compaction produces one file per new stripe (if any); that is easy to change.
064 *  - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
065 */
066@InterfaceAudience.Private
067public class StripeStoreFileManager
068  implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
069  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFileManager.class);
070
071  /**
072   * The file metadata fields that contain the stripe information.
073   */
074  public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
075  public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
076
077  private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
078
079  /**
080   * The key value used for range boundary, indicating that the boundary is open (i.e. +-inf).
081   */
082  public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
083  final static byte[] INVALID_KEY = null;
084
085  /**
086   * The state class. Used solely to replace results atomically during
087   * compactions and avoid complicated error handling.
088   */
089  private static class State {
090    /**
091     * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
092     * here. It is invariant that the start row of the stripe is the end row of the previous one
093     * (and is an open boundary for the first one).
094     */
095    public byte[][] stripeEndRows = new byte[0][];
096
097    /**
098     * Files by stripe. Each element of the list corresponds to stripeEndRow element with the
099     * same index, except the last one. Inside each list, the files are in reverse order by
100     * seqNum. Note that the length of this is one higher than that of stripeEndKeys.
101     */
102    public ArrayList<ImmutableList<HStoreFile>> stripeFiles = new ArrayList<>();
103    /** Level 0. The files are in reverse order by seqNum. */
104    public ImmutableList<HStoreFile> level0Files = ImmutableList.of();
105
106    /** Cached list of all files in the structure, to return from some calls */
107    public ImmutableList<HStoreFile> allFilesCached = ImmutableList.of();
108    private ImmutableList<HStoreFile> allCompactedFilesCached = ImmutableList.of();
109  }
110  private State state = null;
111
112  /** Cached file metadata (or overrides as the case may be) */
113  private HashMap<HStoreFile, byte[]> fileStarts = new HashMap<>();
114  private HashMap<HStoreFile, byte[]> fileEnds = new HashMap<>();
115  /** Normally invalid key is null, but in the map null is the result for "no key"; so use
116   * the following constant value in these maps instead. Note that this is a constant and
117   * we use it to compare by reference when we read from the map. */
118  private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
119
120  private final CellComparator cellComparator;
121  private StripeStoreConfig config;
122
123  private final int blockingFileCount;
124
125  public StripeStoreFileManager(
126      CellComparator kvComparator, Configuration conf, StripeStoreConfig config) {
127    this.cellComparator = kvComparator;
128    this.config = config;
129    this.blockingFileCount = conf.getInt(
130        HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
131  }
132
133  @Override
134  public void loadFiles(List<HStoreFile> storeFiles) {
135    loadUnclassifiedStoreFiles(storeFiles);
136  }
137
138  @Override
139  public Collection<HStoreFile> getStorefiles() {
140    return state.allFilesCached;
141  }
142
143  @Override
144  public Collection<HStoreFile> getCompactedfiles() {
145    return state.allCompactedFilesCached;
146  }
147
148  @Override
149  public int getCompactedFilesCount() {
150    return state.allCompactedFilesCached.size();
151  }
152
153  @Override
154  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
155    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
156    // Passing null does not cause NPE??
157    cmc.mergeResults(null, sfs);
158    debugDumpState("Added new files");
159  }
160
161  @Override
162  public ImmutableCollection<HStoreFile> clearFiles() {
163    ImmutableCollection<HStoreFile> result = state.allFilesCached;
164    this.state = new State();
165    this.fileStarts.clear();
166    this.fileEnds.clear();
167    return result;
168  }
169
170  @Override
171  public ImmutableCollection<HStoreFile> clearCompactedFiles() {
172    ImmutableCollection<HStoreFile> result = state.allCompactedFilesCached;
173    this.state = new State();
174    return result;
175  }
176
177  @Override
178  public int getStorefileCount() {
179    return state.allFilesCached.size();
180  }
181
182  /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)}
183   * for details on this methods. */
184  @Override
185  public Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
186    KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
187    // Order matters for this call.
188    result.addSublist(state.level0Files);
189    if (!state.stripeFiles.isEmpty()) {
190      int lastStripeIndex = findStripeForRow(CellUtil.cloneRow(targetKey), false);
191      for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
192        result.addSublist(state.stripeFiles.get(stripeIndex));
193      }
194    }
195    return result.iterator();
196  }
197
198  /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
199   * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, Cell)}
200   * for details on this methods. */
201  @Override
202  public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
203      Iterator<HStoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
204    KeyBeforeConcatenatedLists.Iterator original =
205        (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
206    assert original != null;
207    ArrayList<List<HStoreFile>> components = original.getComponents();
208    for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
209      HStoreFile sf = components.get(firstIrrelevant).get(0);
210      byte[] endKey = endOf(sf);
211      // Entries are ordered as such: L0, then stripes in reverse order. We never remove
212      // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
213      // first one that cannot possibly have better candidates.
214      if (!isInvalid(endKey) && !isOpen(endKey)
215          && (nonOpenRowCompare(targetKey, endKey) >= 0)) {
216        original.removeComponents(firstIrrelevant);
217        break;
218      }
219    }
220    return original;
221  }
222
223  /**
224   * Override of getSplitPoint that determines the split point as the boundary between two
225   * stripes, unless it causes significant imbalance between split sides' sizes. In that
226   * case, the split boundary will be chosen from the middle of one of the stripes to
227   * minimize imbalance.
228   * @return The split point, or null if no split is possible.
229   */
230  @Override
231  public Optional<byte[]> getSplitPoint() throws IOException {
232    if (this.getStorefileCount() == 0) {
233      return Optional.empty();
234    }
235    if (state.stripeFiles.size() <= 1) {
236      return getSplitPointFromAllFiles();
237    }
238    int leftIndex = -1, rightIndex = state.stripeFiles.size();
239    long leftSize = 0, rightSize = 0;
240    long lastLeftSize = 0, lastRightSize = 0;
241    while (rightIndex - 1 != leftIndex) {
242      if (leftSize >= rightSize) {
243        --rightIndex;
244        lastRightSize = getStripeFilesSize(rightIndex);
245        rightSize += lastRightSize;
246      } else {
247        ++leftIndex;
248        lastLeftSize = getStripeFilesSize(leftIndex);
249        leftSize += lastLeftSize;
250      }
251    }
252    if (leftSize == 0 || rightSize == 0) {
253      String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
254          + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
255      debugDumpState(errMsg);
256      LOG.warn(errMsg);
257      return getSplitPointFromAllFiles();
258    }
259    double ratio = (double)rightSize / leftSize;
260    if (ratio < 1) {
261      ratio = 1 / ratio;
262    }
263    if (config.getMaxSplitImbalance() > ratio) {
264      return Optional.of(state.stripeEndRows[leftIndex]);
265    }
266
267    // If the difference between the sides is too large, we could get the proportional key on
268    // the a stripe to equalize the difference, but there's no proportional key method at the
269    // moment, and it's not extremely important.
270    // See if we can achieve better ratio if we split the bigger side in half.
271    boolean isRightLarger = rightSize >= leftSize;
272    double newRatio = isRightLarger
273        ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
274        : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
275    if (newRatio < 1) {
276      newRatio = 1 / newRatio;
277    }
278    if (newRatio >= ratio) {
279      return Optional.of(state.stripeEndRows[leftIndex]);
280    }
281    LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
282        + newRatio + " configured ratio " + config.getMaxSplitImbalance());
283    // OK, we may get better ratio, get it.
284    return StoreUtils.getSplitPoint(state.stripeFiles.get(isRightLarger ? rightIndex : leftIndex),
285      cellComparator);
286  }
287
288  private Optional<byte[]> getSplitPointFromAllFiles() throws IOException {
289    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
290    sfs.addSublist(state.level0Files);
291    sfs.addAllSublists(state.stripeFiles);
292    return StoreUtils.getSplitPoint(sfs, cellComparator);
293  }
294
295  private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
296    return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
297  }
298
299  @Override
300  public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
301      byte[] stopRow, boolean includeStopRow) {
302    if (state.stripeFiles.isEmpty()) {
303      return state.level0Files; // There's just L0.
304    }
305
306    int firstStripe = findStripeForRow(startRow, true);
307    int lastStripe = findStripeForRow(stopRow, false);
308    assert firstStripe <= lastStripe;
309    if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
310      return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
311    }
312    if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
313      return state.allFilesCached; // We need to read all files.
314    }
315
316    ConcatenatedLists<HStoreFile> result = new ConcatenatedLists<>();
317    result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
318    result.addSublist(state.level0Files);
319    return result;
320  }
321
322  @Override
323  public void addCompactionResults(
324    Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results) throws IOException {
325    // See class comment for the assumptions we make here.
326    LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
327        + " files replaced by " + results.size());
328    // In order to be able to fail in the middle of the operation, we'll operate on lazy
329    // copies and apply the result at the end.
330    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
331    cmc.mergeResults(compactedFiles, results);
332    markCompactedAway(compactedFiles);
333    debugDumpState("Merged compaction results");
334  }
335
336  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
337  // Let a background thread close the actual reader on these compacted files and also
338  // ensure to evict the blocks from block cache so that they are no longer in
339  // cache
340  private void markCompactedAway(Collection<HStoreFile> compactedFiles) {
341    for (HStoreFile file : compactedFiles) {
342      file.markCompactedAway();
343    }
344  }
345
346  @Override
347  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException {
348    // See class comment for the assumptions we make here.
349    LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
350    // In order to be able to fail in the middle of the operation, we'll operate on lazy
351    // copies and apply the result at the end.
352    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
353    cmc.deleteResults(compactedFiles);
354    debugDumpState("Deleted compaction results");
355  }
356
357  @Override
358  public int getStoreCompactionPriority() {
359    // If there's only L0, do what the default store does.
360    // If we are in critical priority, do the same - we don't want to trump all stores all
361    // the time due to how many files we have.
362    int fc = getStorefileCount();
363    if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
364      return this.blockingFileCount - fc;
365    }
366    // If we are in good shape, we don't want to be trumped by all other stores due to how
367    // many files we have, so do an approximate mapping to normal priority range; L0 counts
368    // for all stripes.
369    int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
370    int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
371    return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
372  }
373
374  /**
375   * Gets the total size of all files in the stripe.
376   * @param stripeIndex Stripe index.
377   * @return Size.
378   */
379  private long getStripeFilesSize(int stripeIndex) {
380    long result = 0;
381    for (HStoreFile sf : state.stripeFiles.get(stripeIndex)) {
382      result += sf.getReader().length();
383    }
384    return result;
385  }
386
387  /**
388   * Loads initial store files that were picked up from some physical location pertaining to
389   * this store (presumably). Unlike adding files after compaction, assumes empty initial
390   * sets, and is forgiving with regard to stripe constraints - at worst, many/all files will
391   * go to level 0.
392   * @param storeFiles Store files to add.
393   */
394  private void loadUnclassifiedStoreFiles(List<HStoreFile> storeFiles) {
395    LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
396    TreeMap<byte[], ArrayList<HStoreFile>> candidateStripes = new TreeMap<>(MAP_COMPARATOR);
397    ArrayList<HStoreFile> level0Files = new ArrayList<>();
398    // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
399    // If needed, we could dynamically determine the stripes in future.
400    for (HStoreFile sf : storeFiles) {
401      byte[] startRow = startOf(sf), endRow = endOf(sf);
402      // Validate the range and put the files into place.
403      if (isInvalid(startRow) || isInvalid(endRow)) {
404        insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
405        ensureLevel0Metadata(sf);
406      } else if (!isOpen(startRow) && !isOpen(endRow) &&
407          nonOpenRowCompare(startRow, endRow) >= 0) {
408        LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
409          + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
410        insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
411        ensureLevel0Metadata(sf);
412      } else {
413        ArrayList<HStoreFile> stripe = candidateStripes.get(endRow);
414        if (stripe == null) {
415          stripe = new ArrayList<>();
416          candidateStripes.put(endRow, stripe);
417        }
418        insertFileIntoStripe(stripe, sf);
419      }
420    }
421    // Possible improvement - for variable-count stripes, if all the files are in L0, we can
422    // instead create single, open-ended stripe with all files.
423
424    boolean hasOverlaps = false;
425    byte[] expectedStartRow = null; // first stripe can start wherever
426    Iterator<Map.Entry<byte[], ArrayList<HStoreFile>>> entryIter =
427        candidateStripes.entrySet().iterator();
428    while (entryIter.hasNext()) {
429      Map.Entry<byte[], ArrayList<HStoreFile>> entry = entryIter.next();
430      ArrayList<HStoreFile> files = entry.getValue();
431      // Validate the file start rows, and remove the bad ones to level 0.
432      for (int i = 0; i < files.size(); ++i) {
433        HStoreFile sf = files.get(i);
434        byte[] startRow = startOf(sf);
435        if (expectedStartRow == null) {
436          expectedStartRow = startRow; // ensure that first stripe is still consistent
437        } else if (!rowEquals(expectedStartRow, startRow)) {
438          hasOverlaps = true;
439          LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
440              + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
441              + "], to L0 it goes");
442          HStoreFile badSf = files.remove(i);
443          insertFileIntoStripe(level0Files, badSf);
444          ensureLevel0Metadata(badSf);
445          --i;
446        }
447      }
448      // Check if any files from the candidate stripe are valid. If so, add a stripe.
449      byte[] endRow = entry.getKey();
450      if (!files.isEmpty()) {
451        expectedStartRow = endRow; // Next stripe must start exactly at that key.
452      } else {
453        entryIter.remove();
454      }
455    }
456
457    // In the end, there must be open ends on two sides. If not, and there were no errors i.e.
458    // files are consistent, they might be coming from a split. We will treat the boundaries
459    // as open keys anyway, and log the message.
460    // If there were errors, we'll play it safe and dump everything into L0.
461    if (!candidateStripes.isEmpty()) {
462      HStoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
463      boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
464      if (!isOpen) {
465        LOG.warn("The range of the loaded files does not cover full key space: from ["
466            + Bytes.toString(startOf(firstFile)) + "], to ["
467            + Bytes.toString(candidateStripes.lastKey()) + "]");
468        if (!hasOverlaps) {
469          ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
470          ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
471        } else {
472          LOG.warn("Inconsistent files, everything goes to L0.");
473          for (ArrayList<HStoreFile> files : candidateStripes.values()) {
474            for (HStoreFile sf : files) {
475              insertFileIntoStripe(level0Files, sf);
476              ensureLevel0Metadata(sf);
477            }
478          }
479          candidateStripes.clear();
480        }
481      }
482    }
483
484    // Copy the results into the fields.
485    State state = new State();
486    state.level0Files = ImmutableList.copyOf(level0Files);
487    state.stripeFiles = new ArrayList<>(candidateStripes.size());
488    state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
489    ArrayList<HStoreFile> newAllFiles = new ArrayList<>(level0Files);
490    int i = candidateStripes.size() - 1;
491    for (Map.Entry<byte[], ArrayList<HStoreFile>> entry : candidateStripes.entrySet()) {
492      state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
493      newAllFiles.addAll(entry.getValue());
494      if (i > 0) {
495        state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
496      }
497      --i;
498    }
499    state.allFilesCached = ImmutableList.copyOf(newAllFiles);
500    this.state = state;
501    debugDumpState("Files loaded");
502  }
503
504  private void ensureEdgeStripeMetadata(ArrayList<HStoreFile> stripe, boolean isFirst) {
505    HashMap<HStoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
506    for (HStoreFile sf : stripe) {
507      targetMap.put(sf, OPEN_KEY);
508    }
509  }
510
511  private void ensureLevel0Metadata(HStoreFile sf) {
512    if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
513    if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
514  }
515
516  private void debugDumpState(String string) {
517    if (!LOG.isDebugEnabled()) return;
518    StringBuilder sb = new StringBuilder();
519    sb.append("\n" + string + "; current stripe state is as such:");
520    sb.append("\n level 0 with ")
521        .append(state.level0Files.size())
522        .append(
523          " files: "
524              + TraditionalBinaryPrefix.long2String(
525                StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
526    for (int i = 0; i < state.stripeFiles.size(); ++i) {
527      String endRow = (i == state.stripeEndRows.length)
528          ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
529      sb.append("\n stripe ending in ")
530          .append(endRow)
531          .append(" with ")
532          .append(state.stripeFiles.get(i).size())
533          .append(
534            " files: "
535                + TraditionalBinaryPrefix.long2String(
536                  StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
537    }
538    sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
539    sb.append("\n").append(getStorefileCount()).append(" files total.");
540    LOG.debug(sb.toString());
541  }
542
543  /**
544   * Checks whether the key indicates an open interval boundary (i.e. infinity).
545   */
546  private static final boolean isOpen(byte[] key) {
547    return key != null && key.length == 0;
548  }
549
550  private static final boolean isOpen(Cell key) {
551    return key != null && key.getRowLength() == 0;
552  }
553
554  /**
555   * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
556   */
557  private static final boolean isInvalid(byte[] key) {
558    // No need to use Arrays.equals because INVALID_KEY is null
559    return key == INVALID_KEY;
560  }
561
562  /**
563   * Compare two keys for equality.
564   */
565  private final boolean rowEquals(byte[] k1, byte[] k2) {
566    return Bytes.equals(k1, 0, k1.length, k2, 0, k2.length);
567  }
568
569  /**
570   * Compare two keys. Keys must not be open (isOpen(row) == false).
571   */
572  private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
573    assert !isOpen(k1) && !isOpen(k2);
574    return Bytes.compareTo(k1, k2);
575  }
576
577  private final int nonOpenRowCompare(Cell k1, byte[] k2) {
578    assert !isOpen(k1) && !isOpen(k2);
579    return cellComparator.compareRows(k1, k2, 0, k2.length);
580  }
581
582  /**
583   * Finds the stripe index by end row.
584   */
585  private final int findStripeIndexByEndRow(byte[] endRow) {
586    assert !isInvalid(endRow);
587    if (isOpen(endRow)) return state.stripeEndRows.length;
588    return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
589  }
590
591  /**
592   * Finds the stripe index for the stripe containing a row provided externally for get/scan.
593   */
594  private final int findStripeForRow(byte[] row, boolean isStart) {
595    if (isStart && Arrays.equals(row, HConstants.EMPTY_START_ROW)) return 0;
596    if (!isStart && Arrays.equals(row, HConstants.EMPTY_END_ROW)) {
597      return state.stripeFiles.size() - 1;
598    }
599    // If there's an exact match below, a stripe ends at "row". Stripe right boundary is
600    // exclusive, so that means the row is in the next stripe; thus, we need to add one to index.
601    // If there's no match, the return value of binarySearch is (-(insertion point) - 1), where
602    // insertion point is the index of the next greater element, or list size if none. The
603    // insertion point happens to be exactly what we need, so we need to add one to the result.
604    return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
605  }
606
607  @Override
608  public final byte[] getStartRow(int stripeIndex) {
609    return (stripeIndex == 0  ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
610  }
611
612  @Override
613  public final byte[] getEndRow(int stripeIndex) {
614    return (stripeIndex == state.stripeEndRows.length
615        ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
616  }
617
618
619  private byte[] startOf(HStoreFile sf) {
620    byte[] result = fileStarts.get(sf);
621
622    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
623    // serves only as a marker and is not to be confused with other empty byte arrays.
624    // See Javadoc of INVALID_KEY_IN_MAP for more information
625    return (result == null)
626             ? sf.getMetadataValue(STRIPE_START_KEY)
627             : result == INVALID_KEY_IN_MAP ? INVALID_KEY : result;
628  }
629
630  private byte[] endOf(HStoreFile sf) {
631    byte[] result = fileEnds.get(sf);
632
633    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
634    // serves only as a marker and is not to be confused with other empty byte arrays.
635    // See Javadoc of INVALID_KEY_IN_MAP for more information
636    return (result == null)
637             ? sf.getMetadataValue(STRIPE_END_KEY)
638             : result == INVALID_KEY_IN_MAP ? INVALID_KEY : result;
639  }
640
641  /**
642   * Inserts a file in the correct place (by seqnum) in a stripe copy.
643   * @param stripe Stripe copy to insert into.
644   * @param sf File to insert.
645   */
646  private static void insertFileIntoStripe(ArrayList<HStoreFile> stripe, HStoreFile sf) {
647    // The only operation for which sorting of the files matters is KeyBefore. Therefore,
648    // we will store the file in reverse order by seqNum from the outset.
649    for (int insertBefore = 0; ; ++insertBefore) {
650      if (insertBefore == stripe.size()
651          || (StoreFileComparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
652        stripe.add(insertBefore, sf);
653        break;
654      }
655    }
656  }
657
658  /**
659   * An extension of ConcatenatedLists that has several peculiar properties.
660   * First, one can cut the tail of the logical list by removing last several sub-lists.
661   * Second, items can be removed thru iterator.
662   * Third, if the sub-lists are immutable, they are replaced with mutable copies when needed.
663   * On average KeyBefore operation will contain half the stripes as potential candidates,
664   * but will quickly cut down on them as it finds something in the more likely ones; thus,
665   * the above allow us to avoid unnecessary copying of a bunch of lists.
666   */
667  private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<HStoreFile> {
668    @Override
669    public java.util.Iterator<HStoreFile> iterator() {
670      return new Iterator();
671    }
672
673    public class Iterator extends ConcatenatedLists<HStoreFile>.Iterator {
674      public ArrayList<List<HStoreFile>> getComponents() {
675        return components;
676      }
677
678      public void removeComponents(int startIndex) {
679        List<List<HStoreFile>> subList = components.subList(startIndex, components.size());
680        for (List<HStoreFile> entry : subList) {
681          size -= entry.size();
682        }
683        assert size >= 0;
684        subList.clear();
685      }
686
687      @Override
688      public void remove() {
689        if (!this.nextWasCalled) {
690          throw new IllegalStateException("No element to remove");
691        }
692        this.nextWasCalled = false;
693        List<HStoreFile> src = components.get(currentComponent);
694        if (src instanceof ImmutableList<?>) {
695          src = new ArrayList<>(src);
696          components.set(currentComponent, src);
697        }
698        src.remove(indexWithinComponent);
699        --size;
700        --indexWithinComponent;
701        if (src.isEmpty()) {
702          components.remove(currentComponent); // indexWithinComponent is already -1 here.
703        }
704      }
705    }
706  }
707
708  /**
709   * Non-static helper class for merging compaction or flush results.
710   * Since we want to merge them atomically (more or less), it operates on lazy copies,
711   * then creates a new state object and puts it in place.
712   */
713  private class CompactionOrFlushMergeCopy {
714    private ArrayList<List<HStoreFile>> stripeFiles = null;
715    private ArrayList<HStoreFile> level0Files = null;
716    private ArrayList<byte[]> stripeEndRows = null;
717
718    private Collection<HStoreFile> compactedFiles = null;
719    private Collection<HStoreFile> results = null;
720
721    private List<HStoreFile> l0Results = new ArrayList<>();
722    private final boolean isFlush;
723
724    public CompactionOrFlushMergeCopy(boolean isFlush) {
725      // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
726      this.stripeFiles = new ArrayList<>(StripeStoreFileManager.this.state.stripeFiles);
727      this.isFlush = isFlush;
728    }
729
730    private void mergeResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results)
731        throws IOException {
732      assert this.compactedFiles == null && this.results == null;
733      this.compactedFiles = compactedFiles;
734      this.results = results;
735      // Do logical processing.
736      if (!isFlush) removeCompactedFiles();
737      TreeMap<byte[], HStoreFile> newStripes = processResults();
738      if (newStripes != null) {
739        processNewCandidateStripes(newStripes);
740      }
741      // Create new state and update parent.
742      State state = createNewState(false);
743      StripeStoreFileManager.this.state = state;
744      updateMetadataMaps();
745    }
746
747    private void deleteResults(Collection<HStoreFile> compactedFiles) throws IOException {
748      this.compactedFiles = compactedFiles;
749      // Create new state and update parent.
750      State state = createNewState(true);
751      StripeStoreFileManager.this.state = state;
752      updateMetadataMaps();
753    }
754
755    private State createNewState(boolean delCompactedFiles) {
756      State oldState = StripeStoreFileManager.this.state;
757      // Stripe count should be the same unless the end rows changed.
758      assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
759      State newState = new State();
760      newState.level0Files = (this.level0Files == null) ? oldState.level0Files
761          : ImmutableList.copyOf(this.level0Files);
762      newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
763          : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
764      newState.stripeFiles = new ArrayList<>(this.stripeFiles.size());
765      for (List<HStoreFile> newStripe : this.stripeFiles) {
766        newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
767            ? (ImmutableList<HStoreFile>)newStripe : ImmutableList.copyOf(newStripe));
768      }
769
770      List<HStoreFile> newAllFiles = new ArrayList<>(oldState.allFilesCached);
771      List<HStoreFile> newAllCompactedFiles = new ArrayList<>(oldState.allCompactedFilesCached);
772      if (!isFlush) {
773        newAllFiles.removeAll(compactedFiles);
774        if (delCompactedFiles) {
775          newAllCompactedFiles.removeAll(compactedFiles);
776        } else {
777          newAllCompactedFiles.addAll(compactedFiles);
778        }
779      }
780      if (results != null) {
781        newAllFiles.addAll(results);
782      }
783      newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
784      newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
785      return newState;
786    }
787
788    private void updateMetadataMaps() {
789      StripeStoreFileManager parent = StripeStoreFileManager.this;
790      if (!isFlush) {
791        for (HStoreFile sf : this.compactedFiles) {
792          parent.fileStarts.remove(sf);
793          parent.fileEnds.remove(sf);
794        }
795      }
796      if (this.l0Results != null) {
797        for (HStoreFile sf : this.l0Results) {
798          parent.ensureLevel0Metadata(sf);
799        }
800      }
801    }
802
803    /**
804     * @param index Index of the stripe we need.
805     * @return A lazy stripe copy from current stripes.
806     */
807    private final ArrayList<HStoreFile> getStripeCopy(int index) {
808      List<HStoreFile> stripeCopy = this.stripeFiles.get(index);
809      ArrayList<HStoreFile> result = null;
810      if (stripeCopy instanceof ImmutableList<?>) {
811        result = new ArrayList<>(stripeCopy);
812        this.stripeFiles.set(index, result);
813      } else {
814        result = (ArrayList<HStoreFile>)stripeCopy;
815      }
816      return result;
817    }
818
819    /**
820     * @return A lazy L0 copy from current state.
821     */
822    private final ArrayList<HStoreFile> getLevel0Copy() {
823      if (this.level0Files == null) {
824        this.level0Files = new ArrayList<>(StripeStoreFileManager.this.state.level0Files);
825      }
826      return this.level0Files;
827    }
828
829    /**
830     * Process new files, and add them either to the structure of existing stripes,
831     * or to the list of new candidate stripes.
832     * @return New candidate stripes.
833     */
834    private TreeMap<byte[], HStoreFile> processResults() throws IOException {
835      TreeMap<byte[], HStoreFile> newStripes = null;
836      for (HStoreFile sf : this.results) {
837        byte[] startRow = startOf(sf), endRow = endOf(sf);
838        if (isInvalid(endRow) || isInvalid(startRow)) {
839          if (!isFlush) {
840            LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
841          }
842          insertFileIntoStripe(getLevel0Copy(), sf);
843          this.l0Results.add(sf);
844          continue;
845        }
846        if (!this.stripeFiles.isEmpty()) {
847          int stripeIndex = findStripeIndexByEndRow(endRow);
848          if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
849            // Simple/common case - add file to an existing stripe.
850            insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
851            continue;
852          }
853        }
854
855        // Make a new candidate stripe.
856        if (newStripes == null) {
857          newStripes = new TreeMap<>(MAP_COMPARATOR);
858        }
859        HStoreFile oldSf = newStripes.put(endRow, sf);
860        if (oldSf != null) {
861          throw new IOException("Compactor has produced multiple files for the stripe ending in ["
862              + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
863        }
864      }
865      return newStripes;
866    }
867
868    /**
869     * Remove compacted files.
870     */
871    private void removeCompactedFiles() throws IOException {
872      for (HStoreFile oldFile : this.compactedFiles) {
873        byte[] oldEndRow = endOf(oldFile);
874        List<HStoreFile> source = null;
875        if (isInvalid(oldEndRow)) {
876          source = getLevel0Copy();
877        } else {
878          int stripeIndex = findStripeIndexByEndRow(oldEndRow);
879          if (stripeIndex < 0) {
880            throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
881                + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
882          }
883          source = getStripeCopy(stripeIndex);
884        }
885        if (!source.remove(oldFile)) {
886          throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
887        }
888      }
889    }
890
891    /**
892     * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
893     * new candidate stripes/removes old stripes; produces new set of stripe end rows.
894     * @param newStripes  New stripes - files by end row.
895     */
896    private void processNewCandidateStripes(
897        TreeMap<byte[], HStoreFile> newStripes) throws IOException {
898      // Validate that the removed and added aggregate ranges still make for a full key space.
899      boolean hasStripes = !this.stripeFiles.isEmpty();
900      this.stripeEndRows = new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
901      int removeFrom = 0;
902      byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
903      byte[] lastEndRow = newStripes.lastKey();
904      if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
905        throw new IOException("Newly created stripes do not cover the entire key space.");
906      }
907
908      boolean canAddNewStripes = true;
909      Collection<HStoreFile> filesForL0 = null;
910      if (hasStripes) {
911        // Determine which stripes will need to be removed because they conflict with new stripes.
912        // The new boundaries should match old stripe boundaries, so we should get exact matches.
913        if (isOpen(firstStartRow)) {
914          removeFrom = 0;
915        } else {
916          removeFrom = findStripeIndexByEndRow(firstStartRow);
917          if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
918          ++removeFrom;
919        }
920        int removeTo = findStripeIndexByEndRow(lastEndRow);
921        if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
922        // See if there are files in the stripes we are trying to replace.
923        ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
924        for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
925          conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
926        }
927        if (!conflictingFiles.isEmpty()) {
928          // This can be caused by two things - concurrent flush into stripes, or a bug.
929          // Unfortunately, we cannot tell them apart without looking at timing or something
930          // like that. We will assume we are dealing with a flush and dump it into L0.
931          if (isFlush) {
932            long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
933            LOG.warn("Stripes were created by a flush, but results of size " + newSize
934                + " cannot be added because the stripes have changed");
935            canAddNewStripes = false;
936            filesForL0 = newStripes.values();
937          } else {
938            long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
939            LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
940                + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
941            filesForL0 = conflictingFiles;
942          }
943          if (filesForL0 != null) {
944            for (HStoreFile sf : filesForL0) {
945              insertFileIntoStripe(getLevel0Copy(), sf);
946            }
947            l0Results.addAll(filesForL0);
948          }
949        }
950
951        if (canAddNewStripes) {
952          // Remove old empty stripes.
953          int originalCount = this.stripeFiles.size();
954          for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
955            if (removeIndex != originalCount - 1) {
956              this.stripeEndRows.remove(removeIndex);
957            }
958            this.stripeFiles.remove(removeIndex);
959          }
960        }
961      }
962
963      if (!canAddNewStripes) return; // Files were already put into L0.
964
965      // Now, insert new stripes. The total ranges match, so we can insert where we removed.
966      byte[] previousEndRow = null;
967      int insertAt = removeFrom;
968      for (Map.Entry<byte[], HStoreFile> newStripe : newStripes.entrySet()) {
969        if (previousEndRow != null) {
970          // Validate that the ranges are contiguous.
971          assert !isOpen(previousEndRow);
972          byte[] startRow = startOf(newStripe.getValue());
973          if (!rowEquals(previousEndRow, startRow)) {
974            throw new IOException("The new stripes produced by "
975                + (isFlush ? "flush" : "compaction") + " are not contiguous");
976          }
977        }
978        // Add the new stripe.
979        ArrayList<HStoreFile> tmp = new ArrayList<>();
980        tmp.add(newStripe.getValue());
981        stripeFiles.add(insertAt, tmp);
982        previousEndRow = newStripe.getKey();
983        if (!isOpen(previousEndRow)) {
984          stripeEndRows.add(insertAt, previousEndRow);
985        }
986        ++insertAt;
987      }
988    }
989  }
990
991  @Override
992  public List<HStoreFile> getLevel0Files() {
993    return this.state.level0Files;
994  }
995
996  @Override
997  public List<byte[]> getStripeBoundaries() {
998    if (this.state.stripeFiles.isEmpty()) {
999      return Collections.emptyList();
1000    }
1001    ArrayList<byte[]> result = new ArrayList<>(this.state.stripeEndRows.length + 2);
1002    result.add(OPEN_KEY);
1003    Collections.addAll(result, this.state.stripeEndRows);
1004    result.add(OPEN_KEY);
1005    return result;
1006  }
1007
1008  @Override
1009  public ArrayList<ImmutableList<HStoreFile>> getStripes() {
1010    return this.state.stripeFiles;
1011  }
1012
1013  @Override
1014  public int getStripeCount() {
1015    return this.state.stripeFiles.size();
1016  }
1017
1018  @Override
1019  public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
1020    // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
1021    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
1022    State state = this.state;
1023    Collection<HStoreFile> expiredStoreFiles = null;
1024    for (ImmutableList<HStoreFile> stripe : state.stripeFiles) {
1025      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
1026    }
1027    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
1028  }
1029
1030  private Collection<HStoreFile> findExpiredFiles(ImmutableList<HStoreFile> stripe, long maxTs,
1031      List<HStoreFile> filesCompacting, Collection<HStoreFile> expiredStoreFiles) {
1032    // Order by seqnum is reversed.
1033    for (int i = 1; i < stripe.size(); ++i) {
1034      HStoreFile sf = stripe.get(i);
1035      synchronized (sf) {
1036        long fileTs = sf.getReader().getMaxTimestamp();
1037        if (fileTs < maxTs && !filesCompacting.contains(sf)) {
1038          LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimestamp is "
1039              + fileTs + ", which is below " + maxTs);
1040          if (expiredStoreFiles == null) {
1041            expiredStoreFiles = new ArrayList<>();
1042          }
1043          expiredStoreFiles.add(sf);
1044        }
1045      }
1046    }
1047    return expiredStoreFiles;
1048  }
1049
1050  @Override
1051  public double getCompactionPressure() {
1052    State stateLocal = this.state;
1053    if (stateLocal.allFilesCached.size() > blockingFileCount) {
1054      // just a hit to tell others that we have reached the blocking file count.
1055      return 2.0;
1056    }
1057    if (stateLocal.stripeFiles.isEmpty()) {
1058      return 0.0;
1059    }
1060    int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
1061    // do not calculate L0 separately because data will be moved to stripe quickly and in most cases
1062    // we flush data to stripe directly.
1063    int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
1064    double max = 0.0;
1065    for (ImmutableList<HStoreFile> stripeFile : stateLocal.stripeFiles) {
1066      int stripeFileCount = stripeFile.size();
1067      double normCount =
1068          (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
1069              / (blockingFilePerStripe - config.getStripeCompactMinFiles());
1070      if (normCount >= 1.0) {
1071        // This could happen if stripe is not split evenly. Do not return values that larger than
1072        // 1.0 because we have not reached the blocking file count actually.
1073        return 1.0;
1074      }
1075      if (normCount > max) {
1076        max = normCount;
1077      }
1078    }
1079    return max;
1080  }
1081
1082  @Override
1083  public Comparator<HStoreFile> getStoreFileComparator() {
1084    return StoreFileComparators.SEQ_ID;
1085  }
1086}