001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.TreeMap; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.ConcatenatedLists; 041import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection; 047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 048 049/** 050 * Stripe implementation of {@link StoreFileManager}. Not thread safe - relies on external locking 051 * (in HStore). Collections that this class returns are immutable or unique to the call, so they 052 * should be safe. Stripe store splits the key space of the region into non-overlapping stripes, as 053 * well as some recent files that have all the keys (level 0). Each stripe contains a set of files. 054 * When L0 is compacted, it's split into the files corresponding to existing stripe boundaries, that 055 * can thus be added to stripes. When scan or get happens, it only has to read the files from the 056 * corresponding stripes. See {@link StripeCompactionPolicy} on how the stripes are determined; this 057 * class doesn't care. This class should work together with {@link StripeCompactionPolicy} and 058 * {@link org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor}. With regard to how they 059 * work, we make at least the following (reasonable) assumptions: - Compaction produces one file per 060 * new stripe (if any); that is easy to change. - Compaction has one contiguous set of stripes both 061 * in and out, except if L0 is involved. 062 */ 063@InterfaceAudience.Private 064public class StripeStoreFileManager 065 implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider { 066 private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFileManager.class); 067 068 /** 069 * The file metadata fields that contain the stripe information. 070 */ 071 public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY"); 072 public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY"); 073 074 private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator(); 075 076 /** 077 * The key value used for range boundary, indicating that the boundary is open (i.e. +-inf). 078 */ 079 public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY; 080 final static byte[] INVALID_KEY = null; 081 082 /** 083 * The state class. Used solely to replace results atomically during compactions and avoid 084 * complicated error handling. 085 */ 086 private static class State { 087 /** 088 * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored 089 * here. It is invariant that the start row of the stripe is the end row of the previous one 090 * (and is an open boundary for the first one). 091 */ 092 public byte[][] stripeEndRows = new byte[0][]; 093 094 /** 095 * Files by stripe. Each element of the list corresponds to stripeEndRow element with the same 096 * index, except the last one. Inside each list, the files are in reverse order by seqNum. Note 097 * that the length of this is one higher than that of stripeEndKeys. 098 */ 099 public ArrayList<ImmutableList<HStoreFile>> stripeFiles = new ArrayList<>(); 100 /** Level 0. The files are in reverse order by seqNum. */ 101 public ImmutableList<HStoreFile> level0Files = ImmutableList.of(); 102 103 /** Cached list of all files in the structure, to return from some calls */ 104 public ImmutableList<HStoreFile> allFilesCached = ImmutableList.of(); 105 private ImmutableList<HStoreFile> allCompactedFilesCached = ImmutableList.of(); 106 } 107 108 private State state = null; 109 110 /** Cached file metadata (or overrides as the case may be) */ 111 private HashMap<HStoreFile, byte[]> fileStarts = new HashMap<>(); 112 private HashMap<HStoreFile, byte[]> fileEnds = new HashMap<>(); 113 /** 114 * Normally invalid key is null, but in the map null is the result for "no key"; so use the 115 * following constant value in these maps instead. Note that this is a constant and we use it to 116 * compare by reference when we read from the map. 117 */ 118 private static final byte[] INVALID_KEY_IN_MAP = new byte[0]; 119 120 private final CellComparator cellComparator; 121 private StripeStoreConfig config; 122 123 private final int blockingFileCount; 124 125 public StripeStoreFileManager(CellComparator kvComparator, Configuration conf, 126 StripeStoreConfig config) { 127 this.cellComparator = kvComparator; 128 this.config = config; 129 this.blockingFileCount = 130 conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); 131 } 132 133 @Override 134 public void loadFiles(List<HStoreFile> storeFiles) { 135 loadUnclassifiedStoreFiles(storeFiles); 136 } 137 138 @Override 139 public Collection<HStoreFile> getStoreFiles() { 140 return state.allFilesCached; 141 } 142 143 @Override 144 public Collection<HStoreFile> getCompactedfiles() { 145 return state.allCompactedFilesCached; 146 } 147 148 @Override 149 public int getCompactedFilesCount() { 150 return state.allCompactedFilesCached.size(); 151 } 152 153 @Override 154 public void insertNewFiles(Collection<HStoreFile> sfs) { 155 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true); 156 cmc.mergeResults(Collections.emptyList(), sfs); 157 debugDumpState("Added new files"); 158 } 159 160 @Override 161 public ImmutableCollection<HStoreFile> clearFiles() { 162 ImmutableCollection<HStoreFile> result = state.allFilesCached; 163 this.state = new State(); 164 this.fileStarts.clear(); 165 this.fileEnds.clear(); 166 return result; 167 } 168 169 @Override 170 public ImmutableCollection<HStoreFile> clearCompactedFiles() { 171 ImmutableCollection<HStoreFile> result = state.allCompactedFilesCached; 172 this.state = new State(); 173 return result; 174 } 175 176 @Override 177 public int getStorefileCount() { 178 return state.allFilesCached.size(); 179 } 180 181 /** 182 * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} for details on this 183 * methods. 184 */ 185 @Override 186 public Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) { 187 KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists(); 188 // Order matters for this call. 189 result.addSublist(state.level0Files); 190 if (!state.stripeFiles.isEmpty()) { 191 int lastStripeIndex = findStripeForRow(CellUtil.cloneRow(targetKey), false); 192 for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) { 193 result.addSublist(state.stripeFiles.get(stripeIndex)); 194 } 195 } 196 return result.iterator(); 197 } 198 199 /** 200 * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and 201 * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, Cell)} for 202 * details on this methods. 203 */ 204 @Override 205 public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore( 206 Iterator<HStoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) { 207 KeyBeforeConcatenatedLists.Iterator original = 208 (KeyBeforeConcatenatedLists.Iterator) candidateFiles; 209 assert original != null; 210 ArrayList<List<HStoreFile>> components = original.getComponents(); 211 for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) { 212 HStoreFile sf = components.get(firstIrrelevant).get(0); 213 byte[] endKey = endOf(sf); 214 // Entries are ordered as such: L0, then stripes in reverse order. We never remove 215 // level 0; we remove the stripe, and all subsequent ones, as soon as we find the 216 // first one that cannot possibly have better candidates. 217 if (!isInvalid(endKey) && !isOpen(endKey) && (nonOpenRowCompare(targetKey, endKey) >= 0)) { 218 original.removeComponents(firstIrrelevant); 219 break; 220 } 221 } 222 return original; 223 } 224 225 /** 226 * Override of getSplitPoint that determines the split point as the boundary between two stripes, 227 * unless it causes significant imbalance between split sides' sizes. In that case, the split 228 * boundary will be chosen from the middle of one of the stripes to minimize imbalance. 229 * @return The split point, or null if no split is possible. 230 */ 231 @Override 232 public Optional<byte[]> getSplitPoint() throws IOException { 233 if (this.getStorefileCount() == 0) { 234 return Optional.empty(); 235 } 236 if (state.stripeFiles.size() <= 1) { 237 return getSplitPointFromAllFiles(); 238 } 239 int leftIndex = -1, rightIndex = state.stripeFiles.size(); 240 long leftSize = 0, rightSize = 0; 241 long lastLeftSize = 0, lastRightSize = 0; 242 while (rightIndex - 1 != leftIndex) { 243 if (leftSize >= rightSize) { 244 --rightIndex; 245 lastRightSize = getStripeFilesSize(rightIndex); 246 rightSize += lastRightSize; 247 } else { 248 ++leftIndex; 249 lastLeftSize = getStripeFilesSize(leftIndex); 250 leftSize += lastLeftSize; 251 } 252 } 253 if (leftSize == 0 || rightSize == 0) { 254 String errMsg = String.format( 255 "Cannot split on a boundary - left index %d size %d, " + "right index %d size %d", 256 leftIndex, leftSize, rightIndex, rightSize); 257 debugDumpState(errMsg); 258 LOG.warn(errMsg); 259 return getSplitPointFromAllFiles(); 260 } 261 double ratio = (double) rightSize / leftSize; 262 if (ratio < 1) { 263 ratio = 1 / ratio; 264 } 265 if (config.getMaxSplitImbalance() > ratio) { 266 return Optional.of(state.stripeEndRows[leftIndex]); 267 } 268 269 // If the difference between the sides is too large, we could get the proportional key on 270 // the a stripe to equalize the difference, but there's no proportional key method at the 271 // moment, and it's not extremely important. 272 // See if we can achieve better ratio if we split the bigger side in half. 273 boolean isRightLarger = rightSize >= leftSize; 274 double newRatio = isRightLarger 275 ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize) 276 : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize); 277 if (newRatio < 1) { 278 newRatio = 1 / newRatio; 279 } 280 if (newRatio >= ratio) { 281 return Optional.of(state.stripeEndRows[leftIndex]); 282 } 283 LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split " + newRatio 284 + " configured ratio " + config.getMaxSplitImbalance()); 285 // OK, we may get better ratio, get it. 286 return StoreUtils.getSplitPoint(state.stripeFiles.get(isRightLarger ? rightIndex : leftIndex), 287 cellComparator); 288 } 289 290 private Optional<byte[]> getSplitPointFromAllFiles() throws IOException { 291 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 292 sfs.addSublist(state.level0Files); 293 sfs.addAllSublists(state.stripeFiles); 294 return StoreUtils.getSplitPoint(sfs, cellComparator); 295 } 296 297 private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) { 298 return (double) (largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f); 299 } 300 301 @Override 302 public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow, 303 byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) { 304 if (state.stripeFiles.isEmpty()) { 305 return state.level0Files; // There's just L0. 306 } 307 308 int firstStripe = findStripeForRow(startRow, true); 309 int lastStripe = findStripeForRow(stopRow, false); 310 assert firstStripe <= lastStripe; 311 if (firstStripe == lastStripe && state.level0Files.isEmpty()) { 312 return state.stripeFiles.get(firstStripe); // There's just one stripe we need. 313 } 314 if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) { 315 return state.allFilesCached; // We need to read all files. 316 } 317 318 ConcatenatedLists<HStoreFile> result = new ConcatenatedLists<>(); 319 result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1)); 320 result.addSublist(state.level0Files); 321 return result; 322 } 323 324 @Override 325 public void addCompactionResults(Collection<HStoreFile> compactedFiles, 326 Collection<HStoreFile> results) { 327 // See class comment for the assumptions we make here. 328 LOG.debug("Attempting to merge compaction results: " + compactedFiles.size() 329 + " files replaced by " + results.size()); 330 // In order to be able to fail in the middle of the operation, we'll operate on lazy 331 // copies and apply the result at the end. 332 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); 333 cmc.mergeResults(compactedFiles, results); 334 markCompactedAway(compactedFiles); 335 debugDumpState("Merged compaction results"); 336 } 337 338 // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised 339 // Let a background thread close the actual reader on these compacted files and also 340 // ensure to evict the blocks from block cache so that they are no longer in 341 // cache 342 private void markCompactedAway(Collection<HStoreFile> compactedFiles) { 343 for (HStoreFile file : compactedFiles) { 344 file.markCompactedAway(); 345 } 346 } 347 348 @Override 349 public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) { 350 // See class comment for the assumptions we make here. 351 LOG.debug("Attempting to delete compaction results: " + compactedFiles.size()); 352 // In order to be able to fail in the middle of the operation, we'll operate on lazy 353 // copies and apply the result at the end. 354 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); 355 cmc.deleteResults(compactedFiles); 356 debugDumpState("Deleted compaction results"); 357 } 358 359 @Override 360 public int getStoreCompactionPriority() { 361 // If there's only L0, do what the default store does. 362 // If we are in critical priority, do the same - we don't want to trump all stores all 363 // the time due to how many files we have. 364 int fc = getStorefileCount(); 365 if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) { 366 return this.blockingFileCount - fc; 367 } 368 // If we are in good shape, we don't want to be trumped by all other stores due to how 369 // many files we have, so do an approximate mapping to normal priority range; L0 counts 370 // for all stripes. 371 int l0 = state.level0Files.size(), sc = state.stripeFiles.size(); 372 int priority = (int) Math.ceil(((double) (this.blockingFileCount - fc + l0) / sc) - l0); 373 return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority; 374 } 375 376 /** 377 * Gets the total size of all files in the stripe. 378 * @param stripeIndex Stripe index. 379 * @return Size. 380 */ 381 private long getStripeFilesSize(int stripeIndex) { 382 long result = 0; 383 for (HStoreFile sf : state.stripeFiles.get(stripeIndex)) { 384 result += sf.getReader().length(); 385 } 386 return result; 387 } 388 389 /** 390 * Loads initial store files that were picked up from some physical location pertaining to this 391 * store (presumably). Unlike adding files after compaction, assumes empty initial sets, and is 392 * forgiving with regard to stripe constraints - at worst, many/all files will go to level 0. 393 * @param storeFiles Store files to add. 394 */ 395 private void loadUnclassifiedStoreFiles(List<HStoreFile> storeFiles) { 396 LOG.debug("Attempting to load " + storeFiles.size() + " store files."); 397 TreeMap<byte[], ArrayList<HStoreFile>> candidateStripes = new TreeMap<>(MAP_COMPARATOR); 398 ArrayList<HStoreFile> level0Files = new ArrayList<>(); 399 // Separate the files into tentative stripes; then validate. Currently, we rely on metadata. 400 // If needed, we could dynamically determine the stripes in future. 401 for (HStoreFile sf : storeFiles) { 402 byte[] startRow = startOf(sf), endRow = endOf(sf); 403 // Validate the range and put the files into place. 404 if (isInvalid(startRow) || isInvalid(endRow)) { 405 insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0. 406 ensureLevel0Metadata(sf); 407 } else if (!isOpen(startRow) && !isOpen(endRow) && nonOpenRowCompare(startRow, endRow) >= 0) { 408 LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row [" 409 + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0"); 410 insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also. 411 ensureLevel0Metadata(sf); 412 } else { 413 ArrayList<HStoreFile> stripe = candidateStripes.get(endRow); 414 if (stripe == null) { 415 stripe = new ArrayList<>(); 416 candidateStripes.put(endRow, stripe); 417 } 418 insertFileIntoStripe(stripe, sf); 419 } 420 } 421 // Possible improvement - for variable-count stripes, if all the files are in L0, we can 422 // instead create single, open-ended stripe with all files. 423 424 boolean hasOverlaps = false; 425 byte[] expectedStartRow = null; // first stripe can start wherever 426 Iterator<Map.Entry<byte[], ArrayList<HStoreFile>>> entryIter = 427 candidateStripes.entrySet().iterator(); 428 while (entryIter.hasNext()) { 429 Map.Entry<byte[], ArrayList<HStoreFile>> entry = entryIter.next(); 430 ArrayList<HStoreFile> files = entry.getValue(); 431 // Validate the file start rows, and remove the bad ones to level 0. 432 for (int i = 0; i < files.size(); ++i) { 433 HStoreFile sf = files.get(i); 434 byte[] startRow = startOf(sf); 435 if (expectedStartRow == null) { 436 expectedStartRow = startRow; // ensure that first stripe is still consistent 437 } else if (!rowEquals(expectedStartRow, startRow)) { 438 hasOverlaps = true; 439 LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at [" 440 + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow) 441 + "], to L0 it goes"); 442 HStoreFile badSf = files.remove(i); 443 insertFileIntoStripe(level0Files, badSf); 444 ensureLevel0Metadata(badSf); 445 --i; 446 } 447 } 448 // Check if any files from the candidate stripe are valid. If so, add a stripe. 449 byte[] endRow = entry.getKey(); 450 if (!files.isEmpty()) { 451 expectedStartRow = endRow; // Next stripe must start exactly at that key. 452 } else { 453 entryIter.remove(); 454 } 455 } 456 457 // In the end, there must be open ends on two sides. If not, and there were no errors i.e. 458 // files are consistent, they might be coming from a split. We will treat the boundaries 459 // as open keys anyway, and log the message. 460 // If there were errors, we'll play it safe and dump everything into L0. 461 if (!candidateStripes.isEmpty()) { 462 HStoreFile firstFile = candidateStripes.firstEntry().getValue().get(0); 463 boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey()); 464 if (!isOpen) { 465 LOG.warn("The range of the loaded files does not cover full key space: from [" 466 + Bytes.toString(startOf(firstFile)) + "], to [" 467 + Bytes.toString(candidateStripes.lastKey()) + "]"); 468 if (!hasOverlaps) { 469 ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true); 470 ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false); 471 } else { 472 LOG.warn("Inconsistent files, everything goes to L0."); 473 for (ArrayList<HStoreFile> files : candidateStripes.values()) { 474 for (HStoreFile sf : files) { 475 insertFileIntoStripe(level0Files, sf); 476 ensureLevel0Metadata(sf); 477 } 478 } 479 candidateStripes.clear(); 480 } 481 } 482 } 483 484 // Copy the results into the fields. 485 State state = new State(); 486 state.level0Files = ImmutableList.copyOf(level0Files); 487 state.stripeFiles = new ArrayList<>(candidateStripes.size()); 488 state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][]; 489 ArrayList<HStoreFile> newAllFiles = new ArrayList<>(level0Files); 490 int i = candidateStripes.size() - 1; 491 for (Map.Entry<byte[], ArrayList<HStoreFile>> entry : candidateStripes.entrySet()) { 492 state.stripeFiles.add(ImmutableList.copyOf(entry.getValue())); 493 newAllFiles.addAll(entry.getValue()); 494 if (i > 0) { 495 state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey(); 496 } 497 --i; 498 } 499 state.allFilesCached = ImmutableList.copyOf(newAllFiles); 500 this.state = state; 501 debugDumpState("Files loaded"); 502 } 503 504 private void ensureEdgeStripeMetadata(ArrayList<HStoreFile> stripe, boolean isFirst) { 505 HashMap<HStoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds; 506 for (HStoreFile sf : stripe) { 507 targetMap.put(sf, OPEN_KEY); 508 } 509 } 510 511 private void ensureLevel0Metadata(HStoreFile sf) { 512 if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP); 513 if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP); 514 } 515 516 private void debugDumpState(String string) { 517 if (!LOG.isDebugEnabled()) return; 518 StringBuilder sb = new StringBuilder(); 519 sb.append("\n" + string + "; current stripe state is as such:"); 520 sb.append("\n level 0 with ").append(state.level0Files.size()) 521 .append(" files: " + TraditionalBinaryPrefix 522 .long2String(StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";"); 523 for (int i = 0; i < state.stripeFiles.size(); ++i) { 524 String endRow = (i == state.stripeEndRows.length) 525 ? "(end)" 526 : "[" + Bytes.toString(state.stripeEndRows[i]) + "]"; 527 sb.append("\n stripe ending in ").append(endRow).append(" with ") 528 .append(state.stripeFiles.get(i).size()) 529 .append(" files: " + TraditionalBinaryPrefix.long2String( 530 StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";"); 531 } 532 sb.append("\n").append(state.stripeFiles.size()).append(" stripes total."); 533 sb.append("\n").append(getStorefileCount()).append(" files total."); 534 LOG.debug(sb.toString()); 535 } 536 537 /** 538 * Checks whether the key indicates an open interval boundary (i.e. infinity). 539 */ 540 private static final boolean isOpen(byte[] key) { 541 return key != null && key.length == 0; 542 } 543 544 private static final boolean isOpen(Cell key) { 545 return key != null && key.getRowLength() == 0; 546 } 547 548 /** 549 * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files). 550 */ 551 private static final boolean isInvalid(byte[] key) { 552 // No need to use Arrays.equals because INVALID_KEY is null 553 return key == INVALID_KEY; 554 } 555 556 /** 557 * Compare two keys for equality. 558 */ 559 private final boolean rowEquals(byte[] k1, byte[] k2) { 560 return Bytes.equals(k1, 0, k1.length, k2, 0, k2.length); 561 } 562 563 /** 564 * Compare two keys. Keys must not be open (isOpen(row) == false). 565 */ 566 private final int nonOpenRowCompare(byte[] k1, byte[] k2) { 567 assert !isOpen(k1) && !isOpen(k2); 568 return Bytes.compareTo(k1, k2); 569 } 570 571 private final int nonOpenRowCompare(Cell k1, byte[] k2) { 572 assert !isOpen(k1) && !isOpen(k2); 573 return cellComparator.compareRows(k1, k2, 0, k2.length); 574 } 575 576 /** 577 * Finds the stripe index by end row. 578 */ 579 private final int findStripeIndexByEndRow(byte[] endRow) { 580 assert !isInvalid(endRow); 581 if (isOpen(endRow)) return state.stripeEndRows.length; 582 return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR); 583 } 584 585 /** 586 * Finds the stripe index for the stripe containing a row provided externally for get/scan. 587 */ 588 private final int findStripeForRow(byte[] row, boolean isStart) { 589 if (isStart && Arrays.equals(row, HConstants.EMPTY_START_ROW)) return 0; 590 if (!isStart && Arrays.equals(row, HConstants.EMPTY_END_ROW)) { 591 return state.stripeFiles.size() - 1; 592 } 593 // If there's an exact match below, a stripe ends at "row". Stripe right boundary is 594 // exclusive, so that means the row is in the next stripe; thus, we need to add one to index. 595 // If there's no match, the return value of binarySearch is (-(insertion point) - 1), where 596 // insertion point is the index of the next greater element, or list size if none. The 597 // insertion point happens to be exactly what we need, so we need to add one to the result. 598 return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1); 599 } 600 601 @Override 602 public final byte[] getStartRow(int stripeIndex) { 603 return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]); 604 } 605 606 @Override 607 public final byte[] getEndRow(int stripeIndex) { 608 return (stripeIndex == state.stripeEndRows.length 609 ? OPEN_KEY 610 : state.stripeEndRows[stripeIndex]); 611 } 612 613 private byte[] startOf(HStoreFile sf) { 614 byte[] result = fileStarts.get(sf); 615 616 // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter 617 // serves only as a marker and is not to be confused with other empty byte arrays. 618 // See Javadoc of INVALID_KEY_IN_MAP for more information 619 return (result == null) ? sf.getMetadataValue(STRIPE_START_KEY) 620 : result == INVALID_KEY_IN_MAP ? INVALID_KEY 621 : result; 622 } 623 624 private byte[] endOf(HStoreFile sf) { 625 byte[] result = fileEnds.get(sf); 626 627 // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter 628 // serves only as a marker and is not to be confused with other empty byte arrays. 629 // See Javadoc of INVALID_KEY_IN_MAP for more information 630 return (result == null) ? sf.getMetadataValue(STRIPE_END_KEY) 631 : result == INVALID_KEY_IN_MAP ? INVALID_KEY 632 : result; 633 } 634 635 /** 636 * Inserts a file in the correct place (by seqnum) in a stripe copy. 637 * @param stripe Stripe copy to insert into. 638 * @param sf File to insert. 639 */ 640 private static void insertFileIntoStripe(ArrayList<HStoreFile> stripe, HStoreFile sf) { 641 // The only operation for which sorting of the files matters is KeyBefore. Therefore, 642 // we will store the file in reverse order by seqNum from the outset. 643 for (int insertBefore = 0;; ++insertBefore) { 644 if ( 645 insertBefore == stripe.size() 646 || (StoreFileComparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0) 647 ) { 648 stripe.add(insertBefore, sf); 649 break; 650 } 651 } 652 } 653 654 /** 655 * An extension of ConcatenatedLists that has several peculiar properties. First, one can cut the 656 * tail of the logical list by removing last several sub-lists. Second, items can be removed thru 657 * iterator. Third, if the sub-lists are immutable, they are replaced with mutable copies when 658 * needed. On average KeyBefore operation will contain half the stripes as potential candidates, 659 * but will quickly cut down on them as it finds something in the more likely ones; thus, the 660 * above allow us to avoid unnecessary copying of a bunch of lists. 661 */ 662 private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<HStoreFile> { 663 @Override 664 public java.util.Iterator<HStoreFile> iterator() { 665 return new Iterator(); 666 } 667 668 public class Iterator extends ConcatenatedLists<HStoreFile>.Iterator { 669 public ArrayList<List<HStoreFile>> getComponents() { 670 return components; 671 } 672 673 public void removeComponents(int startIndex) { 674 List<List<HStoreFile>> subList = components.subList(startIndex, components.size()); 675 for (List<HStoreFile> entry : subList) { 676 size -= entry.size(); 677 } 678 assert size >= 0; 679 subList.clear(); 680 } 681 682 @Override 683 public void remove() { 684 if (!this.nextWasCalled) { 685 throw new IllegalStateException("No element to remove"); 686 } 687 this.nextWasCalled = false; 688 List<HStoreFile> src = components.get(currentComponent); 689 if (src instanceof ImmutableList<?>) { 690 src = new ArrayList<>(src); 691 components.set(currentComponent, src); 692 } 693 src.remove(indexWithinComponent); 694 --size; 695 --indexWithinComponent; 696 if (src.isEmpty()) { 697 components.remove(currentComponent); // indexWithinComponent is already -1 here. 698 } 699 } 700 } 701 } 702 703 /** 704 * Non-static helper class for merging compaction or flush results. Since we want to merge them 705 * atomically (more or less), it operates on lazy copies, then creates a new state object and puts 706 * it in place. 707 */ 708 private class CompactionOrFlushMergeCopy { 709 private ArrayList<List<HStoreFile>> stripeFiles = null; 710 private ArrayList<HStoreFile> level0Files = null; 711 private ArrayList<byte[]> stripeEndRows = null; 712 713 private Collection<HStoreFile> compactedFiles = null; 714 private Collection<HStoreFile> results = null; 715 716 private List<HStoreFile> l0Results = new ArrayList<>(); 717 private final boolean isFlush; 718 719 public CompactionOrFlushMergeCopy(boolean isFlush) { 720 // Create a lazy mutable copy (other fields are so lazy they start out as nulls). 721 this.stripeFiles = new ArrayList<>(StripeStoreFileManager.this.state.stripeFiles); 722 this.isFlush = isFlush; 723 } 724 725 private void mergeResults(Collection<HStoreFile> compactedFiles, 726 Collection<HStoreFile> results) { 727 assert this.compactedFiles == null && this.results == null; 728 this.compactedFiles = compactedFiles; 729 this.results = results; 730 // Do logical processing. 731 if (!isFlush) { 732 removeCompactedFiles(); 733 } 734 TreeMap<byte[], HStoreFile> newStripes = processResults(); 735 if (newStripes != null) { 736 processNewCandidateStripes(newStripes); 737 } 738 // Create new state and update parent. 739 State state = createNewState(false); 740 StripeStoreFileManager.this.state = state; 741 updateMetadataMaps(); 742 } 743 744 private void deleteResults(Collection<HStoreFile> compactedFiles) { 745 this.compactedFiles = compactedFiles; 746 // Create new state and update parent. 747 State state = createNewState(true); 748 StripeStoreFileManager.this.state = state; 749 updateMetadataMaps(); 750 } 751 752 private State createNewState(boolean delCompactedFiles) { 753 State oldState = StripeStoreFileManager.this.state; 754 // Stripe count should be the same unless the end rows changed. 755 assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null; 756 State newState = new State(); 757 newState.level0Files = 758 (this.level0Files == null) ? oldState.level0Files : ImmutableList.copyOf(this.level0Files); 759 newState.stripeEndRows = (this.stripeEndRows == null) 760 ? oldState.stripeEndRows 761 : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]); 762 newState.stripeFiles = new ArrayList<>(this.stripeFiles.size()); 763 for (List<HStoreFile> newStripe : this.stripeFiles) { 764 newState.stripeFiles.add(newStripe instanceof ImmutableList<?> 765 ? (ImmutableList<HStoreFile>) newStripe 766 : ImmutableList.copyOf(newStripe)); 767 } 768 769 List<HStoreFile> newAllFiles = new ArrayList<>(oldState.allFilesCached); 770 List<HStoreFile> newAllCompactedFiles = new ArrayList<>(oldState.allCompactedFilesCached); 771 if (!isFlush) { 772 newAllFiles.removeAll(compactedFiles); 773 if (delCompactedFiles) { 774 newAllCompactedFiles.removeAll(compactedFiles); 775 } else { 776 newAllCompactedFiles.addAll(compactedFiles); 777 } 778 } 779 if (results != null) { 780 newAllFiles.addAll(results); 781 } 782 newState.allFilesCached = ImmutableList.copyOf(newAllFiles); 783 newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles); 784 return newState; 785 } 786 787 private void updateMetadataMaps() { 788 StripeStoreFileManager parent = StripeStoreFileManager.this; 789 if (!isFlush) { 790 for (HStoreFile sf : this.compactedFiles) { 791 parent.fileStarts.remove(sf); 792 parent.fileEnds.remove(sf); 793 } 794 } 795 if (this.l0Results != null) { 796 for (HStoreFile sf : this.l0Results) { 797 parent.ensureLevel0Metadata(sf); 798 } 799 } 800 } 801 802 /** 803 * @param index Index of the stripe we need. 804 * @return A lazy stripe copy from current stripes. 805 */ 806 private final ArrayList<HStoreFile> getStripeCopy(int index) { 807 List<HStoreFile> stripeCopy = this.stripeFiles.get(index); 808 ArrayList<HStoreFile> result = null; 809 if (stripeCopy instanceof ImmutableList<?>) { 810 result = new ArrayList<>(stripeCopy); 811 this.stripeFiles.set(index, result); 812 } else { 813 result = (ArrayList<HStoreFile>) stripeCopy; 814 } 815 return result; 816 } 817 818 /** Returns A lazy L0 copy from current state. */ 819 private final ArrayList<HStoreFile> getLevel0Copy() { 820 if (this.level0Files == null) { 821 this.level0Files = new ArrayList<>(StripeStoreFileManager.this.state.level0Files); 822 } 823 return this.level0Files; 824 } 825 826 /** 827 * Process new files, and add them either to the structure of existing stripes, or to the list 828 * of new candidate stripes. 829 * @return New candidate stripes. 830 */ 831 private TreeMap<byte[], HStoreFile> processResults() { 832 TreeMap<byte[], HStoreFile> newStripes = null; 833 for (HStoreFile sf : this.results) { 834 byte[] startRow = startOf(sf), endRow = endOf(sf); 835 if (isInvalid(endRow) || isInvalid(startRow)) { 836 if (!isFlush) { 837 LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath()); 838 } 839 insertFileIntoStripe(getLevel0Copy(), sf); 840 this.l0Results.add(sf); 841 continue; 842 } 843 if (!this.stripeFiles.isEmpty()) { 844 int stripeIndex = findStripeIndexByEndRow(endRow); 845 if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) { 846 // Simple/common case - add file to an existing stripe. 847 insertFileIntoStripe(getStripeCopy(stripeIndex), sf); 848 continue; 849 } 850 } 851 852 // Make a new candidate stripe. 853 if (newStripes == null) { 854 newStripes = new TreeMap<>(MAP_COMPARATOR); 855 } 856 HStoreFile oldSf = newStripes.put(endRow, sf); 857 if (oldSf != null) { 858 throw new IllegalStateException( 859 "Compactor has produced multiple files for the stripe ending in [" 860 + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath()); 861 } 862 } 863 return newStripes; 864 } 865 866 /** 867 * Remove compacted files. 868 */ 869 private void removeCompactedFiles() { 870 for (HStoreFile oldFile : this.compactedFiles) { 871 byte[] oldEndRow = endOf(oldFile); 872 List<HStoreFile> source = null; 873 if (isInvalid(oldEndRow)) { 874 source = getLevel0Copy(); 875 } else { 876 int stripeIndex = findStripeIndexByEndRow(oldEndRow); 877 if (stripeIndex < 0) { 878 throw new IllegalStateException( 879 "An allegedly compacted file [" + oldFile + "] does not belong" 880 + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])"); 881 } 882 source = getStripeCopy(stripeIndex); 883 } 884 if (!source.remove(oldFile)) { 885 LOG.warn("An allegedly compacted file [{}] was not found", oldFile); 886 } 887 } 888 } 889 890 /** 891 * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with new 892 * candidate stripes/removes old stripes; produces new set of stripe end rows. 893 * @param newStripes New stripes - files by end row. 894 */ 895 private void processNewCandidateStripes(TreeMap<byte[], HStoreFile> newStripes) { 896 // Validate that the removed and added aggregate ranges still make for a full key space. 897 boolean hasStripes = !this.stripeFiles.isEmpty(); 898 this.stripeEndRows = 899 new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows)); 900 int removeFrom = 0; 901 byte[] firstStartRow = startOf(newStripes.firstEntry().getValue()); 902 byte[] lastEndRow = newStripes.lastKey(); 903 if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) { 904 throw new IllegalStateException("Newly created stripes do not cover the entire key space."); 905 } 906 907 boolean canAddNewStripes = true; 908 Collection<HStoreFile> filesForL0 = null; 909 if (hasStripes) { 910 // Determine which stripes will need to be removed because they conflict with new stripes. 911 // The new boundaries should match old stripe boundaries, so we should get exact matches. 912 if (isOpen(firstStartRow)) { 913 removeFrom = 0; 914 } else { 915 removeFrom = findStripeIndexByEndRow(firstStartRow); 916 if (removeFrom < 0) { 917 throw new IllegalStateException("Compaction is trying to add a bad range."); 918 } 919 ++removeFrom; 920 } 921 int removeTo = findStripeIndexByEndRow(lastEndRow); 922 if (removeTo < 0) { 923 throw new IllegalStateException("Compaction is trying to add a bad range."); 924 } 925 // See if there are files in the stripes we are trying to replace. 926 ArrayList<HStoreFile> conflictingFiles = new ArrayList<>(); 927 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) { 928 conflictingFiles.addAll(this.stripeFiles.get(removeIndex)); 929 } 930 if (!conflictingFiles.isEmpty()) { 931 // This can be caused by two things - concurrent flush into stripes, or a bug. 932 // Unfortunately, we cannot tell them apart without looking at timing or something 933 // like that. We will assume we are dealing with a flush and dump it into L0. 934 if (isFlush) { 935 long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values()); 936 LOG.warn("Stripes were created by a flush, but results of size " + newSize 937 + " cannot be added because the stripes have changed"); 938 canAddNewStripes = false; 939 filesForL0 = newStripes.values(); 940 } else { 941 long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles); 942 LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) " 943 + " of size " + oldSize + " are moved to L0 due to concurrent stripe change"); 944 filesForL0 = conflictingFiles; 945 } 946 if (filesForL0 != null) { 947 for (HStoreFile sf : filesForL0) { 948 insertFileIntoStripe(getLevel0Copy(), sf); 949 } 950 l0Results.addAll(filesForL0); 951 } 952 } 953 954 if (canAddNewStripes) { 955 // Remove old empty stripes. 956 int originalCount = this.stripeFiles.size(); 957 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) { 958 if (removeIndex != originalCount - 1) { 959 this.stripeEndRows.remove(removeIndex); 960 } 961 this.stripeFiles.remove(removeIndex); 962 } 963 } 964 } 965 966 if (!canAddNewStripes) { 967 return; // Files were already put into L0. 968 } 969 970 // Now, insert new stripes. The total ranges match, so we can insert where we removed. 971 byte[] previousEndRow = null; 972 int insertAt = removeFrom; 973 for (Map.Entry<byte[], HStoreFile> newStripe : newStripes.entrySet()) { 974 if (previousEndRow != null) { 975 // Validate that the ranges are contiguous. 976 assert !isOpen(previousEndRow); 977 byte[] startRow = startOf(newStripe.getValue()); 978 if (!rowEquals(previousEndRow, startRow)) { 979 throw new IllegalStateException("The new stripes produced by " 980 + (isFlush ? "flush" : "compaction") + " are not contiguous"); 981 } 982 } 983 // Add the new stripe. 984 ArrayList<HStoreFile> tmp = new ArrayList<>(); 985 tmp.add(newStripe.getValue()); 986 stripeFiles.add(insertAt, tmp); 987 previousEndRow = newStripe.getKey(); 988 if (!isOpen(previousEndRow)) { 989 stripeEndRows.add(insertAt, previousEndRow); 990 } 991 ++insertAt; 992 } 993 } 994 } 995 996 @Override 997 public List<HStoreFile> getLevel0Files() { 998 return this.state.level0Files; 999 } 1000 1001 @Override 1002 public List<byte[]> getStripeBoundaries() { 1003 if (this.state.stripeFiles.isEmpty()) { 1004 return Collections.emptyList(); 1005 } 1006 ArrayList<byte[]> result = new ArrayList<>(this.state.stripeEndRows.length + 2); 1007 result.add(OPEN_KEY); 1008 Collections.addAll(result, this.state.stripeEndRows); 1009 result.add(OPEN_KEY); 1010 return result; 1011 } 1012 1013 @Override 1014 public ArrayList<ImmutableList<HStoreFile>> getStripes() { 1015 return this.state.stripeFiles; 1016 } 1017 1018 @Override 1019 public int getStripeCount() { 1020 return this.state.stripeFiles.size(); 1021 } 1022 1023 @Override 1024 public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) { 1025 // 1) We can never get rid of the last file which has the maximum seqid in a stripe. 1026 // 2) Files that are not the latest can't become one due to (1), so the rest are fair game. 1027 State state = this.state; 1028 Collection<HStoreFile> expiredStoreFiles = null; 1029 for (ImmutableList<HStoreFile> stripe : state.stripeFiles) { 1030 expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles); 1031 } 1032 return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles); 1033 } 1034 1035 private Collection<HStoreFile> findExpiredFiles(ImmutableList<HStoreFile> stripe, long maxTs, 1036 List<HStoreFile> filesCompacting, Collection<HStoreFile> expiredStoreFiles) { 1037 // Order by seqnum is reversed. 1038 for (int i = 1; i < stripe.size(); ++i) { 1039 HStoreFile sf = stripe.get(i); 1040 synchronized (sf) { 1041 long fileTs = sf.getReader().getMaxTimestamp(); 1042 if (fileTs < maxTs && !filesCompacting.contains(sf)) { 1043 LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimestamp is " 1044 + fileTs + ", which is below " + maxTs); 1045 if (expiredStoreFiles == null) { 1046 expiredStoreFiles = new ArrayList<>(); 1047 } 1048 expiredStoreFiles.add(sf); 1049 } 1050 } 1051 } 1052 return expiredStoreFiles; 1053 } 1054 1055 @Override 1056 public double getCompactionPressure() { 1057 State stateLocal = this.state; 1058 if (stateLocal.allFilesCached.size() > blockingFileCount) { 1059 // just a hit to tell others that we have reached the blocking file count. 1060 return 2.0; 1061 } 1062 if (stateLocal.stripeFiles.isEmpty()) { 1063 return 0.0; 1064 } 1065 int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size(); 1066 // do not calculate L0 separately because data will be moved to stripe quickly and in most cases 1067 // we flush data to stripe directly. 1068 int delta = stateLocal.level0Files.isEmpty() ? 0 : 1; 1069 double max = 0.0; 1070 for (ImmutableList<HStoreFile> stripeFile : stateLocal.stripeFiles) { 1071 int stripeFileCount = stripeFile.size(); 1072 double normCount = (double) (stripeFileCount + delta - config.getStripeCompactMinFiles()) 1073 / (blockingFilePerStripe - config.getStripeCompactMinFiles()); 1074 if (normCount >= 1.0) { 1075 // This could happen if stripe is not split evenly. Do not return values that larger than 1076 // 1.0 because we have not reached the blocking file count actually. 1077 return 1.0; 1078 } 1079 if (normCount > max) { 1080 max = normCount; 1081 } 1082 } 1083 return max; 1084 } 1085 1086 @Override 1087 public Comparator<HStoreFile> getStoreFileComparator() { 1088 return StoreFileComparators.SEQ_ID; 1089 } 1090}