001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.TreeMap;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.ConcatenatedLists;
041import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
048
/**
 * Stripe implementation of {@link StoreFileManager}. Not thread safe - relies on external locking
 * (in HStore). Collections that this class returns are immutable or unique to the call, so they
 * should be safe. The stripe store splits the key space of the region into non-overlapping
 * stripes; in addition, there are some recent files that contain all the keys (level 0). Each
 * stripe contains a set of files. When L0 is compacted, it is split into files corresponding to
 * the existing stripe boundaries, which can then be added to the stripes. When a scan or get
 * happens, it only has to read the files from the corresponding stripes. See
 * {@link StripeCompactionPolicy} on how the stripes are determined; this class doesn't care. This
 * class should work together with {@link StripeCompactionPolicy} and
 * {@link org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor}. With regard to how
 * they work, we make at least the following (reasonable) assumptions:
 * - Compaction produces one file per new stripe (if any); that is easy to change.
 * - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
 */
063@InterfaceAudience.Private
064public class StripeStoreFileManager
065  implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
066  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFileManager.class);
067
068  /**
069   * The file metadata fields that contain the stripe information.
070   */
071  public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
072  public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
073
074  private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
075
  /**
   * The key used for a range boundary, indicating that the boundary is open (i.e. +/-inf).
   */
079  public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
080  final static byte[] INVALID_KEY = null;
081
082  /**
083   * The state class. Used solely to replace results atomically during compactions and avoid
084   * complicated error handling.
085   */
086  private static class State {
087    /**
088     * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
089     * here. It is invariant that the start row of the stripe is the end row of the previous one
090     * (and is an open boundary for the first one).
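     * For example, with end rows [b, d] there are three stripes: stripe 0 covers rows before b,
     * stripe 1 covers [b, d), and stripe 2 covers rows from d onwards.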
091     */
092    public byte[][] stripeEndRows = new byte[0][];
093
    /**
     * Files by stripe. Each element of the list corresponds to the stripeEndRows element with
     * the same index, except the last one. Inside each list, the files are in reverse order by
     * seqNum. Note that the length of this list is one greater than that of stripeEndRows.
     */
099    public ArrayList<ImmutableList<HStoreFile>> stripeFiles = new ArrayList<>();
100    /** Level 0. The files are in reverse order by seqNum. */
101    public ImmutableList<HStoreFile> level0Files = ImmutableList.of();
102
103    /** Cached list of all files in the structure, to return from some calls */
104    public ImmutableList<HStoreFile> allFilesCached = ImmutableList.of();
105    private ImmutableList<HStoreFile> allCompactedFilesCached = ImmutableList.of();
106  }
107
108  private State state = null;
109
110  /** Cached file metadata (or overrides as the case may be) */
111  private HashMap<HStoreFile, byte[]> fileStarts = new HashMap<>();
112  private HashMap<HStoreFile, byte[]> fileEnds = new HashMap<>();
  /**
   * Normally the invalid key is null, but in the map null is the result for "no key"; so we use
   * the following constant value in these maps instead. Note that this is a constant, and we
   * compare against it by reference when we read from the map.
   */
118  private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
119
120  private final CellComparator cellComparator;
121  private StripeStoreConfig config;
122
123  private final int blockingFileCount;
124
125  public StripeStoreFileManager(CellComparator kvComparator, Configuration conf,
126    StripeStoreConfig config) {
127    this.cellComparator = kvComparator;
128    this.config = config;
129    this.blockingFileCount =
130      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
131  }
132
133  @Override
134  public void loadFiles(List<HStoreFile> storeFiles) {
135    loadUnclassifiedStoreFiles(storeFiles);
136  }
137
138  @Override
139  public Collection<HStoreFile> getStoreFiles() {
140    return state.allFilesCached;
141  }
142
143  @Override
144  public Collection<HStoreFile> getCompactedfiles() {
145    return state.allCompactedFilesCached;
146  }
147
148  @Override
149  public int getCompactedFilesCount() {
150    return state.allCompactedFilesCached.size();
151  }
152
153  @Override
154  public void insertNewFiles(Collection<HStoreFile> sfs) {
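    // A flush is treated as a "compaction" of zero input files into the new files; any file
    // without stripe metadata will end up in L0.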
155    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
156    cmc.mergeResults(Collections.emptyList(), sfs);
157    debugDumpState("Added new files");
158  }
159
160  @Override
161  public ImmutableCollection<HStoreFile> clearFiles() {
162    ImmutableCollection<HStoreFile> result = state.allFilesCached;
163    this.state = new State();
164    this.fileStarts.clear();
165    this.fileEnds.clear();
166    return result;
167  }
168
169  @Override
170  public ImmutableCollection<HStoreFile> clearCompactedFiles() {
171    ImmutableCollection<HStoreFile> result = state.allCompactedFilesCached;
172    this.state = new State();
173    return result;
174  }
175
176  @Override
177  public int getStorefileCount() {
178    return state.allFilesCached.size();
179  }
180
  /**
   * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} for details on this
   * method.
   */
185  @Override
186  public Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
187    KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
188    // Order matters for this call.
189    result.addSublist(state.level0Files);
190    if (!state.stripeFiles.isEmpty()) {
191      int lastStripeIndex = findStripeForRow(CellUtil.cloneRow(targetKey), false);
192      for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
193        result.addSublist(state.stripeFiles.get(stripeIndex));
194      }
195    }
196    return result.iterator();
197  }
198
  /**
   * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
   * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, Cell)} for
   * details on this method.
   */
204  @Override
205  public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
206    Iterator<HStoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
207    KeyBeforeConcatenatedLists.Iterator original =
208      (KeyBeforeConcatenatedLists.Iterator) candidateFiles;
209    assert original != null;
210    ArrayList<List<HStoreFile>> components = original.getComponents();
211    for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
212      HStoreFile sf = components.get(firstIrrelevant).get(0);
213      byte[] endKey = endOf(sf);
214      // Entries are ordered as such: L0, then stripes in reverse order. We never remove
215      // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
216      // first one that cannot possibly have better candidates.
217      if (!isInvalid(endKey) && !isOpen(endKey) && (nonOpenRowCompare(targetKey, endKey) >= 0)) {
218        original.removeComponents(firstIrrelevant);
219        break;
220      }
221    }
222    return original;
223  }
224
225  /**
226   * Override of getSplitPoint that determines the split point as the boundary between two stripes,
227   * unless it causes significant imbalance between split sides' sizes. In that case, the split
228   * boundary will be chosen from the middle of one of the stripes to minimize imbalance.
229   * @return The split point, or null if no split is possible.
230   */
231  @Override
232  public Optional<byte[]> getSplitPoint() throws IOException {
233    if (this.getStorefileCount() == 0) {
234      return Optional.empty();
235    }
236    if (state.stripeFiles.size() <= 1) {
237      return getSplitPointFromAllFiles();
238    }
239    int leftIndex = -1, rightIndex = state.stripeFiles.size();
240    long leftSize = 0, rightSize = 0;
241    long lastLeftSize = 0, lastRightSize = 0;
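    // Walk inwards from both ends, repeatedly adding the next stripe to the side whose
    // accumulated size is smaller (the right side on a tie), until the indexes meet; leftIndex is
    // then the last stripe of the left half.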
242    while (rightIndex - 1 != leftIndex) {
243      if (leftSize >= rightSize) {
244        --rightIndex;
245        lastRightSize = getStripeFilesSize(rightIndex);
246        rightSize += lastRightSize;
247      } else {
248        ++leftIndex;
249        lastLeftSize = getStripeFilesSize(leftIndex);
250        leftSize += lastLeftSize;
251      }
252    }
253    if (leftSize == 0 || rightSize == 0) {
      String errMsg = String.format(
        "Cannot split on a boundary - left index %d size %d, right index %d size %d", leftIndex,
        leftSize, rightIndex, rightSize);
257      debugDumpState(errMsg);
258      LOG.warn(errMsg);
259      return getSplitPointFromAllFiles();
260    }
261    double ratio = (double) rightSize / leftSize;
262    if (ratio < 1) {
263      ratio = 1 / ratio;
264    }
265    if (config.getMaxSplitImbalance() > ratio) {
266      return Optional.of(state.stripeEndRows[leftIndex]);
267    }
268
    // If the difference between the sides is too large, we could get the proportional key on
    // a stripe to equalize the difference, but there's no proportional key method at the
    // moment, and it's not extremely important.
    // See if we can achieve a better ratio if we split the bigger side in half.
273    boolean isRightLarger = rightSize >= leftSize;
274    double newRatio = isRightLarger
275      ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
276      : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
277    if (newRatio < 1) {
278      newRatio = 1 / newRatio;
279    }
280    if (newRatio >= ratio) {
281      return Optional.of(state.stripeEndRows[leftIndex]);
282    }
    LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split " + newRatio
      + ", configured ratio " + config.getMaxSplitImbalance());
285    // OK, we may get better ratio, get it.
286    return StoreUtils.getSplitPoint(state.stripeFiles.get(isRightLarger ? rightIndex : leftIndex),
287      cellComparator);
288  }
289
290  private Optional<byte[]> getSplitPointFromAllFiles() throws IOException {
291    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
292    sfs.addSublist(state.level0Files);
293    sfs.addAllSublists(state.stripeFiles);
294    return StoreUtils.getSplitPoint(sfs, cellComparator);
295  }
296
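  /**
   * Estimates the size imbalance ratio that would result if the stripe last added to the larger
   * side were split in half, with one half moving to the smaller side.
   */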
297  private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
298    return (double) (largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
299  }
300
301  @Override
302  public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
303    byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) {
304    if (state.stripeFiles.isEmpty()) {
305      return state.level0Files; // There's just L0.
306    }
307
308    int firstStripe = findStripeForRow(startRow, true);
309    int lastStripe = findStripeForRow(stopRow, false);
310    assert firstStripe <= lastStripe;
311    if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
312      return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
313    }
314    if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
315      return state.allFilesCached; // We need to read all files.
316    }
317
318    ConcatenatedLists<HStoreFile> result = new ConcatenatedLists<>();
319    result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
320    result.addSublist(state.level0Files);
321    return result;
322  }
323
324  @Override
325  public void addCompactionResults(Collection<HStoreFile> compactedFiles,
326    Collection<HStoreFile> results) {
327    // See class comment for the assumptions we make here.
328    LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
329      + " files replaced by " + results.size());
330    // In order to be able to fail in the middle of the operation, we'll operate on lazy
331    // copies and apply the result at the end.
332    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
333    cmc.mergeResults(compactedFiles, results);
334    markCompactedAway(compactedFiles);
335    debugDumpState("Merged compaction results");
336  }
337
  // Mark the files as compactedAway once the storefiles and compactedfiles lists are finalized.
  // Let a background thread close the actual readers on these compacted files, and also ensure
  // that their blocks are evicted from the block cache so that they are no longer cached.
342  private void markCompactedAway(Collection<HStoreFile> compactedFiles) {
343    for (HStoreFile file : compactedFiles) {
344      file.markCompactedAway();
345    }
346  }
347
348  @Override
349  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
350    // See class comment for the assumptions we make here.
351    LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
352    // In order to be able to fail in the middle of the operation, we'll operate on lazy
353    // copies and apply the result at the end.
354    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
355    cmc.deleteResults(compactedFiles);
356    debugDumpState("Deleted compaction results");
357  }
358
359  @Override
360  public int getStoreCompactionPriority() {
361    // If there's only L0, do what the default store does.
362    // If we are in critical priority, do the same - we don't want to trump all stores all
363    // the time due to how many files we have.
364    int fc = getStorefileCount();
365    if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
366      return this.blockingFileCount - fc;
367    }
368    // If we are in good shape, we don't want to be trumped by all other stores due to how
369    // many files we have, so do an approximate mapping to normal priority range; L0 counts
370    // for all stripes.
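    // For example, with blockingFileCount=16, 10 files total, 2 of them in L0, and 4 stripes:
    // priority = ceil((16 - 10 + 2) / 4 - 2) = 0, which is then bumped to PRIORITY_USER + 1.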
371    int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
372    int priority = (int) Math.ceil(((double) (this.blockingFileCount - fc + l0) / sc) - l0);
373    return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
374  }
375
376  /**
377   * Gets the total size of all files in the stripe.
378   * @param stripeIndex Stripe index.
379   * @return Size.
380   */
381  private long getStripeFilesSize(int stripeIndex) {
382    long result = 0;
383    for (HStoreFile sf : state.stripeFiles.get(stripeIndex)) {
384      result += sf.getReader().length();
385    }
386    return result;
387  }
388
  /**
   * Loads the initial store files that were picked up from some physical location pertaining to
   * this store (presumably). Unlike adding files after a compaction, this assumes empty initial
   * sets, and is forgiving with regard to stripe constraints - at worst, many/all files will go
   * to level 0.
   * @param storeFiles Store files to add.
   */
395  private void loadUnclassifiedStoreFiles(List<HStoreFile> storeFiles) {
396    LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
397    TreeMap<byte[], ArrayList<HStoreFile>> candidateStripes = new TreeMap<>(MAP_COMPARATOR);
398    ArrayList<HStoreFile> level0Files = new ArrayList<>();
    // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
    // If needed, we could dynamically determine the stripes in the future.
401    for (HStoreFile sf : storeFiles) {
402      byte[] startRow = startOf(sf), endRow = endOf(sf);
403      // Validate the range and put the files into place.
404      if (isInvalid(startRow) || isInvalid(endRow)) {
405        insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
406        ensureLevel0Metadata(sf);
407      } else if (!isOpen(startRow) && !isOpen(endRow) && nonOpenRowCompare(startRow, endRow) >= 0) {
408        LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
409          + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
410        insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
411        ensureLevel0Metadata(sf);
412      } else {
413        ArrayList<HStoreFile> stripe = candidateStripes.get(endRow);
414        if (stripe == null) {
415          stripe = new ArrayList<>();
416          candidateStripes.put(endRow, stripe);
417        }
418        insertFileIntoStripe(stripe, sf);
419      }
420    }
    // Possible improvement - for variable-count stripes, if all the files are in L0, we can
    // instead create a single, open-ended stripe with all the files.
423
424    boolean hasOverlaps = false;
425    byte[] expectedStartRow = null; // first stripe can start wherever
426    Iterator<Map.Entry<byte[], ArrayList<HStoreFile>>> entryIter =
427      candidateStripes.entrySet().iterator();
428    while (entryIter.hasNext()) {
429      Map.Entry<byte[], ArrayList<HStoreFile>> entry = entryIter.next();
430      ArrayList<HStoreFile> files = entry.getValue();
      // Validate the file start rows, and move the bad ones to level 0.
432      for (int i = 0; i < files.size(); ++i) {
433        HStoreFile sf = files.get(i);
434        byte[] startRow = startOf(sf);
435        if (expectedStartRow == null) {
436          expectedStartRow = startRow; // ensure that first stripe is still consistent
437        } else if (!rowEquals(expectedStartRow, startRow)) {
438          hasOverlaps = true;
439          LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
440            + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
441            + "], to L0 it goes");
442          HStoreFile badSf = files.remove(i);
443          insertFileIntoStripe(level0Files, badSf);
444          ensureLevel0Metadata(badSf);
445          --i;
446        }
447      }
448      // Check if any files from the candidate stripe are valid. If so, add a stripe.
449      byte[] endRow = entry.getKey();
450      if (!files.isEmpty()) {
451        expectedStartRow = endRow; // Next stripe must start exactly at that key.
452      } else {
453        entryIter.remove();
454      }
455    }
456
    // In the end, there must be open ends on both sides. If not, and there were no errors (i.e.
    // the files are consistent), they might be coming from a split. We will treat the boundaries
    // as open keys anyway, and log a message.
    // If there were errors, we'll play it safe and dump everything into L0.
461    if (!candidateStripes.isEmpty()) {
462      HStoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
463      boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
464      if (!isOpen) {
465        LOG.warn("The range of the loaded files does not cover full key space: from ["
466          + Bytes.toString(startOf(firstFile)) + "], to ["
467          + Bytes.toString(candidateStripes.lastKey()) + "]");
468        if (!hasOverlaps) {
469          ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
470          ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
471        } else {
472          LOG.warn("Inconsistent files, everything goes to L0.");
473          for (ArrayList<HStoreFile> files : candidateStripes.values()) {
474            for (HStoreFile sf : files) {
475              insertFileIntoStripe(level0Files, sf);
476              ensureLevel0Metadata(sf);
477            }
478          }
479          candidateStripes.clear();
480        }
481      }
482    }
483
484    // Copy the results into the fields.
485    State state = new State();
486    state.level0Files = ImmutableList.copyOf(level0Files);
487    state.stripeFiles = new ArrayList<>(candidateStripes.size());
488    state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
489    ArrayList<HStoreFile> newAllFiles = new ArrayList<>(level0Files);
490    int i = candidateStripes.size() - 1;
491    for (Map.Entry<byte[], ArrayList<HStoreFile>> entry : candidateStripes.entrySet()) {
492      state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
493      newAllFiles.addAll(entry.getValue());
494      if (i > 0) {
495        state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
496      }
497      --i;
498    }
499    state.allFilesCached = ImmutableList.copyOf(newAllFiles);
500    this.state = state;
501    debugDumpState("Files loaded");
502  }
503
504  private void ensureEdgeStripeMetadata(ArrayList<HStoreFile> stripe, boolean isFirst) {
505    HashMap<HStoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
506    for (HStoreFile sf : stripe) {
507      targetMap.put(sf, OPEN_KEY);
508    }
509  }
510
  private void ensureLevel0Metadata(HStoreFile sf) {
    if (!isInvalid(startOf(sf))) {
      this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
    }
    if (!isInvalid(endOf(sf))) {
      this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
    }
  }
515
516  private void debugDumpState(String string) {
517    if (!LOG.isDebugEnabled()) return;
518    StringBuilder sb = new StringBuilder();
519    sb.append("\n" + string + "; current stripe state is as such:");
520    sb.append("\n level 0 with ").append(state.level0Files.size())
521      .append(" files: " + TraditionalBinaryPrefix
522        .long2String(StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
523    for (int i = 0; i < state.stripeFiles.size(); ++i) {
524      String endRow = (i == state.stripeEndRows.length)
525        ? "(end)"
526        : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
527      sb.append("\n stripe ending in ").append(endRow).append(" with ")
528        .append(state.stripeFiles.get(i).size())
529        .append(" files: " + TraditionalBinaryPrefix.long2String(
530          StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
531    }
532    sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
533    sb.append("\n").append(getStorefileCount()).append(" files total.");
534    LOG.debug(sb.toString());
535  }
536
537  /**
538   * Checks whether the key indicates an open interval boundary (i.e. infinity).
539   */
540  private static final boolean isOpen(byte[] key) {
541    return key != null && key.length == 0;
542  }
543
544  private static final boolean isOpen(Cell key) {
545    return key != null && key.getRowLength() == 0;
546  }
547
548  /**
549   * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
550   */
551  private static final boolean isInvalid(byte[] key) {
552    // No need to use Arrays.equals because INVALID_KEY is null
553    return key == INVALID_KEY;
554  }
555
556  /**
557   * Compare two keys for equality.
558   */
559  private final boolean rowEquals(byte[] k1, byte[] k2) {
560    return Bytes.equals(k1, 0, k1.length, k2, 0, k2.length);
561  }
562
563  /**
564   * Compare two keys. Keys must not be open (isOpen(row) == false).
565   */
566  private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
567    assert !isOpen(k1) && !isOpen(k2);
568    return Bytes.compareTo(k1, k2);
569  }
570
571  private final int nonOpenRowCompare(Cell k1, byte[] k2) {
572    assert !isOpen(k1) && !isOpen(k2);
573    return cellComparator.compareRows(k1, k2, 0, k2.length);
574  }
575
576  /**
577   * Finds the stripe index by end row.
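   * Returns a negative value (per Arrays.binarySearch semantics) when no stripe ends exactly at
   * the given row.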
578   */
579  private final int findStripeIndexByEndRow(byte[] endRow) {
580    assert !isInvalid(endRow);
581    if (isOpen(endRow)) return state.stripeEndRows.length;
582    return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
583  }
584
585  /**
586   * Finds the stripe index for the stripe containing a row provided externally for get/scan.
587   */
588  private final int findStripeForRow(byte[] row, boolean isStart) {
589    if (isStart && Arrays.equals(row, HConstants.EMPTY_START_ROW)) return 0;
590    if (!isStart && Arrays.equals(row, HConstants.EMPTY_END_ROW)) {
591      return state.stripeFiles.size() - 1;
592    }
    // If there's an exact match below, a stripe ends at "row". The stripe's right boundary is
    // exclusive, so that means the row is in the next stripe; thus, we need to add one to the
    // index. If there's no match, the return value of binarySearch is (-(insertion point) - 1),
    // where the insertion point is the index of the next greater element, or the list size if
    // there is none. The insertion point happens to be exactly what we need, so adding one and
    // taking the absolute value of the result yields it.
    return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
599  }
600
601  @Override
602  public final byte[] getStartRow(int stripeIndex) {
603    return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
604  }
605
606  @Override
607  public final byte[] getEndRow(int stripeIndex) {
608    return (stripeIndex == state.stripeEndRows.length
609      ? OPEN_KEY
610      : state.stripeEndRows[stripeIndex]);
611  }
612
613  private byte[] startOf(HStoreFile sf) {
614    byte[] result = fileStarts.get(sf);
615
616    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
617    // serves only as a marker and is not to be confused with other empty byte arrays.
618    // See Javadoc of INVALID_KEY_IN_MAP for more information
619    return (result == null) ? sf.getMetadataValue(STRIPE_START_KEY)
620      : result == INVALID_KEY_IN_MAP ? INVALID_KEY
621      : result;
622  }
623
624  private byte[] endOf(HStoreFile sf) {
625    byte[] result = fileEnds.get(sf);
626
627    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
628    // serves only as a marker and is not to be confused with other empty byte arrays.
629    // See Javadoc of INVALID_KEY_IN_MAP for more information
630    return (result == null) ? sf.getMetadataValue(STRIPE_END_KEY)
631      : result == INVALID_KEY_IN_MAP ? INVALID_KEY
632      : result;
633  }
634
635  /**
636   * Inserts a file in the correct place (by seqnum) in a stripe copy.
637   * @param stripe Stripe copy to insert into.
638   * @param sf     File to insert.
639   */
640  private static void insertFileIntoStripe(ArrayList<HStoreFile> stripe, HStoreFile sf) {
    // The only operation for which sorting of the files matters is KeyBefore. Therefore,
    // we will store the files in reverse order by seqNum from the outset.
643    for (int insertBefore = 0;; ++insertBefore) {
644      if (
645        insertBefore == stripe.size()
646          || (StoreFileComparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)
647      ) {
648        stripe.add(insertBefore, sf);
649        break;
650      }
651    }
652  }
653
  /**
   * An extension of ConcatenatedLists that has several peculiar properties. First, one can cut the
   * tail of the logical list by removing the last several sub-lists. Second, items can be removed
   * through the iterator. Third, if the sub-lists are immutable, they are replaced with mutable
   * copies when needed. On average, a KeyBefore operation will have half the stripes as potential
   * candidates, but will quickly cut down on them as it finds something in the more likely ones;
   * thus, the above allows us to avoid unnecessary copying of a bunch of lists.
   */
662  private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<HStoreFile> {
663    @Override
664    public java.util.Iterator<HStoreFile> iterator() {
665      return new Iterator();
666    }
667
668    public class Iterator extends ConcatenatedLists<HStoreFile>.Iterator {
669      public ArrayList<List<HStoreFile>> getComponents() {
670        return components;
671      }
672
673      public void removeComponents(int startIndex) {
674        List<List<HStoreFile>> subList = components.subList(startIndex, components.size());
675        for (List<HStoreFile> entry : subList) {
676          size -= entry.size();
677        }
678        assert size >= 0;
679        subList.clear();
680      }
681
682      @Override
683      public void remove() {
684        if (!this.nextWasCalled) {
685          throw new IllegalStateException("No element to remove");
686        }
687        this.nextWasCalled = false;
688        List<HStoreFile> src = components.get(currentComponent);
689        if (src instanceof ImmutableList<?>) {
690          src = new ArrayList<>(src);
691          components.set(currentComponent, src);
692        }
693        src.remove(indexWithinComponent);
694        --size;
695        --indexWithinComponent;
696        if (src.isEmpty()) {
697          components.remove(currentComponent); // indexWithinComponent is already -1 here.
698        }
699      }
700    }
701  }
702
703  /**
704   * Non-static helper class for merging compaction or flush results. Since we want to merge them
705   * atomically (more or less), it operates on lazy copies, then creates a new state object and puts
706   * it in place.
707   */
708  private class CompactionOrFlushMergeCopy {
709    private ArrayList<List<HStoreFile>> stripeFiles = null;
710    private ArrayList<HStoreFile> level0Files = null;
711    private ArrayList<byte[]> stripeEndRows = null;
712
713    private Collection<HStoreFile> compactedFiles = null;
714    private Collection<HStoreFile> results = null;
715
716    private List<HStoreFile> l0Results = new ArrayList<>();
717    private final boolean isFlush;
718
719    public CompactionOrFlushMergeCopy(boolean isFlush) {
720      // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
721      this.stripeFiles = new ArrayList<>(StripeStoreFileManager.this.state.stripeFiles);
722      this.isFlush = isFlush;
723    }
724
725    private void mergeResults(Collection<HStoreFile> compactedFiles,
726      Collection<HStoreFile> results) {
727      assert this.compactedFiles == null && this.results == null;
728      this.compactedFiles = compactedFiles;
729      this.results = results;
730      // Do logical processing.
731      if (!isFlush) {
732        removeCompactedFiles();
733      }
734      TreeMap<byte[], HStoreFile> newStripes = processResults();
735      if (newStripes != null) {
736        processNewCandidateStripes(newStripes);
737      }
738      // Create new state and update parent.
739      State state = createNewState(false);
740      StripeStoreFileManager.this.state = state;
741      updateMetadataMaps();
742    }
743
744    private void deleteResults(Collection<HStoreFile> compactedFiles) {
745      this.compactedFiles = compactedFiles;
746      // Create new state and update parent.
747      State state = createNewState(true);
748      StripeStoreFileManager.this.state = state;
749      updateMetadataMaps();
750    }
751
752    private State createNewState(boolean delCompactedFiles) {
753      State oldState = StripeStoreFileManager.this.state;
754      // Stripe count should be the same unless the end rows changed.
755      assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
756      State newState = new State();
757      newState.level0Files =
758        (this.level0Files == null) ? oldState.level0Files : ImmutableList.copyOf(this.level0Files);
759      newState.stripeEndRows = (this.stripeEndRows == null)
760        ? oldState.stripeEndRows
761        : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
762      newState.stripeFiles = new ArrayList<>(this.stripeFiles.size());
763      for (List<HStoreFile> newStripe : this.stripeFiles) {
764        newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
765          ? (ImmutableList<HStoreFile>) newStripe
766          : ImmutableList.copyOf(newStripe));
767      }
768
769      List<HStoreFile> newAllFiles = new ArrayList<>(oldState.allFilesCached);
770      List<HStoreFile> newAllCompactedFiles = new ArrayList<>(oldState.allCompactedFilesCached);
771      if (!isFlush) {
772        newAllFiles.removeAll(compactedFiles);
773        if (delCompactedFiles) {
774          newAllCompactedFiles.removeAll(compactedFiles);
775        } else {
776          newAllCompactedFiles.addAll(compactedFiles);
777        }
778      }
779      if (results != null) {
780        newAllFiles.addAll(results);
781      }
782      newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
783      newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
784      return newState;
785    }
786
787    private void updateMetadataMaps() {
788      StripeStoreFileManager parent = StripeStoreFileManager.this;
789      if (!isFlush) {
790        for (HStoreFile sf : this.compactedFiles) {
791          parent.fileStarts.remove(sf);
792          parent.fileEnds.remove(sf);
793        }
794      }
795      if (this.l0Results != null) {
796        for (HStoreFile sf : this.l0Results) {
797          parent.ensureLevel0Metadata(sf);
798        }
799      }
800    }
801
802    /**
803     * @param index Index of the stripe we need.
804     * @return A lazy stripe copy from current stripes.
805     */
806    private final ArrayList<HStoreFile> getStripeCopy(int index) {
807      List<HStoreFile> stripeCopy = this.stripeFiles.get(index);
808      ArrayList<HStoreFile> result = null;
809      if (stripeCopy instanceof ImmutableList<?>) {
810        result = new ArrayList<>(stripeCopy);
811        this.stripeFiles.set(index, result);
812      } else {
813        result = (ArrayList<HStoreFile>) stripeCopy;
814      }
815      return result;
816    }
817
    /** Returns a lazy L0 copy from the current state. */
819    private final ArrayList<HStoreFile> getLevel0Copy() {
820      if (this.level0Files == null) {
821        this.level0Files = new ArrayList<>(StripeStoreFileManager.this.state.level0Files);
822      }
823      return this.level0Files;
824    }
825
826    /**
827     * Process new files, and add them either to the structure of existing stripes, or to the list
828     * of new candidate stripes.
829     * @return New candidate stripes.
830     */
831    private TreeMap<byte[], HStoreFile> processResults() {
832      TreeMap<byte[], HStoreFile> newStripes = null;
833      for (HStoreFile sf : this.results) {
834        byte[] startRow = startOf(sf), endRow = endOf(sf);
835        if (isInvalid(endRow) || isInvalid(startRow)) {
836          if (!isFlush) {
837            LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
838          }
839          insertFileIntoStripe(getLevel0Copy(), sf);
840          this.l0Results.add(sf);
841          continue;
842        }
843        if (!this.stripeFiles.isEmpty()) {
844          int stripeIndex = findStripeIndexByEndRow(endRow);
845          if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
846            // Simple/common case - add file to an existing stripe.
847            insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
848            continue;
849          }
850        }
851
852        // Make a new candidate stripe.
853        if (newStripes == null) {
854          newStripes = new TreeMap<>(MAP_COMPARATOR);
855        }
856        HStoreFile oldSf = newStripes.put(endRow, sf);
857        if (oldSf != null) {
858          throw new IllegalStateException(
859            "Compactor has produced multiple files for the stripe ending in ["
860              + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
861        }
862      }
863      return newStripes;
864    }
865
866    /**
867     * Remove compacted files.
868     */
869    private void removeCompactedFiles() {
870      for (HStoreFile oldFile : this.compactedFiles) {
871        byte[] oldEndRow = endOf(oldFile);
872        List<HStoreFile> source = null;
873        if (isInvalid(oldEndRow)) {
874          source = getLevel0Copy();
875        } else {
876          int stripeIndex = findStripeIndexByEndRow(oldEndRow);
877          if (stripeIndex < 0) {
878            throw new IllegalStateException(
879              "An allegedly compacted file [" + oldFile + "] does not belong"
880                + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
881          }
882          source = getStripeCopy(stripeIndex);
883        }
884        if (!source.remove(oldFile)) {
885          LOG.warn("An allegedly compacted file [{}] was not found", oldFile);
886        }
887      }
888    }
889
890    /**
891     * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with new
892     * candidate stripes/removes old stripes; produces new set of stripe end rows.
893     * @param newStripes New stripes - files by end row.
894     */
895    private void processNewCandidateStripes(TreeMap<byte[], HStoreFile> newStripes) {
896      // Validate that the removed and added aggregate ranges still make for a full key space.
897      boolean hasStripes = !this.stripeFiles.isEmpty();
898      this.stripeEndRows =
899        new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
900      int removeFrom = 0;
901      byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
902      byte[] lastEndRow = newStripes.lastKey();
903      if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
904        throw new IllegalStateException("Newly created stripes do not cover the entire key space.");
905      }
906
907      boolean canAddNewStripes = true;
908      Collection<HStoreFile> filesForL0 = null;
909      if (hasStripes) {
910        // Determine which stripes will need to be removed because they conflict with new stripes.
911        // The new boundaries should match old stripe boundaries, so we should get exact matches.
912        if (isOpen(firstStartRow)) {
913          removeFrom = 0;
914        } else {
915          removeFrom = findStripeIndexByEndRow(firstStartRow);
916          if (removeFrom < 0) {
917            throw new IllegalStateException("Compaction is trying to add a bad range.");
918          }
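          // findStripeIndexByEndRow found the stripe that ends exactly at firstStartRow; the
          // first stripe to replace is the one right after it, hence the increment.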
919          ++removeFrom;
920        }
921        int removeTo = findStripeIndexByEndRow(lastEndRow);
922        if (removeTo < 0) {
923          throw new IllegalStateException("Compaction is trying to add a bad range.");
924        }
925        // See if there are files in the stripes we are trying to replace.
926        ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
927        for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
928          conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
929        }
930        if (!conflictingFiles.isEmpty()) {
931          // This can be caused by two things - concurrent flush into stripes, or a bug.
932          // Unfortunately, we cannot tell them apart without looking at timing or something
933          // like that. We will assume we are dealing with a flush and dump it into L0.
934          if (isFlush) {
935            long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
936            LOG.warn("Stripes were created by a flush, but results of size " + newSize
937              + " cannot be added because the stripes have changed");
938            canAddNewStripes = false;
939            filesForL0 = newStripes.values();
940          } else {
941            long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
            LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush)"
              + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
944            filesForL0 = conflictingFiles;
945          }
946          if (filesForL0 != null) {
947            for (HStoreFile sf : filesForL0) {
948              insertFileIntoStripe(getLevel0Copy(), sf);
949            }
950            l0Results.addAll(filesForL0);
951          }
952        }
953
954        if (canAddNewStripes) {
955          // Remove old empty stripes.
956          int originalCount = this.stripeFiles.size();
957          for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
958            if (removeIndex != originalCount - 1) {
959              this.stripeEndRows.remove(removeIndex);
960            }
961            this.stripeFiles.remove(removeIndex);
962          }
963        }
964      }
965
966      if (!canAddNewStripes) {
967        return; // Files were already put into L0.
968      }
969
970      // Now, insert new stripes. The total ranges match, so we can insert where we removed.
971      byte[] previousEndRow = null;
972      int insertAt = removeFrom;
973      for (Map.Entry<byte[], HStoreFile> newStripe : newStripes.entrySet()) {
974        if (previousEndRow != null) {
975          // Validate that the ranges are contiguous.
976          assert !isOpen(previousEndRow);
977          byte[] startRow = startOf(newStripe.getValue());
978          if (!rowEquals(previousEndRow, startRow)) {
979            throw new IllegalStateException("The new stripes produced by "
980              + (isFlush ? "flush" : "compaction") + " are not contiguous");
981          }
982        }
983        // Add the new stripe.
984        ArrayList<HStoreFile> tmp = new ArrayList<>();
985        tmp.add(newStripe.getValue());
986        stripeFiles.add(insertAt, tmp);
987        previousEndRow = newStripe.getKey();
988        if (!isOpen(previousEndRow)) {
989          stripeEndRows.add(insertAt, previousEndRow);
990        }
991        ++insertAt;
992      }
993    }
994  }
995
996  @Override
997  public List<HStoreFile> getLevel0Files() {
998    return this.state.level0Files;
999  }
1000
1001  @Override
1002  public List<byte[]> getStripeBoundaries() {
1003    if (this.state.stripeFiles.isEmpty()) {
1004      return Collections.emptyList();
1005    }
1006    ArrayList<byte[]> result = new ArrayList<>(this.state.stripeEndRows.length + 2);
1007    result.add(OPEN_KEY);
1008    Collections.addAll(result, this.state.stripeEndRows);
1009    result.add(OPEN_KEY);
1010    return result;
1011  }
1012
1013  @Override
1014  public ArrayList<ImmutableList<HStoreFile>> getStripes() {
1015    return this.state.stripeFiles;
1016  }
1017
1018  @Override
1019  public int getStripeCount() {
1020    return this.state.stripeFiles.size();
1021  }
1022
1023  @Override
1024  public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
    // 1) We can never get rid of the file with the maximum seqid in a stripe (the latest one).
    // 2) Files that are not the latest can't become the latest due to (1), so the rest are fair
    // game.
1027    State state = this.state;
1028    Collection<HStoreFile> expiredStoreFiles = null;
1029    for (ImmutableList<HStoreFile> stripe : state.stripeFiles) {
1030      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
1031    }
1032    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
1033  }
1034
1035  private Collection<HStoreFile> findExpiredFiles(ImmutableList<HStoreFile> stripe, long maxTs,
1036    List<HStoreFile> filesCompacting, Collection<HStoreFile> expiredStoreFiles) {
    // The order by seqNum is reversed; skip index 0, since the newest file must be kept.
1038    for (int i = 1; i < stripe.size(); ++i) {
1039      HStoreFile sf = stripe.get(i);
1040      synchronized (sf) {
1041        long fileTs = sf.getReader().getMaxTimestamp();
1042        if (fileTs < maxTs && !filesCompacting.contains(sf)) {
1043          LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimestamp is "
1044            + fileTs + ", which is below " + maxTs);
1045          if (expiredStoreFiles == null) {
1046            expiredStoreFiles = new ArrayList<>();
1047          }
1048          expiredStoreFiles.add(sf);
1049        }
1050      }
1051    }
1052    return expiredStoreFiles;
1053  }
1054
1055  @Override
1056  public double getCompactionPressure() {
1057    State stateLocal = this.state;
1058    if (stateLocal.allFilesCached.size() > blockingFileCount) {
      // just a hint to tell others that we have reached the blocking file count.
1060      return 2.0;
1061    }
1062    if (stateLocal.stripeFiles.isEmpty()) {
1063      return 0.0;
1064    }
1065    int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
    // Do not calculate L0 separately because its data will be moved to stripes quickly and, in
    // most cases, we flush data to stripes directly.
1068    int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
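    // For each stripe, normCount scales linearly from 0.0 at the configured minimum compaction
    // file count to 1.0 at this stripe's share of the blocking file count; the most pressured
    // stripe determines the result.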
1069    double max = 0.0;
1070    for (ImmutableList<HStoreFile> stripeFile : stateLocal.stripeFiles) {
1071      int stripeFileCount = stripeFile.size();
1072      double normCount = (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
1073        / (blockingFilePerStripe - config.getStripeCompactMinFiles());
1074      if (normCount >= 1.0) {
        // This could happen if a stripe is not split evenly. Do not return values larger than 1.0
        // because we have not actually reached the blocking file count.
1077        return 1.0;
1078      }
1079      if (normCount > max) {
1080        max = normCount;
1081      }
1082    }
1083    return max;
1084  }
1085
1086  @Override
1087  public Comparator<HStoreFile> getStoreFileComparator() {
1088    return StoreFileComparators.SEQ_ID;
1089  }
1090}