/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Stripe implementation of StoreFileManager.
 * Not thread safe - relies on external locking (in HStore). Collections that this class
 * returns are immutable or unique to the call, so they should be safe.
 * Stripe store splits the key space of the region into non-overlapping stripes, as well as
 * some recent files that have all the keys (level 0). Each stripe contains a set of files.
 * When L0 is compacted, it's split into the files corresponding to existing stripe boundaries,
 * which can thus be added to stripes.
 * When scan or get happens, it only has to read the files from the corresponding stripes.
 * See StripeCompactionPolicy on how the stripes are determined; this class doesn't care.
 *
 * This class should work together with StripeCompactionPolicy and StripeCompactor.
 * With regard to how they work, we make at least the following (reasonable) assumptions:
 *  - Compaction produces one file per new stripe (if any); that is easy to change.
 *  - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
 */
@InterfaceAudience.Private
public class StripeStoreFileManager
    implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFileManager.class);

  /**
   * The file metadata fields that contain the stripe information.
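   * Files that are missing these fields are treated as level 0 files (see
   * loadUnclassifiedStoreFiles).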
   */
  public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
  public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");

  private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();

  /**
   * The key value used for a range boundary, indicating that the boundary is open (i.e. +-inf).
   */
  public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
  final static byte[] INVALID_KEY = null;

  /**
   * The state class. Used solely to replace results atomically during
   * compactions and avoid complicated error handling.
   */
  private static class State {
    /**
     * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
     * here. It is invariant that the start row of the stripe is the end row of the previous one
     * (and is an open boundary for the first one).
     */
    public byte[][] stripeEndRows = new byte[0][];

    /**
     * Files by stripe. Each element of the list corresponds to the stripeEndRows element with
     * the same index, except the last one. Inside each list, the files are in reverse order by
     * seqNum. Note that the length of this is one higher than that of stripeEndRows.
     */
    public ArrayList<ImmutableList<HStoreFile>> stripeFiles = new ArrayList<>();
    /** Level 0. The files are in reverse order by seqNum. */
    public ImmutableList<HStoreFile> level0Files = ImmutableList.of();

    /** Cached list of all files in the structure, to return from some calls */
    public ImmutableList<HStoreFile> allFilesCached = ImmutableList.of();
    private ImmutableList<HStoreFile> allCompactedFilesCached = ImmutableList.of();
  }
  private State state = null;

  /** Cached file metadata (or overrides as the case may be) */
  private HashMap<HStoreFile, byte[]> fileStarts = new HashMap<>();
  private HashMap<HStoreFile, byte[]> fileEnds = new HashMap<>();
  /** Normally invalid key is null, but in the map null is the result for "no key"; so use
   * the following constant value in these maps instead. Note that this is a constant and
   * we use it to compare by reference when we read from the map. */
  private static final byte[] INVALID_KEY_IN_MAP = new byte[0];

  private final CellComparator cellComparator;
  private StripeStoreConfig config;

  private final int blockingFileCount;

  public StripeStoreFileManager(
      CellComparator kvComparator, Configuration conf, StripeStoreConfig config) {
    this.cellComparator = kvComparator;
    this.config = config;
    this.blockingFileCount = conf.getInt(
        HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
  }

  @Override
  public void loadFiles(List<HStoreFile> storeFiles) {
    loadUnclassifiedStoreFiles(storeFiles);
  }

  @Override
  public Collection<HStoreFile> getStorefiles() {
    return state.allFilesCached;
  }

  @Override
  public Collection<HStoreFile> getCompactedfiles() {
    return state.allCompactedFilesCached;
  }

  @Override
  public int getCompactedFilesCount() {
    return state.allCompactedFilesCached.size();
  }

  @Override
  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
    // There are no compacted files to replace here, so compactedFiles is null.
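    // This cannot NPE: on the flush path (isFlush == true), mergeResults, createNewState and
    // updateMetadataMaps never dereference compactedFiles.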
    cmc.mergeResults(null, sfs);
    debugDumpState("Added new files");
  }

  @Override
  public ImmutableCollection<HStoreFile> clearFiles() {
    ImmutableCollection<HStoreFile> result = state.allFilesCached;
    this.state = new State();
    this.fileStarts.clear();
    this.fileEnds.clear();
    return result;
  }

  @Override
  public ImmutableCollection<HStoreFile> clearCompactedFiles() {
    ImmutableCollection<HStoreFile> result = state.allCompactedFilesCached;
    this.state = new State();
    return result;
  }

  @Override
  public int getStorefileCount() {
    return state.allFilesCached.size();
  }

  /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)}
   * for details on this method. */
  @Override
  public Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
    KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
    // Order matters for this call.
    result.addSublist(state.level0Files);
    if (!state.stripeFiles.isEmpty()) {
      int lastStripeIndex = findStripeForRow(CellUtil.cloneRow(targetKey), false);
      for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
        result.addSublist(state.stripeFiles.get(stripeIndex));
      }
    }
    return result.iterator();
  }

  /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
   * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, Cell)}
   * for details on these methods. */
  @Override
  public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
      Iterator<HStoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
    KeyBeforeConcatenatedLists.Iterator original =
        (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
    assert original != null;
    ArrayList<List<HStoreFile>> components = original.getComponents();
    for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
      HStoreFile sf = components.get(firstIrrelevant).get(0);
      byte[] endKey = endOf(sf);
      // Entries are ordered as such: L0, then stripes in reverse order. We never remove
      // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
      // first one that cannot possibly have better candidates.
      if (!isInvalid(endKey) && !isOpen(endKey)
          && (nonOpenRowCompare(targetKey, endKey) >= 0)) {
        original.removeComponents(firstIrrelevant);
        break;
      }
    }
    return original;
  }

  /**
   * Override of getSplitPoint that determines the split point as the boundary between two
   * stripes, unless it causes significant imbalance between split sides' sizes. In that
   * case, the split boundary will be chosen from the middle of one of the stripes to
   * minimize imbalance.
   * @return The split point, or an empty Optional if no split is possible.
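   *   For example (illustrative sizes only): with stripes of sizes 2, 2 and 10, the best
   *   boundary split is 4 vs. 10 (ratio 2.5), while splitting the largest stripe in half yields
   *   roughly 9 vs. 5 (ratio 1.8), so the mid-stripe split is preferred once 2.5 exceeds the
   *   configured imbalance.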
   */
  @Override
  public Optional<byte[]> getSplitPoint() throws IOException {
    if (this.getStorefileCount() == 0) {
      return Optional.empty();
    }
    if (state.stripeFiles.size() <= 1) {
      return getSplitPointFromAllFiles();
    }
    int leftIndex = -1, rightIndex = state.stripeFiles.size();
    long leftSize = 0, rightSize = 0;
    long lastLeftSize = 0, lastRightSize = 0;
    while (rightIndex - 1 != leftIndex) {
      if (leftSize >= rightSize) {
        --rightIndex;
        lastRightSize = getStripeFilesSize(rightIndex);
        rightSize += lastRightSize;
      } else {
        ++leftIndex;
        lastLeftSize = getStripeFilesSize(leftIndex);
        leftSize += lastLeftSize;
      }
    }
    if (leftSize == 0 || rightSize == 0) {
      String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
          + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
      debugDumpState(errMsg);
      LOG.warn(errMsg);
      return getSplitPointFromAllFiles();
    }
    double ratio = (double)rightSize / leftSize;
    if (ratio < 1) {
      ratio = 1 / ratio;
    }
    if (config.getMaxSplitImbalance() > ratio) {
      return Optional.of(state.stripeEndRows[leftIndex]);
    }

    // If the difference between the sides is too large, we could get the proportional key on
    // a stripe to equalize the difference, but there's no proportional key method at the
    // moment, and it's not extremely important.
    // See if we can achieve a better ratio if we split the bigger side in half.
    boolean isRightLarger = rightSize >= leftSize;
    double newRatio = isRightLarger
        ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
        : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
    if (newRatio < 1) {
      newRatio = 1 / newRatio;
    }
    if (newRatio >= ratio) {
      return Optional.of(state.stripeEndRows[leftIndex]);
    }
    LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
        + newRatio + " configured ratio " + config.getMaxSplitImbalance());
    // OK, we may get a better ratio, get it.
    return StoreUtils.getSplitPoint(state.stripeFiles.get(isRightLarger ? rightIndex : leftIndex),
      cellComparator);
  }

  private Optional<byte[]> getSplitPointFromAllFiles() throws IOException {
    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
    sfs.addSublist(state.level0Files);
    sfs.addAllSublists(state.stripeFiles);
    return StoreUtils.getSplitPoint(sfs, cellComparator);
  }

  private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
    return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
  }

  @Override
  public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
      byte[] stopRow, boolean includeStopRow) {
    if (state.stripeFiles.isEmpty()) {
      return state.level0Files; // There's just L0.
    }

    int firstStripe = findStripeForRow(startRow, true);
    int lastStripe = findStripeForRow(stopRow, false);
    assert firstStripe <= lastStripe;
    if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
      return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
    }
    if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
      return state.allFilesCached; // We need to read all files.
    }

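    // Otherwise, return the stripes overlapping [startRow, stopRow] plus all of L0, since L0
    // files may contain any key.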
    ConcatenatedLists<HStoreFile> result = new ConcatenatedLists<>();
    result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
    result.addSublist(state.level0Files);
    return result;
  }

  @Override
  public void addCompactionResults(
      Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results) throws IOException {
    // See class comment for the assumptions we make here.
    LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
        + " files replaced by " + results.size());
    // In order to be able to fail in the middle of the operation, we'll operate on lazy
    // copies and apply the result at the end.
    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
    cmc.mergeResults(compactedFiles, results);
    markCompactedAway(compactedFiles);
    debugDumpState("Merged compaction results");
  }

  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
  // Let a background thread close the actual reader on these compacted files and also
  // ensure to evict the blocks from block cache so that they are no longer in
  // cache
  private void markCompactedAway(Collection<HStoreFile> compactedFiles) {
    for (HStoreFile file : compactedFiles) {
      file.markCompactedAway();
    }
  }

  @Override
  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException {
    // See class comment for the assumptions we make here.
    LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
    // In order to be able to fail in the middle of the operation, we'll operate on lazy
    // copies and apply the result at the end.
    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
    cmc.deleteResults(compactedFiles);
    debugDumpState("Deleted compaction results");
  }

  @Override
  public int getStoreCompactionPriority() {
    // If there's only L0, do what the default store does.
    // If we are in critical priority, do the same - we don't want to trump all stores all
    // the time due to how many files we have.
    int fc = getStorefileCount();
    if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
      return this.blockingFileCount - fc;
    }
    // If we are in good shape, we don't want to be trumped by all other stores due to how
    // many files we have, so do an approximate mapping to normal priority range; L0 counts
    // for all stripes.
    int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
    int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
    return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
  }

  /**
   * Gets the total size of all files in the stripe.
   * @param stripeIndex Stripe index.
   * @return Size.
   */
  private long getStripeFilesSize(int stripeIndex) {
    long result = 0;
    for (HStoreFile sf : state.stripeFiles.get(stripeIndex)) {
      result += sf.getReader().length();
    }
    return result;
  }

  /**
   * Loads initial store files that were picked up from some physical location pertaining to
   * this store (presumably). Unlike adding files after compaction, assumes empty initial
   * sets, and is forgiving with regard to stripe constraints - at worst, many/all files will
   * go to level 0.
   * @param storeFiles Store files to add.
   */
  private void loadUnclassifiedStoreFiles(List<HStoreFile> storeFiles) {
    LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
    TreeMap<byte[], ArrayList<HStoreFile>> candidateStripes = new TreeMap<>(MAP_COMPARATOR);
    ArrayList<HStoreFile> level0Files = new ArrayList<>();
    // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
    // If needed, we could dynamically determine the stripes in the future.
    for (HStoreFile sf : storeFiles) {
      byte[] startRow = startOf(sf), endRow = endOf(sf);
      // Validate the range and put the files into place.
      if (isInvalid(startRow) || isInvalid(endRow)) {
        insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
        ensureLevel0Metadata(sf);
      } else if (!isOpen(startRow) && !isOpen(endRow) &&
          nonOpenRowCompare(startRow, endRow) >= 0) {
        LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
            + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
        insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
        ensureLevel0Metadata(sf);
      } else {
        ArrayList<HStoreFile> stripe = candidateStripes.get(endRow);
        if (stripe == null) {
          stripe = new ArrayList<>();
          candidateStripes.put(endRow, stripe);
        }
        insertFileIntoStripe(stripe, sf);
      }
    }
    // Possible improvement - for variable-count stripes, if all the files are in L0, we can
    // instead create a single, open-ended stripe with all the files.

    boolean hasOverlaps = false;
    byte[] expectedStartRow = null; // first stripe can start wherever
    Iterator<Map.Entry<byte[], ArrayList<HStoreFile>>> entryIter =
        candidateStripes.entrySet().iterator();
    while (entryIter.hasNext()) {
      Map.Entry<byte[], ArrayList<HStoreFile>> entry = entryIter.next();
      ArrayList<HStoreFile> files = entry.getValue();
      // Validate the file start rows, and remove the bad ones to level 0.
      for (int i = 0; i < files.size(); ++i) {
        HStoreFile sf = files.get(i);
        byte[] startRow = startOf(sf);
        if (expectedStartRow == null) {
          expectedStartRow = startRow; // ensure that first stripe is still consistent
        } else if (!rowEquals(expectedStartRow, startRow)) {
          hasOverlaps = true;
          LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
              + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
              + "], to L0 it goes");
          HStoreFile badSf = files.remove(i);
          insertFileIntoStripe(level0Files, badSf);
          ensureLevel0Metadata(badSf);
          --i;
        }
      }
      // Check if any files from the candidate stripe are valid. If so, add a stripe.
      byte[] endRow = entry.getKey();
      if (!files.isEmpty()) {
        expectedStartRow = endRow; // Next stripe must start exactly at that key.
      } else {
        entryIter.remove();
      }
    }

    // In the end, there must be open ends on both sides. If not, and there were no errors
    // (i.e. the files are consistent), they might be coming from a split. We will treat the
    // boundaries as open keys anyway, and log a message.
    // If there were errors, we'll play it safe and dump everything into L0.
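    // For example, consistent files covering [b, c) and [c, d) would have b and d overridden to
    // open keys via ensureEdgeStripeMetadata below.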
    if (!candidateStripes.isEmpty()) {
      HStoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
      boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
      if (!isOpen) {
        LOG.warn("The range of the loaded files does not cover full key space: from ["
            + Bytes.toString(startOf(firstFile)) + "], to ["
            + Bytes.toString(candidateStripes.lastKey()) + "]");
        if (!hasOverlaps) {
          ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
          ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
        } else {
          LOG.warn("Inconsistent files, everything goes to L0.");
          for (ArrayList<HStoreFile> files : candidateStripes.values()) {
            for (HStoreFile sf : files) {
              insertFileIntoStripe(level0Files, sf);
              ensureLevel0Metadata(sf);
            }
          }
          candidateStripes.clear();
        }
      }
    }

    // Copy the results into the fields.
    State state = new State();
    state.level0Files = ImmutableList.copyOf(level0Files);
    state.stripeFiles = new ArrayList<>(candidateStripes.size());
    state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
    ArrayList<HStoreFile> newAllFiles = new ArrayList<>(level0Files);
    int i = candidateStripes.size() - 1;
    for (Map.Entry<byte[], ArrayList<HStoreFile>> entry : candidateStripes.entrySet()) {
      state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
      newAllFiles.addAll(entry.getValue());
      if (i > 0) {
        state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
      }
      --i;
    }
    state.allFilesCached = ImmutableList.copyOf(newAllFiles);
    this.state = state;
    debugDumpState("Files loaded");
  }

  private void ensureEdgeStripeMetadata(ArrayList<HStoreFile> stripe, boolean isFirst) {
    HashMap<HStoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
    for (HStoreFile sf : stripe) {
      targetMap.put(sf, OPEN_KEY);
    }
  }

  private void ensureLevel0Metadata(HStoreFile sf) {
    if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
    if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
  }

  private void debugDumpState(String string) {
    if (!LOG.isDebugEnabled()) return;
    StringBuilder sb = new StringBuilder();
    sb.append("\n" + string + "; current stripe state is as such:");
    sb.append("\n level 0 with ")
        .append(state.level0Files.size())
        .append(
          " files: "
              + TraditionalBinaryPrefix.long2String(
                StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
    for (int i = 0; i < state.stripeFiles.size(); ++i) {
      String endRow = (i == state.stripeEndRows.length)
          ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
      sb.append("\n stripe ending in ")
          .append(endRow)
          .append(" with ")
          .append(state.stripeFiles.get(i).size())
          .append(
            " files: "
                + TraditionalBinaryPrefix.long2String(
                  StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
    }
    sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
    sb.append("\n").append(getStorefileCount()).append(" files total.");
    LOG.debug(sb.toString());
  }

  /**
   * Checks whether the key indicates an open interval boundary (i.e. infinity).
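   * An open boundary is represented by a zero-length byte array (OPEN_KEY).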
   */
  private static final boolean isOpen(byte[] key) {
    return key != null && key.length == 0;
  }

  private static final boolean isOpen(Cell key) {
    return key != null && key.getRowLength() == 0;
  }

  /**
   * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
   */
  private static final boolean isInvalid(byte[] key) {
    // No need to use Arrays.equals because INVALID_KEY is null
    return key == INVALID_KEY;
  }

  /**
   * Compare two keys for equality.
   */
  private final boolean rowEquals(byte[] k1, byte[] k2) {
    return Bytes.equals(k1, 0, k1.length, k2, 0, k2.length);
  }

  /**
   * Compare two keys. Keys must not be open (isOpen(row) == false).
   */
  private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
    assert !isOpen(k1) && !isOpen(k2);
    return Bytes.compareTo(k1, k2);
  }

  private final int nonOpenRowCompare(Cell k1, byte[] k2) {
    assert !isOpen(k1) && !isOpen(k2);
    return cellComparator.compareRows(k1, k2, 0, k2.length);
  }

  /**
   * Finds the stripe index by end row.
   */
  private final int findStripeIndexByEndRow(byte[] endRow) {
    assert !isInvalid(endRow);
    if (isOpen(endRow)) return state.stripeEndRows.length;
    return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
  }

  /**
   * Finds the stripe index for the stripe containing a row provided externally for get/scan.
   */
  private final int findStripeForRow(byte[] row, boolean isStart) {
    if (isStart && Arrays.equals(row, HConstants.EMPTY_START_ROW)) return 0;
    if (!isStart && Arrays.equals(row, HConstants.EMPTY_END_ROW)) {
      return state.stripeFiles.size() - 1;
    }
    // If there's an exact match below, a stripe ends at "row". Stripe right boundary is
    // exclusive, so that means the row is in the next stripe; thus, we need to add one to index.
    // If there's no match, the return value of binarySearch is (-(insertion point) - 1), where
    // insertion point is the index of the next greater element, or list size if none. The
    // insertion point happens to be exactly what we need, so we need to add one to the result.
    return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
  }

  @Override
  public final byte[] getStartRow(int stripeIndex) {
    return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
  }

  @Override
  public final byte[] getEndRow(int stripeIndex) {
    return (stripeIndex == state.stripeEndRows.length
        ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
  }

  private byte[] startOf(HStoreFile sf) {
    byte[] result = fileStarts.get(sf);

    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
    // serves only as a marker and is not to be confused with other empty byte arrays.
    // See Javadoc of INVALID_KEY_IN_MAP for more information
    return (result == null)
        ? sf.getMetadataValue(STRIPE_START_KEY)
        : result == INVALID_KEY_IN_MAP ? INVALID_KEY : result;
  }

  private byte[] endOf(HStoreFile sf) {
    byte[] result = fileEnds.get(sf);

    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here as the latter
    // serves only as a marker and is not to be confused with other empty byte arrays.
    // See Javadoc of INVALID_KEY_IN_MAP for more information
    return (result == null)
        ? sf.getMetadataValue(STRIPE_END_KEY)
        : result == INVALID_KEY_IN_MAP ? INVALID_KEY : result;
  }

  /**
   * Inserts a file in the correct place (by seqnum) in a stripe copy.
   * @param stripe Stripe copy to insert into.
   * @param sf File to insert.
   */
  private static void insertFileIntoStripe(ArrayList<HStoreFile> stripe, HStoreFile sf) {
    // The only operation for which sorting of the files matters is KeyBefore. Therefore,
    // we will store the file in reverse order by seqNum from the outset.
    for (int insertBefore = 0; ; ++insertBefore) {
      if (insertBefore == stripe.size()
          || (StoreFileComparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
        stripe.add(insertBefore, sf);
        break;
      }
    }
  }

  /**
   * An extension of ConcatenatedLists that has several peculiar properties.
   * First, one can cut the tail of the logical list by removing the last several sub-lists.
   * Second, items can be removed through the iterator.
   * Third, if the sub-lists are immutable, they are replaced with mutable copies when needed.
   * On average, a KeyBefore operation will contain half the stripes as potential candidates,
   * but will quickly cut down on them as it finds something in the more likely ones; thus,
   * the above allows us to avoid unnecessary copying of a bunch of lists.
   */
  private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<HStoreFile> {
    @Override
    public java.util.Iterator<HStoreFile> iterator() {
      return new Iterator();
    }

    public class Iterator extends ConcatenatedLists<HStoreFile>.Iterator {
      public ArrayList<List<HStoreFile>> getComponents() {
        return components;
      }

      public void removeComponents(int startIndex) {
        List<List<HStoreFile>> subList = components.subList(startIndex, components.size());
        for (List<HStoreFile> entry : subList) {
          size -= entry.size();
        }
        assert size >= 0;
        subList.clear();
      }

      @Override
      public void remove() {
        if (!this.nextWasCalled) {
          throw new IllegalStateException("No element to remove");
        }
        this.nextWasCalled = false;
        List<HStoreFile> src = components.get(currentComponent);
        if (src instanceof ImmutableList<?>) {
          src = new ArrayList<>(src);
          components.set(currentComponent, src);
        }
        src.remove(indexWithinComponent);
        --size;
        --indexWithinComponent;
        if (src.isEmpty()) {
          components.remove(currentComponent); // indexWithinComponent is already -1 here.
        }
      }
    }
  }

  /**
   * Non-static helper class for merging compaction or flush results.
   * Since we want to merge them atomically (more or less), it operates on lazy copies,
   * then creates a new state object and puts it in place.
   */
  private class CompactionOrFlushMergeCopy {
    private ArrayList<List<HStoreFile>> stripeFiles = null;
    private ArrayList<HStoreFile> level0Files = null;
    private ArrayList<byte[]> stripeEndRows = null;

    private Collection<HStoreFile> compactedFiles = null;
    private Collection<HStoreFile> results = null;

    private List<HStoreFile> l0Results = new ArrayList<>();
    private final boolean isFlush;

    public CompactionOrFlushMergeCopy(boolean isFlush) {
      // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
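      // The copy initially shares the per-stripe ImmutableLists of the current state; mutable
      // copies are made only on demand (see getStripeCopy and getLevel0Copy).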
      this.stripeFiles = new ArrayList<>(StripeStoreFileManager.this.state.stripeFiles);
      this.isFlush = isFlush;
    }

    private void mergeResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results)
        throws IOException {
      assert this.compactedFiles == null && this.results == null;
      this.compactedFiles = compactedFiles;
      this.results = results;
      // Do logical processing.
      if (!isFlush) removeCompactedFiles();
      TreeMap<byte[], HStoreFile> newStripes = processResults();
      if (newStripes != null) {
        processNewCandidateStripes(newStripes);
      }
      // Create new state and update parent.
      State state = createNewState(false);
      StripeStoreFileManager.this.state = state;
      updateMetadataMaps();
    }

    private void deleteResults(Collection<HStoreFile> compactedFiles) throws IOException {
      this.compactedFiles = compactedFiles;
      // Create new state and update parent.
      State state = createNewState(true);
      StripeStoreFileManager.this.state = state;
      updateMetadataMaps();
    }

    private State createNewState(boolean delCompactedFiles) {
      State oldState = StripeStoreFileManager.this.state;
      // Stripe count should be the same unless the end rows changed.
      assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
      State newState = new State();
      newState.level0Files = (this.level0Files == null) ? oldState.level0Files
          : ImmutableList.copyOf(this.level0Files);
      newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
          : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
      newState.stripeFiles = new ArrayList<>(this.stripeFiles.size());
      for (List<HStoreFile> newStripe : this.stripeFiles) {
        newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
            ? (ImmutableList<HStoreFile>)newStripe : ImmutableList.copyOf(newStripe));
      }

      List<HStoreFile> newAllFiles = new ArrayList<>(oldState.allFilesCached);
      List<HStoreFile> newAllCompactedFiles = new ArrayList<>(oldState.allCompactedFilesCached);
      if (!isFlush) {
        newAllFiles.removeAll(compactedFiles);
        if (delCompactedFiles) {
          newAllCompactedFiles.removeAll(compactedFiles);
        } else {
          newAllCompactedFiles.addAll(compactedFiles);
        }
      }
      if (results != null) {
        newAllFiles.addAll(results);
      }
      newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
      newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
      return newState;
    }

    private void updateMetadataMaps() {
      StripeStoreFileManager parent = StripeStoreFileManager.this;
      if (!isFlush) {
        for (HStoreFile sf : this.compactedFiles) {
          parent.fileStarts.remove(sf);
          parent.fileEnds.remove(sf);
        }
      }
      if (this.l0Results != null) {
        for (HStoreFile sf : this.l0Results) {
          parent.ensureLevel0Metadata(sf);
        }
      }
    }

    /**
     * @param index Index of the stripe we need.
     * @return A lazy stripe copy from current stripes.
     */
    private final ArrayList<HStoreFile> getStripeCopy(int index) {
      List<HStoreFile> stripeCopy = this.stripeFiles.get(index);
      ArrayList<HStoreFile> result = null;
      if (stripeCopy instanceof ImmutableList<?>) {
        result = new ArrayList<>(stripeCopy);
        this.stripeFiles.set(index, result);
      } else {
        result = (ArrayList<HStoreFile>)stripeCopy;
      }
      return result;
    }

    /**
     * @return A lazy L0 copy from current state.
     */
    private final ArrayList<HStoreFile> getLevel0Copy() {
      if (this.level0Files == null) {
        this.level0Files = new ArrayList<>(StripeStoreFileManager.this.state.level0Files);
      }
      return this.level0Files;
    }

    /**
     * Process new files, and add them either to the structure of existing stripes,
     * or to the list of new candidate stripes.
     * @return New candidate stripes.
     */
    private TreeMap<byte[], HStoreFile> processResults() throws IOException {
      TreeMap<byte[], HStoreFile> newStripes = null;
      for (HStoreFile sf : this.results) {
        byte[] startRow = startOf(sf), endRow = endOf(sf);
        if (isInvalid(endRow) || isInvalid(startRow)) {
          if (!isFlush) {
            LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
          }
          insertFileIntoStripe(getLevel0Copy(), sf);
          this.l0Results.add(sf);
          continue;
        }
        if (!this.stripeFiles.isEmpty()) {
          int stripeIndex = findStripeIndexByEndRow(endRow);
          if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
            // Simple/common case - add file to an existing stripe.
            insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
            continue;
          }
        }

        // Make a new candidate stripe.
        if (newStripes == null) {
          newStripes = new TreeMap<>(MAP_COMPARATOR);
        }
        HStoreFile oldSf = newStripes.put(endRow, sf);
        if (oldSf != null) {
          throw new IOException("Compactor has produced multiple files for the stripe ending in ["
              + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
        }
      }
      return newStripes;
    }

    /**
     * Remove compacted files.
     */
    private void removeCompactedFiles() throws IOException {
      for (HStoreFile oldFile : this.compactedFiles) {
        byte[] oldEndRow = endOf(oldFile);
        List<HStoreFile> source = null;
        if (isInvalid(oldEndRow)) {
          source = getLevel0Copy();
        } else {
          int stripeIndex = findStripeIndexByEndRow(oldEndRow);
          if (stripeIndex < 0) {
            throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
                + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
          }
          source = getStripeCopy(stripeIndex);
        }
        if (!source.remove(oldFile)) {
          throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
        }
      }
    }

    /**
     * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
     * new candidate stripes/removes old stripes; produces a new set of stripe end rows.
     * @param newStripes New stripes - files by end row.
     */
    private void processNewCandidateStripes(
        TreeMap<byte[], HStoreFile> newStripes) throws IOException {
      // Validate that the removed and added aggregate ranges still make for a full key space.
      boolean hasStripes = !this.stripeFiles.isEmpty();
      this.stripeEndRows =
          new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
      int removeFrom = 0;
      byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
      byte[] lastEndRow = newStripes.lastKey();
      if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
        throw new IOException("Newly created stripes do not cover the entire key space.");
      }

      boolean canAddNewStripes = true;
      Collection<HStoreFile> filesForL0 = null;
      if (hasStripes) {
        // Determine which stripes will need to be removed because they conflict with new stripes.
        // The new boundaries should match old stripe boundaries, so we should get exact matches.
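        // For example, new files replacing stripes [b, c) and [c, d) must start exactly at b and
        // end exactly at d; anything else is rejected below as a bad range.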
        if (isOpen(firstStartRow)) {
          removeFrom = 0;
        } else {
          removeFrom = findStripeIndexByEndRow(firstStartRow);
          if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
          ++removeFrom;
        }
        int removeTo = findStripeIndexByEndRow(lastEndRow);
        if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
        // See if there are files in the stripes we are trying to replace.
        ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
        for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
          conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
        }
        if (!conflictingFiles.isEmpty()) {
          // This can be caused by two things - concurrent flush into stripes, or a bug.
          // Unfortunately, we cannot tell them apart without looking at timing or something
          // like that. We will assume we are dealing with a flush and dump it into L0.
          if (isFlush) {
            long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
            LOG.warn("Stripes were created by a flush, but results of size " + newSize
                + " cannot be added because the stripes have changed");
            canAddNewStripes = false;
            filesForL0 = newStripes.values();
          } else {
            long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
            LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush)"
                + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
            filesForL0 = conflictingFiles;
          }
          if (filesForL0 != null) {
            for (HStoreFile sf : filesForL0) {
              insertFileIntoStripe(getLevel0Copy(), sf);
            }
            l0Results.addAll(filesForL0);
          }
        }

        if (canAddNewStripes) {
          // Remove old empty stripes.
          int originalCount = this.stripeFiles.size();
          for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
            if (removeIndex != originalCount - 1) {
              this.stripeEndRows.remove(removeIndex);
            }
            this.stripeFiles.remove(removeIndex);
          }
        }
      }

      if (!canAddNewStripes) return; // Files were already put into L0.

      // Now, insert new stripes. The total ranges match, so we can insert where we removed.
      byte[] previousEndRow = null;
      int insertAt = removeFrom;
      for (Map.Entry<byte[], HStoreFile> newStripe : newStripes.entrySet()) {
        if (previousEndRow != null) {
          // Validate that the ranges are contiguous.
          assert !isOpen(previousEndRow);
          byte[] startRow = startOf(newStripe.getValue());
          if (!rowEquals(previousEndRow, startRow)) {
            throw new IOException("The new stripes produced by "
                + (isFlush ? "flush" : "compaction") + " are not contiguous");
          }
        }
        // Add the new stripe.
        ArrayList<HStoreFile> tmp = new ArrayList<>();
        tmp.add(newStripe.getValue());
        stripeFiles.add(insertAt, tmp);
        previousEndRow = newStripe.getKey();
        if (!isOpen(previousEndRow)) {
          stripeEndRows.add(insertAt, previousEndRow);
        }
        ++insertAt;
      }
    }
  }

  @Override
  public List<HStoreFile> getLevel0Files() {
    return this.state.level0Files;
  }

  @Override
  public List<byte[]> getStripeBoundaries() {
    if (this.state.stripeFiles.isEmpty()) {
      return Collections.emptyList();
    }
    ArrayList<byte[]> result = new ArrayList<>(this.state.stripeEndRows.length + 2);
    result.add(OPEN_KEY);
    Collections.addAll(result, this.state.stripeEndRows);
    result.add(OPEN_KEY);
    return result;
  }

  @Override
  public ArrayList<ImmutableList<HStoreFile>> getStripes() {
    return this.state.stripeFiles;
  }

  @Override
  public int getStripeCount() {
    return this.state.stripeFiles.size();
  }

  @Override
  public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
    // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
    State state = this.state;
    Collection<HStoreFile> expiredStoreFiles = null;
    for (ImmutableList<HStoreFile> stripe : state.stripeFiles) {
      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
    }
    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
  }

  private Collection<HStoreFile> findExpiredFiles(ImmutableList<HStoreFile> stripe, long maxTs,
      List<HStoreFile> filesCompacting, Collection<HStoreFile> expiredStoreFiles) {
    // Order by seqnum is reversed.
    for (int i = 1; i < stripe.size(); ++i) {
      HStoreFile sf = stripe.get(i);
      synchronized (sf) {
        long fileTs = sf.getReader().getMaxTimestamp();
        if (fileTs < maxTs && !filesCompacting.contains(sf)) {
          LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimestamp is "
              + fileTs + ", which is below " + maxTs);
          if (expiredStoreFiles == null) {
            expiredStoreFiles = new ArrayList<>();
          }
          expiredStoreFiles.add(sf);
        }
      }
    }
    return expiredStoreFiles;
  }

  @Override
  public double getCompactionPressure() {
    State stateLocal = this.state;
    if (stateLocal.allFilesCached.size() > blockingFileCount) {
      // just a hint to tell others that we have reached the blocking file count.
      return 2.0;
    }
    if (stateLocal.stripeFiles.isEmpty()) {
      return 0.0;
    }
    int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
    // do not calculate L0 separately because data will be moved to stripes quickly and in most
    // cases we flush data to stripes directly.
    int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
    double max = 0.0;
    for (ImmutableList<HStoreFile> stripeFile : stateLocal.stripeFiles) {
      int stripeFileCount = stripeFile.size();
      double normCount =
          (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
              / (blockingFilePerStripe - config.getStripeCompactMinFiles());
      if (normCount >= 1.0) {
        // This could happen if the stripes are not split evenly. Do not return a value larger
        // than 1.0 because we have not actually reached the blocking file count.
        return 1.0;
      }
      if (normCount > max) {
        max = normCount;
      }
    }
    return max;
  }

  @Override
  public Comparator<HStoreFile> getStoreFileComparator() {
    return StoreFileComparators.SEQ_ID;
  }
}