/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Stripe implementation of StoreFileManager. Not thread safe - relies on external locking (in
 * HStore). Collections that this class returns are immutable or unique to the call, so they should
 * be safe. The stripe store splits the key space of the region into non-overlapping stripes, plus
 * a set of recent files that may contain keys from the entire key space (level 0). Each stripe
 * contains a set of files. When L0 is compacted, it is split into files corresponding to the
 * existing stripe boundaries, which can then be added to the stripes. When a scan or get happens,
 * only the files from the corresponding stripes have to be read. See StripeCompactionPolicy for
 * how the stripes are determined; this class doesn't care. This class should work together with
 * StripeCompactionPolicy and StripeCompactor. With regard to how they work, we make at least the
 * following (reasonable) assumptions:
 * - Compaction produces one file per new stripe (if any); that is easy to change.
 * - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
 */
@InterfaceAudience.Private
public class StripeStoreFileManager
  implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFileManager.class);

  /**
   * The file metadata fields that contain the stripe information.
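   * The values are the stripe's start row (inclusive) and end row (exclusive); an empty value
   * ({@link #OPEN_KEY}) marks an open boundary, while a missing value marks a level 0 file.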
   */
  public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
  public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");

  private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();

  /**
   * The key value used for a range boundary, indicating that the boundary is open (i.e. +-inf).
   */
  public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
  final static byte[] INVALID_KEY = null;

  /**
   * The state class. Used solely to replace results atomically during compactions and avoid
   * complicated error handling.
   */
  private static class State {
    /**
     * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
     * here. It is invariant that the start row of a stripe is the end row of the previous one
     * (and is an open boundary for the first one).
     */
    public byte[][] stripeEndRows = new byte[0][];

    /**
     * Files by stripe. Each element of the list corresponds to the stripeEndRows element with the
     * same index, except the last one. Inside each list, the files are in reverse order by seqNum.
     * Note that the length of this list is one higher than that of stripeEndRows.
     */
    public ArrayList<ImmutableList<HStoreFile>> stripeFiles = new ArrayList<>();
    /** Level 0. The files are in reverse order by seqNum. */
    public ImmutableList<HStoreFile> level0Files = ImmutableList.of();

    /** Cached list of all files in the structure, to return from some calls */
    public ImmutableList<HStoreFile> allFilesCached = ImmutableList.of();
    private ImmutableList<HStoreFile> allCompactedFilesCached = ImmutableList.of();
  }

  private State state = null;

  /** Cached file metadata (or overrides as the case may be) */
  private HashMap<HStoreFile, byte[]> fileStarts = new HashMap<>();
  private HashMap<HStoreFile, byte[]> fileEnds = new HashMap<>();
  /**
   * Normally an invalid key is null, but in the map null is the result for "no key"; so use the
   * following constant value in these maps instead. Note that this is a constant and we use it to
   * compare by reference when we read from the map.
   */
  private static final byte[] INVALID_KEY_IN_MAP = new byte[0];

  private final CellComparator cellComparator;
  private StripeStoreConfig config;

  private final int blockingFileCount;

  public StripeStoreFileManager(CellComparator kvComparator, Configuration conf,
    StripeStoreConfig config) {
    this.cellComparator = kvComparator;
    this.config = config;
    this.blockingFileCount =
      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
  }

  @Override
  public void loadFiles(List<HStoreFile> storeFiles) {
    loadUnclassifiedStoreFiles(storeFiles);
  }

  @Override
  public Collection<HStoreFile> getStorefiles() {
    return state.allFilesCached;
  }

  @Override
  public Collection<HStoreFile> getCompactedfiles() {
    return state.allCompactedFilesCached;
  }

  @Override
  public int getCompactedFilesCount() {
    return state.allCompactedFilesCached.size();
  }

  @Override
  public void insertNewFiles(Collection<HStoreFile> sfs) {
    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
    cmc.mergeResults(Collections.emptyList(), sfs);
    debugDumpState("Added new files");
  }

  @Override
  public ImmutableCollection<HStoreFile> clearFiles() {
    ImmutableCollection<HStoreFile> result = state.allFilesCached;
    this.state = new State();
    this.fileStarts.clear();
    this.fileEnds.clear();
    return result;
  }

  @Override
  public ImmutableCollection<HStoreFile> clearCompactedFiles() {
    ImmutableCollection<HStoreFile> result = state.allCompactedFilesCached;
    this.state = new State();
    return result;
  }

  @Override
  public int getStorefileCount() {
    return state.allFilesCached.size();
  }

  /**
   * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} for details on this
   * method.
   */
  @Override
  public Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
    KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
    // Order matters for this call.
    result.addSublist(state.level0Files);
    if (!state.stripeFiles.isEmpty()) {
      int lastStripeIndex = findStripeForRow(CellUtil.cloneRow(targetKey), false);
      for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
        result.addSublist(state.stripeFiles.get(stripeIndex));
      }
    }
    return result.iterator();
  }

  /**
   * See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
   * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, Cell)} for
   * details on these methods.
   */
  @Override
  public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
    Iterator<HStoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
    KeyBeforeConcatenatedLists.Iterator original =
      (KeyBeforeConcatenatedLists.Iterator) candidateFiles;
    assert original != null;
    ArrayList<List<HStoreFile>> components = original.getComponents();
    for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
      HStoreFile sf = components.get(firstIrrelevant).get(0);
      byte[] endKey = endOf(sf);
      // Entries are ordered as such: L0, then stripes in reverse order. We never remove
      // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
      // first one that cannot possibly have better candidates.
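      // Level 0, when present, is the first component, and its files have invalid (null) end
      // keys, so the check below never matches it; once a stripe's end key sorts at or below
      // the target row, removeComponents() drops that stripe and every remaining, lower-keyed
      // stripe in one call.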
      if (!isInvalid(endKey) && !isOpen(endKey) && (nonOpenRowCompare(targetKey, endKey) >= 0)) {
        original.removeComponents(firstIrrelevant);
        break;
      }
    }
    return original;
  }

  /**
   * Override of getSplitPoint that determines the split point as the boundary between two stripes,
   * unless it causes significant imbalance between split sides' sizes. In that case, the split
   * boundary will be chosen from the middle of one of the stripes to minimize imbalance.
   * @return The split point, or an empty Optional if no split is possible.
   */
  @Override
  public Optional<byte[]> getSplitPoint() throws IOException {
    if (this.getStorefileCount() == 0) {
      return Optional.empty();
    }
    if (state.stripeFiles.size() <= 1) {
      return getSplitPointFromAllFiles();
    }
    int leftIndex = -1, rightIndex = state.stripeFiles.size();
    long leftSize = 0, rightSize = 0;
    long lastLeftSize = 0, lastRightSize = 0;
    while (rightIndex - 1 != leftIndex) {
      if (leftSize >= rightSize) {
        --rightIndex;
        lastRightSize = getStripeFilesSize(rightIndex);
        rightSize += lastRightSize;
      } else {
        ++leftIndex;
        lastLeftSize = getStripeFilesSize(leftIndex);
        leftSize += lastLeftSize;
      }
    }
    if (leftSize == 0 || rightSize == 0) {
      String errMsg = String.format(
        "Cannot split on a boundary - left index %d size %d, right index %d size %d",
        leftIndex, leftSize, rightIndex, rightSize);
      debugDumpState(errMsg);
      LOG.warn(errMsg);
      return getSplitPointFromAllFiles();
    }
    double ratio = (double) rightSize / leftSize;
    if (ratio < 1) {
      ratio = 1 / ratio;
    }
    if (config.getMaxSplitImbalance() > ratio) {
      return Optional.of(state.stripeEndRows[leftIndex]);
    }

    // If the difference between the sides is too large, we could get the proportional key on
    // a stripe to equalize the difference, but there's no proportional key method at the
    // moment, and it's not extremely important.
    // See if we can achieve a better ratio if we split the bigger side in half.
    boolean isRightLarger = rightSize >= leftSize;
    double newRatio = isRightLarger
      ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
      : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
    if (newRatio < 1) {
      newRatio = 1 / newRatio;
    }
    if (newRatio >= ratio) {
      return Optional.of(state.stripeEndRows[leftIndex]);
    }
    LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split " + newRatio
      + " configured ratio " + config.getMaxSplitImbalance());
    // OK, we may get a better ratio, get it.
    return StoreUtils.getSplitPoint(state.stripeFiles.get(isRightLarger ? rightIndex : leftIndex),
      cellComparator);
  }

  private Optional<byte[]> getSplitPointFromAllFiles() throws IOException {
    ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
    sfs.addSublist(state.level0Files);
    sfs.addAllSublists(state.stripeFiles);
    return StoreUtils.getSplitPoint(sfs, cellComparator);
  }

  private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
    return (double) (largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
  }

  @Override
  public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
    byte[] stopRow, boolean includeStopRow) {
    if (state.stripeFiles.isEmpty()) {
      return state.level0Files; // There's just L0.
    }

    int firstStripe = findStripeForRow(startRow, true);
    int lastStripe = findStripeForRow(stopRow, false);
    assert firstStripe <= lastStripe;
    if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
      return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
    }
    if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
      return state.allFilesCached; // We need to read all files.
    }

    ConcatenatedLists<HStoreFile> result = new ConcatenatedLists<>();
    result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
    result.addSublist(state.level0Files);
    return result;
  }

  @Override
  public void addCompactionResults(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> results) {
    // See class comment for the assumptions we make here.
    LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
      + " files replaced by " + results.size());
    // In order to be able to fail in the middle of the operation, we'll operate on lazy
    // copies and apply the result at the end.
    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
    cmc.mergeResults(compactedFiles, results);
    markCompactedAway(compactedFiles);
    debugDumpState("Merged compaction results");
  }

  // Mark the files as compactedAway once the storefiles and compactedfiles lists are finalized.
  // Let a background thread close the actual reader on these compacted files, and also ensure
  // that their blocks are evicted from the block cache so that they are no longer cached.
  private void markCompactedAway(Collection<HStoreFile> compactedFiles) {
    for (HStoreFile file : compactedFiles) {
      file.markCompactedAway();
    }
  }

  @Override
  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    // See class comment for the assumptions we make here.
    LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
    // In order to be able to fail in the middle of the operation, we'll operate on lazy
    // copies and apply the result at the end.
    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
    cmc.deleteResults(compactedFiles);
    debugDumpState("Deleted compaction results");
  }

  @Override
  public int getStoreCompactionPriority() {
    // If there's only L0, do what the default store does.
    // If we are in critical priority, do the same - we don't want to trump all stores all
    // the time due to how many files we have.
    int fc = getStorefileCount();
    if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
      return this.blockingFileCount - fc;
    }
    // If we are in good shape, we don't want to be trumped by all other stores due to how
    // many files we have, so do an approximate mapping to normal priority range; L0 counts
    // for all stripes.
    int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
    int priority = (int) Math.ceil(((double) (this.blockingFileCount - fc + l0) / sc) - l0);
    return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
  }

  /**
   * Gets the total size of all files in the stripe.
   * @param stripeIndex Stripe index.
   * @return Size.
   */
  private long getStripeFilesSize(int stripeIndex) {
    long result = 0;
    for (HStoreFile sf : state.stripeFiles.get(stripeIndex)) {
      result += sf.getReader().length();
    }
    return result;
  }

  /**
   * Loads initial store files that were picked up from some physical location pertaining to this
   * store (presumably). Unlike adding files after compaction, assumes empty initial sets, and is
   * forgiving with regard to stripe constraints - at worst, many/all files will go to level 0.
   * @param storeFiles Store files to add.
   */
  private void loadUnclassifiedStoreFiles(List<HStoreFile> storeFiles) {
    LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
    TreeMap<byte[], ArrayList<HStoreFile>> candidateStripes = new TreeMap<>(MAP_COMPARATOR);
    ArrayList<HStoreFile> level0Files = new ArrayList<>();
    // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
    // If needed, we could dynamically determine the stripes in the future.
    for (HStoreFile sf : storeFiles) {
      byte[] startRow = startOf(sf), endRow = endOf(sf);
      // Validate the range and put the files into place.
      if (isInvalid(startRow) || isInvalid(endRow)) {
        insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
        ensureLevel0Metadata(sf);
      } else if (!isOpen(startRow) && !isOpen(endRow) && nonOpenRowCompare(startRow, endRow) >= 0) {
        LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
          + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
        insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
        ensureLevel0Metadata(sf);
      } else {
        ArrayList<HStoreFile> stripe = candidateStripes.get(endRow);
        if (stripe == null) {
          stripe = new ArrayList<>();
          candidateStripes.put(endRow, stripe);
        }
        insertFileIntoStripe(stripe, sf);
      }
    }
    // Possible improvement - for variable-count stripes, if all the files are in L0, we can
    // instead create a single, open-ended stripe with all files.

    boolean hasOverlaps = false;
    byte[] expectedStartRow = null; // first stripe can start wherever
    Iterator<Map.Entry<byte[], ArrayList<HStoreFile>>> entryIter =
      candidateStripes.entrySet().iterator();
    while (entryIter.hasNext()) {
      Map.Entry<byte[], ArrayList<HStoreFile>> entry = entryIter.next();
      ArrayList<HStoreFile> files = entry.getValue();
      // Validate the file start rows, and remove the bad ones to level 0.
      for (int i = 0; i < files.size(); ++i) {
        HStoreFile sf = files.get(i);
        byte[] startRow = startOf(sf);
        if (expectedStartRow == null) {
          expectedStartRow = startRow; // ensure that the first stripe is still consistent
        } else if (!rowEquals(expectedStartRow, startRow)) {
          hasOverlaps = true;
          LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
            + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
            + "], to L0 it goes");
          HStoreFile badSf = files.remove(i);
          insertFileIntoStripe(level0Files, badSf);
          ensureLevel0Metadata(badSf);
          --i;
        }
      }
      // Check if any files from the candidate stripe are valid. If so, add a stripe.
      byte[] endRow = entry.getKey();
      if (!files.isEmpty()) {
        expectedStartRow = endRow; // Next stripe must start exactly at that key.
      } else {
        entryIter.remove();
      }
    }

    // In the end, there must be open ends on two sides. If not, and there were no errors, i.e.
    // the files are consistent, they might be coming from a split. We will treat the boundaries
    // as open keys anyway, and log a message.
    // If there were errors, we'll play it safe and dump everything into L0.
    if (!candidateStripes.isEmpty()) {
      HStoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
      boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
      if (!isOpen) {
        LOG.warn("The range of the loaded files does not cover full key space: from ["
          + Bytes.toString(startOf(firstFile)) + "], to ["
          + Bytes.toString(candidateStripes.lastKey()) + "]");
        if (!hasOverlaps) {
          ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
          ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
        } else {
          LOG.warn("Inconsistent files, everything goes to L0.");
          for (ArrayList<HStoreFile> files : candidateStripes.values()) {
            for (HStoreFile sf : files) {
              insertFileIntoStripe(level0Files, sf);
              ensureLevel0Metadata(sf);
            }
          }
          candidateStripes.clear();
        }
      }
    }

    // Copy the results into the fields.
    State state = new State();
    state.level0Files = ImmutableList.copyOf(level0Files);
    state.stripeFiles = new ArrayList<>(candidateStripes.size());
    state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
    ArrayList<HStoreFile> newAllFiles = new ArrayList<>(level0Files);
    int i = candidateStripes.size() - 1;
    for (Map.Entry<byte[], ArrayList<HStoreFile>> entry : candidateStripes.entrySet()) {
      state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
      newAllFiles.addAll(entry.getValue());
      if (i > 0) {
        state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
      }
      --i;
    }
    state.allFilesCached = ImmutableList.copyOf(newAllFiles);
    this.state = state;
    debugDumpState("Files loaded");
  }

  private void ensureEdgeStripeMetadata(ArrayList<HStoreFile> stripe, boolean isFirst) {
    HashMap<HStoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
    for (HStoreFile sf : stripe) {
      targetMap.put(sf, OPEN_KEY);
    }
  }

  private void ensureLevel0Metadata(HStoreFile sf) {
    if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
    if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
  }

  private void debugDumpState(String string) {
    if (!LOG.isDebugEnabled()) return;
    StringBuilder sb = new StringBuilder();
    sb.append("\n" + string + "; current stripe state is as such:");
    sb.append("\n level 0 with ").append(state.level0Files.size())
      .append(" files: " + TraditionalBinaryPrefix
        .long2String(StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
    for (int i = 0; i < state.stripeFiles.size(); ++i) {
      String endRow = (i == state.stripeEndRows.length)
        ? "(end)"
        : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
      sb.append("\n stripe ending in ").append(endRow).append(" with ")
        .append(state.stripeFiles.get(i).size())
        .append(" files: " + TraditionalBinaryPrefix.long2String(
          StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
    }
    sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
    sb.append("\n").append(getStorefileCount()).append(" files total.");
    LOG.debug(sb.toString());
  }

  /**
   * Checks whether the key indicates an open interval boundary (i.e. infinity).
   */
  private static final boolean isOpen(byte[] key) {
    return key != null && key.length == 0;
  }

  private static final boolean isOpen(Cell key) {
    return key != null && key.getRowLength() == 0;
  }

  /**
   * Checks whether the key is invalid (e.g. from an L0 file, or from non-stripe-compacted files).
   */
  private static final boolean isInvalid(byte[] key) {
    // No need to use Arrays.equals because INVALID_KEY is null
    return key == INVALID_KEY;
  }

  /**
   * Compare two keys for equality.
   */
  private final boolean rowEquals(byte[] k1, byte[] k2) {
    return Bytes.equals(k1, 0, k1.length, k2, 0, k2.length);
  }

  /**
   * Compare two keys. Keys must not be open (isOpen(row) == false).
   */
  private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
    assert !isOpen(k1) && !isOpen(k2);
    return Bytes.compareTo(k1, k2);
  }

  private final int nonOpenRowCompare(Cell k1, byte[] k2) {
    assert !isOpen(k1) && !isOpen(k2);
    return cellComparator.compareRows(k1, k2, 0, k2.length);
  }

  /**
   * Finds the stripe index by end row.
   */
  private final int findStripeIndexByEndRow(byte[] endRow) {
    assert !isInvalid(endRow);
    if (isOpen(endRow)) return state.stripeEndRows.length;
    return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
  }

  /**
   * Finds the stripe index for the stripe containing a row provided externally for get/scan.
   */
  private final int findStripeForRow(byte[] row, boolean isStart) {
    if (isStart && Arrays.equals(row, HConstants.EMPTY_START_ROW)) return 0;
    if (!isStart && Arrays.equals(row, HConstants.EMPTY_END_ROW)) {
      return state.stripeFiles.size() - 1;
    }
    // If there's an exact match below, a stripe ends at "row". The stripe right boundary is
    // exclusive, so that means the row is in the next stripe; thus, we need to add one to the
    // index. If there's no match, the return value of binarySearch is (-(insertion point) - 1),
    // where the insertion point is the index of the next greater element, or the list size if
    // none. The insertion point is exactly the stripe we need, and adding one before taking the
    // absolute value turns the negative return value into it.
    return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
  }

  @Override
  public final byte[] getStartRow(int stripeIndex) {
    return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
  }

  @Override
  public final byte[] getEndRow(int stripeIndex) {
    return (stripeIndex == state.stripeEndRows.length
      ? OPEN_KEY
      : state.stripeEndRows[stripeIndex]);
  }

  private byte[] startOf(HStoreFile sf) {
    byte[] result = fileStarts.get(sf);

    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here, as the
    // latter serves only as a marker and is not to be confused with other empty byte arrays.
    // See the Javadoc of INVALID_KEY_IN_MAP for more information.
    return (result == null) ? sf.getMetadataValue(STRIPE_START_KEY)
      : result == INVALID_KEY_IN_MAP ? INVALID_KEY
      : result;
  }

  private byte[] endOf(HStoreFile sf) {
    byte[] result = fileEnds.get(sf);

    // result and INVALID_KEY_IN_MAP are compared _only_ by reference on purpose here, as the
    // latter serves only as a marker and is not to be confused with other empty byte arrays.
    // See the Javadoc of INVALID_KEY_IN_MAP for more information.
    return (result == null) ? sf.getMetadataValue(STRIPE_END_KEY)
      : result == INVALID_KEY_IN_MAP ? INVALID_KEY
      : result;
  }

  /**
   * Inserts a file in the correct place (by seqnum) in a stripe copy.
   * @param stripe Stripe copy to insert into.
   * @param sf     File to insert.
   */
  private static void insertFileIntoStripe(ArrayList<HStoreFile> stripe, HStoreFile sf) {
    // The only operation for which sorting of the files matters is KeyBefore. Therefore,
    // we will store the files in reverse order by seqNum from the outset.
    for (int insertBefore = 0;; ++insertBefore) {
      if (
        insertBefore == stripe.size()
          || (StoreFileComparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)
      ) {
        stripe.add(insertBefore, sf);
        break;
      }
    }
  }

  /**
   * An extension of ConcatenatedLists that has several peculiar properties. First, one can cut the
   * tail of the logical list by removing the last several sub-lists. Second, items can be removed
   * through the iterator. Third, if the sub-lists are immutable, they are replaced with mutable
   * copies when needed. On average the KeyBefore operation will include half the stripes as
   * potential candidates, but will quickly cut down on them as it finds something in the more
   * likely ones; thus, the above allows us to avoid unnecessary copying of a bunch of lists.
   */
  private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<HStoreFile> {
    @Override
    public java.util.Iterator<HStoreFile> iterator() {
      return new Iterator();
    }

    public class Iterator extends ConcatenatedLists<HStoreFile>.Iterator {
      public ArrayList<List<HStoreFile>> getComponents() {
        return components;
      }

      public void removeComponents(int startIndex) {
        List<List<HStoreFile>> subList = components.subList(startIndex, components.size());
        for (List<HStoreFile> entry : subList) {
          size -= entry.size();
        }
        assert size >= 0;
        subList.clear();
      }

      @Override
      public void remove() {
        if (!this.nextWasCalled) {
          throw new IllegalStateException("No element to remove");
        }
        this.nextWasCalled = false;
        List<HStoreFile> src = components.get(currentComponent);
        if (src instanceof ImmutableList<?>) {
          src = new ArrayList<>(src);
          components.set(currentComponent, src);
        }
        src.remove(indexWithinComponent);
        --size;
        --indexWithinComponent;
        if (src.isEmpty()) {
          components.remove(currentComponent); // indexWithinComponent is already -1 here.
        }
      }
    }
  }

  /**
   * Non-static helper class for merging compaction or flush results.
   * Since we want to merge them atomically (more or less), it operates on lazy copies, then
   * creates a new state object and puts it in place.
   */
  private class CompactionOrFlushMergeCopy {
    private ArrayList<List<HStoreFile>> stripeFiles = null;
    private ArrayList<HStoreFile> level0Files = null;
    private ArrayList<byte[]> stripeEndRows = null;

    private Collection<HStoreFile> compactedFiles = null;
    private Collection<HStoreFile> results = null;

    private List<HStoreFile> l0Results = new ArrayList<>();
    private final boolean isFlush;

    public CompactionOrFlushMergeCopy(boolean isFlush) {
      // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
      this.stripeFiles = new ArrayList<>(StripeStoreFileManager.this.state.stripeFiles);
      this.isFlush = isFlush;
    }

    private void mergeResults(Collection<HStoreFile> compactedFiles,
      Collection<HStoreFile> results) {
      assert this.compactedFiles == null && this.results == null;
      this.compactedFiles = compactedFiles;
      this.results = results;
      // Do logical processing.
      if (!isFlush) {
        removeCompactedFiles();
      }
      TreeMap<byte[], HStoreFile> newStripes = processResults();
      if (newStripes != null) {
        processNewCandidateStripes(newStripes);
      }
      // Create new state and update parent.
      State state = createNewState(false);
      StripeStoreFileManager.this.state = state;
      updateMetadataMaps();
    }

    private void deleteResults(Collection<HStoreFile> compactedFiles) {
      this.compactedFiles = compactedFiles;
      // Create new state and update parent.
      State state = createNewState(true);
      StripeStoreFileManager.this.state = state;
      updateMetadataMaps();
    }

    private State createNewState(boolean delCompactedFiles) {
      State oldState = StripeStoreFileManager.this.state;
      // Stripe count should be the same unless the end rows changed.
      assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
      State newState = new State();
      newState.level0Files =
        (this.level0Files == null) ? oldState.level0Files : ImmutableList.copyOf(this.level0Files);
      newState.stripeEndRows = (this.stripeEndRows == null)
        ? oldState.stripeEndRows
        : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
      newState.stripeFiles = new ArrayList<>(this.stripeFiles.size());
      for (List<HStoreFile> newStripe : this.stripeFiles) {
        newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
          ? (ImmutableList<HStoreFile>) newStripe
          : ImmutableList.copyOf(newStripe));
      }

      List<HStoreFile> newAllFiles = new ArrayList<>(oldState.allFilesCached);
      List<HStoreFile> newAllCompactedFiles = new ArrayList<>(oldState.allCompactedFilesCached);
      if (!isFlush) {
        newAllFiles.removeAll(compactedFiles);
        if (delCompactedFiles) {
          newAllCompactedFiles.removeAll(compactedFiles);
        } else {
          newAllCompactedFiles.addAll(compactedFiles);
        }
      }
      if (results != null) {
        newAllFiles.addAll(results);
      }
      newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
      newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
      return newState;
    }

    private void updateMetadataMaps() {
      StripeStoreFileManager parent = StripeStoreFileManager.this;
      if (!isFlush) {
        for (HStoreFile sf : this.compactedFiles) {
          parent.fileStarts.remove(sf);
          parent.fileEnds.remove(sf);
        }
      }
      if (this.l0Results != null) {
        for (HStoreFile sf : this.l0Results) {
          parent.ensureLevel0Metadata(sf);
        }
      }
    }

    /**
     * @param index Index of the stripe we need.
     * @return A lazy stripe copy from the current stripes.
     */
    private final ArrayList<HStoreFile> getStripeCopy(int index) {
      List<HStoreFile> stripeCopy = this.stripeFiles.get(index);
      ArrayList<HStoreFile> result = null;
      if (stripeCopy instanceof ImmutableList<?>) {
        result = new ArrayList<>(stripeCopy);
        this.stripeFiles.set(index, result);
      } else {
        result = (ArrayList<HStoreFile>) stripeCopy;
      }
      return result;
    }

    /** Returns a lazy L0 copy from the current state. */
    private final ArrayList<HStoreFile> getLevel0Copy() {
      if (this.level0Files == null) {
        this.level0Files = new ArrayList<>(StripeStoreFileManager.this.state.level0Files);
      }
      return this.level0Files;
    }

    /**
     * Process new files, and add them either to the structure of existing stripes, or to the list
     * of new candidate stripes.
     * @return New candidate stripes.
     */
    private TreeMap<byte[], HStoreFile> processResults() {
      TreeMap<byte[], HStoreFile> newStripes = null;
      for (HStoreFile sf : this.results) {
        byte[] startRow = startOf(sf), endRow = endOf(sf);
        if (isInvalid(endRow) || isInvalid(startRow)) {
          if (!isFlush) {
            LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
          }
          insertFileIntoStripe(getLevel0Copy(), sf);
          this.l0Results.add(sf);
          continue;
        }
        if (!this.stripeFiles.isEmpty()) {
          int stripeIndex = findStripeIndexByEndRow(endRow);
          if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
            // Simple/common case - add the file to an existing stripe.
            insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
            continue;
          }
        }

        // Make a new candidate stripe.
        if (newStripes == null) {
          newStripes = new TreeMap<>(MAP_COMPARATOR);
        }
        HStoreFile oldSf = newStripes.put(endRow, sf);
        if (oldSf != null) {
          throw new IllegalStateException(
            "Compactor has produced multiple files for the stripe ending in ["
              + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
        }
      }
      return newStripes;
    }

    /**
     * Remove compacted files.
     */
    private void removeCompactedFiles() {
      for (HStoreFile oldFile : this.compactedFiles) {
        byte[] oldEndRow = endOf(oldFile);
        List<HStoreFile> source = null;
        if (isInvalid(oldEndRow)) {
          source = getLevel0Copy();
        } else {
          int stripeIndex = findStripeIndexByEndRow(oldEndRow);
          if (stripeIndex < 0) {
            throw new IllegalStateException(
              "An allegedly compacted file [" + oldFile + "] does not belong"
                + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
          }
          source = getStripeCopy(stripeIndex);
        }
        if (!source.remove(oldFile)) {
          LOG.warn("An allegedly compacted file [{}] was not found", oldFile);
        }
      }
    }

    /**
     * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with new
     * candidate stripes/removes old stripes; produces a new set of stripe end rows.
     * @param newStripes New stripes - files by end row.
     */
    private void processNewCandidateStripes(TreeMap<byte[], HStoreFile> newStripes) {
      // Validate that the removed and added aggregate ranges still make for a full key space.
      boolean hasStripes = !this.stripeFiles.isEmpty();
      this.stripeEndRows =
        new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
      int removeFrom = 0;
      byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
      byte[] lastEndRow = newStripes.lastKey();
      if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
        throw new IllegalStateException("Newly created stripes do not cover the entire key space.");
      }

      boolean canAddNewStripes = true;
      Collection<HStoreFile> filesForL0 = null;
      if (hasStripes) {
        // Determine which stripes will need to be removed because they conflict with new stripes.
        // The new boundaries should match old stripe boundaries, so we should get exact matches.
        if (isOpen(firstStartRow)) {
          removeFrom = 0;
        } else {
          removeFrom = findStripeIndexByEndRow(firstStartRow);
          if (removeFrom < 0) {
            throw new IllegalStateException("Compaction is trying to add a bad range.");
          }
          ++removeFrom;
        }
        int removeTo = findStripeIndexByEndRow(lastEndRow);
        if (removeTo < 0) {
          throw new IllegalStateException("Compaction is trying to add a bad range.");
        }
        // See if there are files in the stripes we are trying to replace.
        ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
        for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
          conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
        }
        if (!conflictingFiles.isEmpty()) {
          // This can be caused by two things - a concurrent flush into stripes, or a bug.
          // Unfortunately, we cannot tell them apart without looking at timing or something
          // like that. We will assume we are dealing with a flush and dump it into L0.
          // When this merge is itself a flush, its results are redirected to L0 below; when it
          // is a compaction, the conflicting files (presumed flush output) are moved to L0
          // instead and the compaction results still become the new stripes.
          if (isFlush) {
            long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
            LOG.warn("Stripes were created by a flush, but results of size " + newSize
              + " cannot be added because the stripes have changed");
            canAddNewStripes = false;
            filesForL0 = newStripes.values();
          } else {
            long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
            LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
              + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
            filesForL0 = conflictingFiles;
          }
          if (filesForL0 != null) {
            for (HStoreFile sf : filesForL0) {
              insertFileIntoStripe(getLevel0Copy(), sf);
            }
            l0Results.addAll(filesForL0);
          }
        }

        if (canAddNewStripes) {
          // Remove old empty stripes.
          int originalCount = this.stripeFiles.size();
          for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
            if (removeIndex != originalCount - 1) {
              this.stripeEndRows.remove(removeIndex);
            }
            this.stripeFiles.remove(removeIndex);
          }
        }
      }

      if (!canAddNewStripes) {
        return; // Files were already put into L0.
      }

      // Now, insert new stripes. The total ranges match, so we can insert where we removed.
      byte[] previousEndRow = null;
      int insertAt = removeFrom;
      for (Map.Entry<byte[], HStoreFile> newStripe : newStripes.entrySet()) {
        if (previousEndRow != null) {
          // Validate that the ranges are contiguous.
          assert !isOpen(previousEndRow);
          byte[] startRow = startOf(newStripe.getValue());
          if (!rowEquals(previousEndRow, startRow)) {
            throw new IllegalStateException("The new stripes produced by "
              + (isFlush ? "flush" : "compaction") + " are not contiguous");
          }
        }
        // Add the new stripe.
        ArrayList<HStoreFile> tmp = new ArrayList<>();
        tmp.add(newStripe.getValue());
        stripeFiles.add(insertAt, tmp);
        previousEndRow = newStripe.getKey();
        if (!isOpen(previousEndRow)) {
          stripeEndRows.add(insertAt, previousEndRow);
        }
        ++insertAt;
      }
    }
  }

  @Override
  public List<HStoreFile> getLevel0Files() {
    return this.state.level0Files;
  }

  @Override
  public List<byte[]> getStripeBoundaries() {
    if (this.state.stripeFiles.isEmpty()) {
      return Collections.emptyList();
    }
    ArrayList<byte[]> result = new ArrayList<>(this.state.stripeEndRows.length + 2);
    result.add(OPEN_KEY);
    Collections.addAll(result, this.state.stripeEndRows);
    result.add(OPEN_KEY);
    return result;
  }

  @Override
  public ArrayList<ImmutableList<HStoreFile>> getStripes() {
    return this.state.stripeFiles;
  }

  @Override
  public int getStripeCount() {
    return this.state.stripeFiles.size();
  }

  @Override
  public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
    // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
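    // Take a local snapshot of the state; writers swap in a whole new State object, so reading
    // the field once keeps the view consistent for the duration of this method.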
    State state = this.state;
    Collection<HStoreFile> expiredStoreFiles = null;
    for (ImmutableList<HStoreFile> stripe : state.stripeFiles) {
      expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
    }
    return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
  }

  private Collection<HStoreFile> findExpiredFiles(ImmutableList<HStoreFile> stripe, long maxTs,
    List<HStoreFile> filesCompacting, Collection<HStoreFile> expiredStoreFiles) {
    // Order by seqnum is reversed.
    for (int i = 1; i < stripe.size(); ++i) {
      HStoreFile sf = stripe.get(i);
      synchronized (sf) {
        long fileTs = sf.getReader().getMaxTimestamp();
        if (fileTs < maxTs && !filesCompacting.contains(sf)) {
          LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimestamp is "
            + fileTs + ", which is below " + maxTs);
          if (expiredStoreFiles == null) {
            expiredStoreFiles = new ArrayList<>();
          }
          expiredStoreFiles.add(sf);
        }
      }
    }
    return expiredStoreFiles;
  }

  @Override
  public double getCompactionPressure() {
    State stateLocal = this.state;
    if (stateLocal.allFilesCached.size() > blockingFileCount) {
      // just a hint to tell others that we have reached the blocking file count.
      return 2.0;
    }
    if (stateLocal.stripeFiles.isEmpty()) {
      return 0.0;
    }
    int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
    // Do not calculate L0 separately because data will be moved to the stripes quickly and in
    // most cases we flush data to the stripes directly.
    int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
    double max = 0.0;
    for (ImmutableList<HStoreFile> stripeFile : stateLocal.stripeFiles) {
      int stripeFileCount = stripeFile.size();
      double normCount = (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
        / (blockingFilePerStripe - config.getStripeCompactMinFiles());
      if (normCount >= 1.0) {
        // This could happen if the stripes are not split evenly. Do not return values larger
        // than 1.0 because we have not actually reached the blocking file count.
        return 1.0;
      }
      if (normCount > max) {
        max = normCount;
      }
    }
    return max;
  }

  @Override
  public Comparator<HStoreFile> getStoreFileComparator() {
    return StoreFileComparators.SEQ_ID;
  }
}