001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.TreeMap;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.ConcatenatedLists;
041import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
048
049/**
050 * Stripe implementation of StoreFileManager. Not thread safe - relies on external locking (in
051 * HStore). Collections that this class returns are immutable or unique to the call, so they should
052 * be safe. Stripe store splits the key space of the region into non-overlapping stripes, as well as
053 * some recent files that have all the keys (level 0). Each stripe contains a set of files. When L0
054 * is compacted, it's split into the files corresponding to existing stripe boundaries, that can
055 * thus be added to stripes. When scan or get happens, it only has to read the files from the
 * corresponding stripes. See StripeCompactionPolicy on how the stripes are determined; this class
057 * doesn't care. This class should work together with StripeCompactionPolicy and StripeCompactor.
058 * With regard to how they work, we make at least the following (reasonable) assumptions: -
059 * Compaction produces one file per new stripe (if any); that is easy to change. - Compaction has
060 * one contiguous set of stripes both in and out, except if L0 is involved.
061 */
062@InterfaceAudience.Private
063public class StripeStoreFileManager
064  implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
065  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFileManager.class);
066
067  /**
068   * The file metadata fields that contain the stripe information.
069   */
070  public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
071  public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
072
073  private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
074
075  /**
076   * The key value used for range boundary, indicating that the boundary is open (i.e. +-inf).
077   */
078  public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
079  final static byte[] INVALID_KEY = null;
080
081  /**
082   * The state class. Used solely to replace results atomically during compactions and avoid
083   * complicated error handling.
084   */
085  private static class State {
086    /**
087     * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
088     * here. It is invariant that the start row of the stripe is the end row of the previous one
089     * (and is an open boundary for the first one).
090     */
091    public byte[][] stripeEndRows = new byte[0][];
092
093    /**
094     * Files by stripe. Each element of the list corresponds to stripeEndRow element with the same
095     * index, except the last one. Inside each list, the files are in reverse order by seqNum. Note
096     * that the length of this is one higher than that of stripeEndKeys.
097     */
098    public ArrayList<ImmutableList<HStoreFile>> stripeFiles = new ArrayList<>();
099    /** Level 0. The files are in reverse order by seqNum. */
100    public ImmutableList<HStoreFile> level0Files = ImmutableList.of();
101
102    /** Cached list of all files in the structure, to return from some calls */
103    public ImmutableList<HStoreFile> allFilesCached = ImmutableList.of();
104    private ImmutableList<HStoreFile> allCompactedFilesCached = ImmutableList.of();
105  }
106
107  private State state = null;
108
109  /** Cached file metadata (or overrides as the case may be) */
110  private HashMap<HStoreFile, byte[]> fileStarts = new HashMap<>();
111  private HashMap<HStoreFile, byte[]> fileEnds = new HashMap<>();
112  /**
113   * Normally invalid key is null, but in the map null is the result for "no key"; so use the
114   * following constant value in these maps instead. Note that this is a constant and we use it to
115   * compare by reference when we read from the map.
116   */
117  private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
118
119  private final CellComparator cellComparator;
120  private StripeStoreConfig config;
121
122  private final int blockingFileCount;
123
124  public StripeStoreFileManager(CellComparator kvComparator, Configuration conf,
125    StripeStoreConfig config) {
126    this.cellComparator = kvComparator;
127    this.config = config;
128    this.blockingFileCount =
129      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
130  }
131
132  @Override
133  public void loadFiles(List<HStoreFile> storeFiles) {
134    loadUnclassifiedStoreFiles(storeFiles);
135  }
136
137  @Override
138  public Collection<HStoreFile> getStorefiles() {
139    return state.allFilesCached;
140  }
141
142  @Override
143  public Collection<HStoreFile> getCompactedfiles() {
144    return state.allCompactedFilesCached;
145  }
146
147  @Override
148  public int getCompactedFilesCount() {
149    return state.allCompactedFilesCached.size();
150  }
151
152  @Override
153  public void insertNewFiles(Collection<HStoreFile> sfs) {
154    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
155    cmc.mergeResults(Collections.emptyList(), sfs);
156    debugDumpState("Added new files");
157  }
158
159  @Override
160  public ImmutableCollection<HStoreFile> clearFiles() {
161    ImmutableCollection<HStoreFile> result = state.allFilesCached;
162    this.state = new State();
163    this.fileStarts.clear();
164    this.fileEnds.clear();
165    return result;
166  }
167
168  @Override
169  public ImmutableCollection<HStoreFile> clearCompactedFiles() {
170    ImmutableCollection<HStoreFile> result = state.allCompactedFilesCached;
171    this.state = new State();
172    return result;
173  }
174
175  @Override
176  public int getStorefileCount() {
177    return state.allFilesCached.size();
178  }
179
180  /**
181   * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} for details on this
182   * methods.
183   */
184  @Override
185  public Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
186    KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
187    // Order matters for this call.
188    result.addSublist(state.level0Files);
189    if (!state.stripeFiles.isEmpty()) {
190      int lastStripeIndex = findStripeForRow(CellUtil.cloneRow(targetKey), false);
191      for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
192        result.addSublist(state.stripeFiles.get(stripeIndex));
193      }
194    }
195    return result.iterator();
196  }
197
198  /**
199   * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
200   * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, Cell)} for
201   * details on this methods.
202   */
203  @Override
204  public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
205    Iterator<HStoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
206    KeyBeforeConcatenatedLists.Iterator original =
207      (KeyBeforeConcatenatedLists.Iterator) candidateFiles;
208    assert original != null;
209    ArrayList<List<HStoreFile>> components = original.getComponents();
210    for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
211      HStoreFile sf = components.get(firstIrrelevant).get(0);
212      byte[] endKey = endOf(sf);
213      // Entries are ordered as such: L0, then stripes in reverse order. We never remove
214      // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
215      // first one that cannot possibly have better candidates.
216      if (!isInvalid(endKey) && !isOpen(endKey) && (nonOpenRowCompare(targetKey, endKey) >= 0)) {
217        original.removeComponents(firstIrrelevant);
218        break;
219      }
220    }
221    return original;
222  }
223
224  /**
225   * Override of getSplitPoint that determines the split point as the boundary between two stripes,
226   * unless it causes significant imbalance between split sides' sizes. In that case, the split
227   * boundary will be chosen from the middle of one of the stripes to minimize imbalance.
228   * @return The split point, or null if no split is possible.
229   */
230  @Override
231  public Optional<byte[]> getSplitPoint() throws IOException {
232    if (this.getStorefileCount() == 0) {
233      return Optional.empty();
234    }
235    if (state.stripeFiles.size() <= 1) {
236      return getSplitPointFromAllFiles();
237    }
238    int leftIndex = -1, rightIndex = state.stripeFiles.size();
239    long leftSize = 0, rightSize = 0;
240    long lastLeftSize = 0, lastRightSize = 0;
241    while (rightIndex - 1 != leftIndex) {
242      if (leftSize >= rightSize) {
243        --rightIndex;
244        lastRightSize = getStripeFilesSize(rightIndex);
245        rightSize += lastRightSize;
246      } else {
247        ++leftIndex;
248        lastLeftSize = getStripeFilesSize(leftIndex);
249        leftSize += lastLeftSize;
250      }
251    }
252    if (leftSize == 0 || rightSize == 0) {
253      String errMsg = String.format(
254        "Cannot split on a boundary - left index %d size %d, " + "right index %d size %d",
255        leftIndex, leftSize, rightIndex, rightSize);
256      debugDumpState(errMsg);
257      LOG.warn(errMsg);
258      return getSplitPointFromAllFiles();
259    }
260    double ratio = (double) rightSize / leftSize;
261    if (ratio < 1) {
262      ratio = 1 / ratio;
263    }
264    if (config.getMaxSplitImbalance() > ratio) {
265      return Optional.of(state.stripeEndRows[leftIndex]);
266    }
267
268    // If the difference between the sides is too large, we could get the proportional key on
269    // the a stripe to equalize the difference, but there's no proportional key method at the
270    // moment, and it's not extremely important.
271    // See if we can achieve better ratio if we split the bigger side in half.
272    boolean isRightLarger = rightSize >= leftSize;
273    double newRatio = isRightLarger
274      ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
275      : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
276    if (newRatio < 1) {
277      newRatio = 1 / newRatio;
278    }
279    if (newRatio >= ratio) {
280      return Optional.of(state.stripeEndRows[leftIndex]);
281    }
282    LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split " + newRatio
283      + " configured ratio " + config.getMaxSplitImbalance());
284    // OK, we may get better ratio, get it.
285    return StoreUtils.getSplitPoint(state.stripeFiles.get(isRightLarger ? rightIndex : leftIndex),
286      cellComparator);
287  }
288
289  private Optional<byte[]> getSplitPointFromAllFiles() throws IOException {
290    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
291    sfs.addSublist(state.level0Files);
292    sfs.addAllSublists(state.stripeFiles);
293    return StoreUtils.getSplitPoint(sfs, cellComparator);
294  }
295
296  private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
297    return (double) (largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
298  }
299
300  @Override
301  public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
302    byte[] stopRow, boolean includeStopRow) {
303    if (state.stripeFiles.isEmpty()) {
304      return state.level0Files; // There's just L0.
305    }
306
307    int firstStripe = findStripeForRow(startRow, true);
308    int lastStripe = findStripeForRow(stopRow, false);
309    assert firstStripe <= lastStripe;
310    if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
311      return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
312    }
313    if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
314      return state.allFilesCached; // We need to read all files.
315    }
316
317    ConcatenatedLists<HStoreFile> result = new ConcatenatedLists<>();
318    result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
319    result.addSublist(state.level0Files);
320    return result;
321  }
322
323  @Override
324  public void addCompactionResults(Collection<HStoreFile> compactedFiles,
325    Collection<HStoreFile> results) {
326    // See class comment for the assumptions we make here.
327    LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
328      + " files replaced by " + results.size());
329    // In order to be able to fail in the middle of the operation, we'll operate on lazy
330    // copies and apply the result at the end.
331    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
332    cmc.mergeResults(compactedFiles, results);
333    markCompactedAway(compactedFiles);
334    debugDumpState("Merged compaction results");
335  }
336
337  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
338  // Let a background thread close the actual reader on these compacted files and also
339  // ensure to evict the blocks from block cache so that they are no longer in
340  // cache
341  private void markCompactedAway(Collection<HStoreFile> compactedFiles) {
342    for (HStoreFile file : compactedFiles) {
343      file.markCompactedAway();
344    }
345  }
346
347  @Override
348  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
349    // See class comment for the assumptions we make here.
350    LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
351    // In order to be able to fail in the middle of the operation, we'll operate on lazy
352    // copies and apply the result at the end.
353    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
354    cmc.deleteResults(compactedFiles);
355    debugDumpState("Deleted compaction results");
356  }
357
358  @Override
359  public int getStoreCompactionPriority() {
360    // If there's only L0, do what the default store does.
361    // If we are in critical priority, do the same - we don't want to trump all stores all
362    // the time due to how many files we have.
363    int fc = getStorefileCount();
364    if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
365      return this.blockingFileCount - fc;
366    }
367    // If we are in good shape, we don't want to be trumped by all other stores due to how
368    // many files we have, so do an approximate mapping to normal priority range; L0 counts
369    // for all stripes.
370    int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
371    int priority = (int) Math.ceil(((double) (this.blockingFileCount - fc + l0) / sc) - l0);
372    return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
373  }
374
375  /**
376   * Gets the total size of all files in the stripe.
377   * @param stripeIndex Stripe index.
378   * @return Size.
379   */
380  private long getStripeFilesSize(int stripeIndex) {
381    long result = 0;
382    for (HStoreFile sf : state.stripeFiles.get(stripeIndex)) {
383      result += sf.getReader().length();
384    }
385    return result;
386  }
387
388  /**
389   * Loads initial store files that were picked up from some physical location pertaining to this
390   * store (presumably). Unlike adding files after compaction, assumes empty initial sets, and is
391   * forgiving with regard to stripe constraints - at worst, many/all files will go to level 0.
392   * @param storeFiles Store files to add.
393   */
394  private void loadUnclassifiedStoreFiles(List<HStoreFile> storeFiles) {
395    LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
396    TreeMap<byte[], ArrayList<HStoreFile>> candidateStripes = new TreeMap<>(MAP_COMPARATOR);
397    ArrayList<HStoreFile> level0Files = new ArrayList<>();
398    // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
399    // If needed, we could dynamically determine the stripes in future.
400    for (HStoreFile sf : storeFiles) {
401      byte[] startRow = startOf(sf), endRow = endOf(sf);
402      // Validate the range and put the files into place.
403      if (isInvalid(startRow) || isInvalid(endRow)) {
404        insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
405        ensureLevel0Metadata(sf);
406      } else if (!isOpen(startRow) && !isOpen(endRow) && nonOpenRowCompare(startRow, endRow) >= 0) {
407        LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
408          + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
409        insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
410        ensureLevel0Metadata(sf);
411      } else {
412        ArrayList<HStoreFile> stripe = candidateStripes.get(endRow);
413        if (stripe == null) {
414          stripe = new ArrayList<>();
415          candidateStripes.put(endRow, stripe);
416        }
417        insertFileIntoStripe(stripe, sf);
418      }
419    }
420    // Possible improvement - for variable-count stripes, if all the files are in L0, we can
421    // instead create single, open-ended stripe with all files.
422
423    boolean hasOverlaps = false;
424    byte[] expectedStartRow = null; // first stripe can start wherever
425    Iterator<Map.Entry<byte[], ArrayList<HStoreFile>>> entryIter =
426      candidateStripes.entrySet().iterator();
427    while (entryIter.hasNext()) {
428      Map.Entry<byte[], ArrayList<HStoreFile>> entry = entryIter.next();
429      ArrayList<HStoreFile> files = entry.getValue();
430      // Validate the file start rows, and remove the bad ones to level 0.
431      for (int i = 0; i < files.size(); ++i) {
432        HStoreFile sf = files.get(i);
433        byte[] startRow = startOf(sf);
434        if (expectedStartRow == null) {
435          expectedStartRow = startRow; // ensure that first stripe is still consistent
436        } else if (!rowEquals(expectedStartRow, startRow)) {
437          hasOverlaps = true;
438          LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
439            + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
440            + "], to L0 it goes");
441          HStoreFile badSf = files.remove(i);
442          insertFileIntoStripe(level0Files, badSf);
443          ensureLevel0Metadata(badSf);
444          --i;
445        }
446      }
447      // Check if any files from the candidate stripe are valid. If so, add a stripe.
448      byte[] endRow = entry.getKey();
449      if (!files.isEmpty()) {
450        expectedStartRow = endRow; // Next stripe must start exactly at that key.
451      } else {
452        entryIter.remove();
453      }
454    }
455
456    // In the end, there must be open ends on two sides. If not, and there were no errors i.e.
457    // files are consistent, they might be coming from a split. We will treat the boundaries
458    // as open keys anyway, and log the message.
459    // If there were errors, we'll play it safe and dump everything into L0.
460    if (!candidateStripes.isEmpty()) {
461      HStoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
462      boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
463      if (!isOpen) {
464        LOG.warn("The range of the loaded files does not cover full key space: from ["
465          + Bytes.toString(startOf(firstFile)) + "], to ["
466          + Bytes.toString(candidateStripes.lastKey()) + "]");
467        if (!hasOverlaps) {
468          ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
469          ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
470        } else {
471          LOG.warn("Inconsistent files, everything goes to L0.");
472          for (ArrayList<HStoreFile> files : candidateStripes.values()) {
473            for (HStoreFile sf : files) {
474              insertFileIntoStripe(level0Files, sf);
475              ensureLevel0Metadata(sf);
476            }
477          }
478          candidateStripes.clear();
479        }
480      }
481    }
482
483    // Copy the results into the fields.
484    State state = new State();
485    state.level0Files = ImmutableList.copyOf(level0Files);
486    state.stripeFiles = new ArrayList<>(candidateStripes.size());
487    state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
488    ArrayList<HStoreFile> newAllFiles = new ArrayList<>(level0Files);
489    int i = candidateStripes.size() - 1;
490    for (Map.Entry<byte[], ArrayList<HStoreFile>> entry : candidateStripes.entrySet()) {
491      state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
492      newAllFiles.addAll(entry.getValue());
493      if (i > 0) {
494        state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
495      }
496      --i;
497    }
498    state.allFilesCached = ImmutableList.copyOf(newAllFiles);
499    this.state = state;
500    debugDumpState("Files loaded");
501  }
502
503  private void ensureEdgeStripeMetadata(ArrayList<HStoreFile> stripe, boolean isFirst) {
504    HashMap<HStoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
505    for (HStoreFile sf : stripe) {
506      targetMap.put(sf, OPEN_KEY);
507    }
508  }
509
510  private void ensureLevel0Metadata(HStoreFile sf) {
511    if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
512    if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
513  }
514
515  private void debugDumpState(String string) {
516    if (!LOG.isDebugEnabled()) return;
517    StringBuilder sb = new StringBuilder();
518    sb.append("\n" + string + "; current stripe state is as such:");
519    sb.append("\n level 0 with ").append(state.level0Files.size())
520      .append(" files: " + TraditionalBinaryPrefix
521        .long2String(StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
522    for (int i = 0; i < state.stripeFiles.size(); ++i) {
523      String endRow = (i == state.stripeEndRows.length)
524        ? "(end)"
525        : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
526      sb.append("\n stripe ending in ").append(endRow).append(" with ")
527        .append(state.stripeFiles.get(i).size())
528        .append(" files: " + TraditionalBinaryPrefix.long2String(
529          StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
530    }
531    sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
532    sb.append("\n").append(getStorefileCount()).append(" files total.");
533    LOG.debug(sb.toString());
534  }
535
536  /**
537   * Checks whether the key indicates an open interval boundary (i.e. infinity).
538   */
539  private static final boolean isOpen(byte[] key) {
540    return key != null && key.length == 0;
541  }
542
543  private static final boolean isOpen(Cell key) {
544    return key != null && key.getRowLength() == 0;
545  }
546
547  /**
548   * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
549   */
550  private static final boolean isInvalid(byte[] key) {
551    // No need to use Arrays.equals because INVALID_KEY is null
552    return key == INVALID_KEY;
553  }
554
555  /**
556   * Compare two keys for equality.
557   */
558  private final boolean rowEquals(byte[] k1, byte[] k2) {
559    return Bytes.equals(k1, 0, k1.length, k2, 0, k2.length);
560  }
561
562  /**
563   * Compare two keys. Keys must not be open (isOpen(row) == false).
564   */
565  private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
566    assert !isOpen(k1) && !isOpen(k2);
567    return Bytes.compareTo(k1, k2);
568  }
569
570  private final int nonOpenRowCompare(Cell k1, byte[] k2) {
571    assert !isOpen(k1) && !isOpen(k2);
572    return cellComparator.compareRows(k1, k2, 0, k2.length);
573  }
574
575  /**
576   * Finds the stripe index by end row.
577   */
578  private final int findStripeIndexByEndRow(byte[] endRow) {
579    assert !isInvalid(endRow);
580    if (isOpen(endRow)) return state.stripeEndRows.length;
581    return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
582  }
583
584  /**
585   * Finds the stripe index for the stripe containing a row provided externally for get/scan.
586   */
587  private final int findStripeForRow(byte[] row, boolean isStart) {
588    if (isStart && Arrays.equals(row, HConstants.EMPTY_START_ROW)) return 0;
589    if (!isStart && Arrays.equals(row, HConstants.EMPTY_END_ROW)) {
590      return state.stripeFiles.size() - 1;
591    }
592    // If there's an exact match below, a stripe ends at "row". Stripe right boundary is
593    // exclusive, so that means the row is in the next stripe; thus, we need to add one to index.
594    // If there's no match, the return value of binarySearch is (-(insertion point) - 1), where
595    // insertion point is the index of the next greater element, or list size if none. The
596    // insertion point happens to be exactly what we need, so we need to add one to the result.
597    return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
598  }
599
600  @Override
601  public final byte[] getStartRow(int stripeIndex) {
602    return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
603  }
604
605  @Override
606  public final byte[] getEndRow(int stripeIndex) {
607    return (stripeIndex == state.stripeEndRows.length
608      ? OPEN_KEY
609      : state.stripeEndRows[stripeIndex]);
610  }
611
612  private byte[] startOf(HStoreFile sf) {
613    byte[] result = fileStarts.get(sf);
614
615    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
616    // serves only as a marker and is not to be confused with other empty byte arrays.
617    // See Javadoc of INVALID_KEY_IN_MAP for more information
618    return (result == null) ? sf.getMetadataValue(STRIPE_START_KEY)
619      : result == INVALID_KEY_IN_MAP ? INVALID_KEY
620      : result;
621  }
622
623  private byte[] endOf(HStoreFile sf) {
624    byte[] result = fileEnds.get(sf);
625
626    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
627    // serves only as a marker and is not to be confused with other empty byte arrays.
628    // See Javadoc of INVALID_KEY_IN_MAP for more information
629    return (result == null) ? sf.getMetadataValue(STRIPE_END_KEY)
630      : result == INVALID_KEY_IN_MAP ? INVALID_KEY
631      : result;
632  }
633
634  /**
635   * Inserts a file in the correct place (by seqnum) in a stripe copy.
636   * @param stripe Stripe copy to insert into.
637   * @param sf     File to insert.
638   */
639  private static void insertFileIntoStripe(ArrayList<HStoreFile> stripe, HStoreFile sf) {
640    // The only operation for which sorting of the files matters is KeyBefore. Therefore,
641    // we will store the file in reverse order by seqNum from the outset.
642    for (int insertBefore = 0;; ++insertBefore) {
643      if (
644        insertBefore == stripe.size()
645          || (StoreFileComparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)
646      ) {
647        stripe.add(insertBefore, sf);
648        break;
649      }
650    }
651  }
652
653  /**
654   * An extension of ConcatenatedLists that has several peculiar properties. First, one can cut the
655   * tail of the logical list by removing last several sub-lists. Second, items can be removed thru
656   * iterator. Third, if the sub-lists are immutable, they are replaced with mutable copies when
657   * needed. On average KeyBefore operation will contain half the stripes as potential candidates,
658   * but will quickly cut down on them as it finds something in the more likely ones; thus, the
659   * above allow us to avoid unnecessary copying of a bunch of lists.
660   */
661  private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<HStoreFile> {
662    @Override
663    public java.util.Iterator<HStoreFile> iterator() {
664      return new Iterator();
665    }
666
667    public class Iterator extends ConcatenatedLists<HStoreFile>.Iterator {
668      public ArrayList<List<HStoreFile>> getComponents() {
669        return components;
670      }
671
672      public void removeComponents(int startIndex) {
673        List<List<HStoreFile>> subList = components.subList(startIndex, components.size());
674        for (List<HStoreFile> entry : subList) {
675          size -= entry.size();
676        }
677        assert size >= 0;
678        subList.clear();
679      }
680
681      @Override
682      public void remove() {
683        if (!this.nextWasCalled) {
684          throw new IllegalStateException("No element to remove");
685        }
686        this.nextWasCalled = false;
687        List<HStoreFile> src = components.get(currentComponent);
688        if (src instanceof ImmutableList<?>) {
689          src = new ArrayList<>(src);
690          components.set(currentComponent, src);
691        }
692        src.remove(indexWithinComponent);
693        --size;
694        --indexWithinComponent;
695        if (src.isEmpty()) {
696          components.remove(currentComponent); // indexWithinComponent is already -1 here.
697        }
698      }
699    }
700  }
701
702  /**
703   * Non-static helper class for merging compaction or flush results. Since we want to merge them
704   * atomically (more or less), it operates on lazy copies, then creates a new state object and puts
705   * it in place.
706   */
707  private class CompactionOrFlushMergeCopy {
708    private ArrayList<List<HStoreFile>> stripeFiles = null;
709    private ArrayList<HStoreFile> level0Files = null;
710    private ArrayList<byte[]> stripeEndRows = null;
711
712    private Collection<HStoreFile> compactedFiles = null;
713    private Collection<HStoreFile> results = null;
714
715    private List<HStoreFile> l0Results = new ArrayList<>();
716    private final boolean isFlush;
717
718    public CompactionOrFlushMergeCopy(boolean isFlush) {
719      // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
720      this.stripeFiles = new ArrayList<>(StripeStoreFileManager.this.state.stripeFiles);
721      this.isFlush = isFlush;
722    }
723
724    private void mergeResults(Collection<HStoreFile> compactedFiles,
725      Collection<HStoreFile> results) {
726      assert this.compactedFiles == null && this.results == null;
727      this.compactedFiles = compactedFiles;
728      this.results = results;
729      // Do logical processing.
730      if (!isFlush) {
731        removeCompactedFiles();
732      }
733      TreeMap<byte[], HStoreFile> newStripes = processResults();
734      if (newStripes != null) {
735        processNewCandidateStripes(newStripes);
736      }
737      // Create new state and update parent.
738      State state = createNewState(false);
739      StripeStoreFileManager.this.state = state;
740      updateMetadataMaps();
741    }
742
743    private void deleteResults(Collection<HStoreFile> compactedFiles) {
744      this.compactedFiles = compactedFiles;
745      // Create new state and update parent.
746      State state = createNewState(true);
747      StripeStoreFileManager.this.state = state;
748      updateMetadataMaps();
749    }
750
751    private State createNewState(boolean delCompactedFiles) {
752      State oldState = StripeStoreFileManager.this.state;
753      // Stripe count should be the same unless the end rows changed.
754      assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
755      State newState = new State();
756      newState.level0Files =
757        (this.level0Files == null) ? oldState.level0Files : ImmutableList.copyOf(this.level0Files);
758      newState.stripeEndRows = (this.stripeEndRows == null)
759        ? oldState.stripeEndRows
760        : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
761      newState.stripeFiles = new ArrayList<>(this.stripeFiles.size());
762      for (List<HStoreFile> newStripe : this.stripeFiles) {
763        newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
764          ? (ImmutableList<HStoreFile>) newStripe
765          : ImmutableList.copyOf(newStripe));
766      }
767
768      List<HStoreFile> newAllFiles = new ArrayList<>(oldState.allFilesCached);
769      List<HStoreFile> newAllCompactedFiles = new ArrayList<>(oldState.allCompactedFilesCached);
770      if (!isFlush) {
771        newAllFiles.removeAll(compactedFiles);
772        if (delCompactedFiles) {
773          newAllCompactedFiles.removeAll(compactedFiles);
774        } else {
775          newAllCompactedFiles.addAll(compactedFiles);
776        }
777      }
778      if (results != null) {
779        newAllFiles.addAll(results);
780      }
781      newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
782      newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
783      return newState;
784    }
785
786    private void updateMetadataMaps() {
787      StripeStoreFileManager parent = StripeStoreFileManager.this;
788      if (!isFlush) {
789        for (HStoreFile sf : this.compactedFiles) {
790          parent.fileStarts.remove(sf);
791          parent.fileEnds.remove(sf);
792        }
793      }
794      if (this.l0Results != null) {
795        for (HStoreFile sf : this.l0Results) {
796          parent.ensureLevel0Metadata(sf);
797        }
798      }
799    }
800
801    /**
802     * @param index Index of the stripe we need.
803     * @return A lazy stripe copy from current stripes.
804     */
805    private final ArrayList<HStoreFile> getStripeCopy(int index) {
806      List<HStoreFile> stripeCopy = this.stripeFiles.get(index);
807      ArrayList<HStoreFile> result = null;
808      if (stripeCopy instanceof ImmutableList<?>) {
809        result = new ArrayList<>(stripeCopy);
810        this.stripeFiles.set(index, result);
811      } else {
812        result = (ArrayList<HStoreFile>) stripeCopy;
813      }
814      return result;
815    }
816
817    /** Returns A lazy L0 copy from current state. */
818    private final ArrayList<HStoreFile> getLevel0Copy() {
819      if (this.level0Files == null) {
820        this.level0Files = new ArrayList<>(StripeStoreFileManager.this.state.level0Files);
821      }
822      return this.level0Files;
823    }
824
825    /**
826     * Process new files, and add them either to the structure of existing stripes, or to the list
827     * of new candidate stripes.
828     * @return New candidate stripes.
829     */
830    private TreeMap<byte[], HStoreFile> processResults() {
831      TreeMap<byte[], HStoreFile> newStripes = null;
832      for (HStoreFile sf : this.results) {
833        byte[] startRow = startOf(sf), endRow = endOf(sf);
834        if (isInvalid(endRow) || isInvalid(startRow)) {
835          if (!isFlush) {
836            LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
837          }
838          insertFileIntoStripe(getLevel0Copy(), sf);
839          this.l0Results.add(sf);
840          continue;
841        }
842        if (!this.stripeFiles.isEmpty()) {
843          int stripeIndex = findStripeIndexByEndRow(endRow);
844          if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
845            // Simple/common case - add file to an existing stripe.
846            insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
847            continue;
848          }
849        }
850
851        // Make a new candidate stripe.
852        if (newStripes == null) {
853          newStripes = new TreeMap<>(MAP_COMPARATOR);
854        }
855        HStoreFile oldSf = newStripes.put(endRow, sf);
856        if (oldSf != null) {
857          throw new IllegalStateException(
858            "Compactor has produced multiple files for the stripe ending in ["
859              + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
860        }
861      }
862      return newStripes;
863    }
864
865    /**
866     * Remove compacted files.
867     */
868    private void removeCompactedFiles() {
869      for (HStoreFile oldFile : this.compactedFiles) {
870        byte[] oldEndRow = endOf(oldFile);
871        List<HStoreFile> source = null;
872        if (isInvalid(oldEndRow)) {
873          source = getLevel0Copy();
874        } else {
875          int stripeIndex = findStripeIndexByEndRow(oldEndRow);
876          if (stripeIndex < 0) {
877            throw new IllegalStateException(
878              "An allegedly compacted file [" + oldFile + "] does not belong"
879                + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
880          }
881          source = getStripeCopy(stripeIndex);
882        }
883        if (!source.remove(oldFile)) {
884          LOG.warn("An allegedly compacted file [{}] was not found", oldFile);
885        }
886      }
887    }
888
889    /**
890     * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with new
891     * candidate stripes/removes old stripes; produces new set of stripe end rows.
892     * @param newStripes New stripes - files by end row.
893     */
894    private void processNewCandidateStripes(TreeMap<byte[], HStoreFile> newStripes) {
895      // Validate that the removed and added aggregate ranges still make for a full key space.
896      boolean hasStripes = !this.stripeFiles.isEmpty();
897      this.stripeEndRows =
898        new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
899      int removeFrom = 0;
900      byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
901      byte[] lastEndRow = newStripes.lastKey();
902      if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
903        throw new IllegalStateException("Newly created stripes do not cover the entire key space.");
904      }
905
906      boolean canAddNewStripes = true;
907      Collection<HStoreFile> filesForL0 = null;
908      if (hasStripes) {
909        // Determine which stripes will need to be removed because they conflict with new stripes.
910        // The new boundaries should match old stripe boundaries, so we should get exact matches.
911        if (isOpen(firstStartRow)) {
912          removeFrom = 0;
913        } else {
914          removeFrom = findStripeIndexByEndRow(firstStartRow);
915          if (removeFrom < 0) {
916            throw new IllegalStateException("Compaction is trying to add a bad range.");
917          }
918          ++removeFrom;
919        }
920        int removeTo = findStripeIndexByEndRow(lastEndRow);
921        if (removeTo < 0) {
922          throw new IllegalStateException("Compaction is trying to add a bad range.");
923        }
924        // See if there are files in the stripes we are trying to replace.
925        ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
926        for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
927          conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
928        }
929        if (!conflictingFiles.isEmpty()) {
930          // This can be caused by two things - concurrent flush into stripes, or a bug.
931          // Unfortunately, we cannot tell them apart without looking at timing or something
932          // like that. We will assume we are dealing with a flush and dump it into L0.
933          if (isFlush) {
934            long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
935            LOG.warn("Stripes were created by a flush, but results of size " + newSize
936              + " cannot be added because the stripes have changed");
937            canAddNewStripes = false;
938            filesForL0 = newStripes.values();
939          } else {
940            long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
941            LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
942              + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
943            filesForL0 = conflictingFiles;
944          }
945          if (filesForL0 != null) {
946            for (HStoreFile sf : filesForL0) {
947              insertFileIntoStripe(getLevel0Copy(), sf);
948            }
949            l0Results.addAll(filesForL0);
950          }
951        }
952
953        if (canAddNewStripes) {
954          // Remove old empty stripes.
955          int originalCount = this.stripeFiles.size();
956          for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
957            if (removeIndex != originalCount - 1) {
958              this.stripeEndRows.remove(removeIndex);
959            }
960            this.stripeFiles.remove(removeIndex);
961          }
962        }
963      }
964
965      if (!canAddNewStripes) {
966        return; // Files were already put into L0.
967      }
968
969      // Now, insert new stripes. The total ranges match, so we can insert where we removed.
970      byte[] previousEndRow = null;
971      int insertAt = removeFrom;
972      for (Map.Entry<byte[], HStoreFile> newStripe : newStripes.entrySet()) {
973        if (previousEndRow != null) {
974          // Validate that the ranges are contiguous.
975          assert !isOpen(previousEndRow);
976          byte[] startRow = startOf(newStripe.getValue());
977          if (!rowEquals(previousEndRow, startRow)) {
978            throw new IllegalStateException("The new stripes produced by "
979              + (isFlush ? "flush" : "compaction") + " are not contiguous");
980          }
981        }
982        // Add the new stripe.
983        ArrayList<HStoreFile> tmp = new ArrayList<>();
984        tmp.add(newStripe.getValue());
985        stripeFiles.add(insertAt, tmp);
986        previousEndRow = newStripe.getKey();
987        if (!isOpen(previousEndRow)) {
988          stripeEndRows.add(insertAt, previousEndRow);
989        }
990        ++insertAt;
991      }
992    }
993  }
994
995  @Override
996  public List<HStoreFile> getLevel0Files() {
997    return this.state.level0Files;
998  }
999
1000  @Override
1001  public List<byte[]> getStripeBoundaries() {
1002    if (this.state.stripeFiles.isEmpty()) {
1003      return Collections.emptyList();
1004    }
1005    ArrayList<byte[]> result = new ArrayList<>(this.state.stripeEndRows.length + 2);
1006    result.add(OPEN_KEY);
1007    Collections.addAll(result, this.state.stripeEndRows);
1008    result.add(OPEN_KEY);
1009    return result;
1010  }
1011
1012  @Override
1013  public ArrayList<ImmutableList<HStoreFile>> getStripes() {
1014    return this.state.stripeFiles;
1015  }
1016
1017  @Override
1018  public int getStripeCount() {
1019    return this.state.stripeFiles.size();
1020  }
1021
1022  @Override
1023  public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
1024    // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
1025    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
1026    State state = this.state;
1027    Collection<HStoreFile> expiredStoreFiles = null;
1028    for (ImmutableList<HStoreFile> stripe : state.stripeFiles) {
1029      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
1030    }
1031    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
1032  }
1033
1034  private Collection<HStoreFile> findExpiredFiles(ImmutableList<HStoreFile> stripe, long maxTs,
1035    List<HStoreFile> filesCompacting, Collection<HStoreFile> expiredStoreFiles) {
1036    // Order by seqnum is reversed.
1037    for (int i = 1; i < stripe.size(); ++i) {
1038      HStoreFile sf = stripe.get(i);
1039      synchronized (sf) {
1040        long fileTs = sf.getReader().getMaxTimestamp();
1041        if (fileTs < maxTs && !filesCompacting.contains(sf)) {
1042          LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimestamp is "
1043            + fileTs + ", which is below " + maxTs);
1044          if (expiredStoreFiles == null) {
1045            expiredStoreFiles = new ArrayList<>();
1046          }
1047          expiredStoreFiles.add(sf);
1048        }
1049      }
1050    }
1051    return expiredStoreFiles;
1052  }
1053
1054  @Override
1055  public double getCompactionPressure() {
1056    State stateLocal = this.state;
1057    if (stateLocal.allFilesCached.size() > blockingFileCount) {
1058      // just a hit to tell others that we have reached the blocking file count.
1059      return 2.0;
1060    }
1061    if (stateLocal.stripeFiles.isEmpty()) {
1062      return 0.0;
1063    }
1064    int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
1065    // do not calculate L0 separately because data will be moved to stripe quickly and in most cases
1066    // we flush data to stripe directly.
1067    int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
1068    double max = 0.0;
1069    for (ImmutableList<HStoreFile> stripeFile : stateLocal.stripeFiles) {
1070      int stripeFileCount = stripeFile.size();
1071      double normCount = (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
1072        / (blockingFilePerStripe - config.getStripeCompactMinFiles());
1073      if (normCount >= 1.0) {
1074        // This could happen if stripe is not split evenly. Do not return values that larger than
1075        // 1.0 because we have not reached the blocking file count actually.
1076        return 1.0;
1077      }
1078      if (normCount > max) {
1079        max = normCount;
1080      }
1081    }
1082    return max;
1083  }
1084
1085  @Override
1086  public Comparator<HStoreFile> getStoreFileComparator() {
1087    return StoreFileComparators.SEQ_ID;
1088  }
1089}