/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine-s.
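 * <p/>
 * An illustrative sketch of how a store obtains and uses an engine (only the method names below
 * come from this class; the variables are hypothetical):
 *
 * <pre>{@code
 * StoreEngine<?, ?, ?, ?> engine = StoreEngine.create(store, conf, cellComparator);
 * engine.initialize(false); // open the store files recorded by the store file tracker
 * CompactionPolicy policy = engine.getCompactionPolicy();
 * Compactor<?> compactor = engine.getCompactor();
 * StoreFileManager sfm = engine.getStoreFileManager();
 * }</pre>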
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component store files is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component store files is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
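 * <p/>
 * A minimal sketch of the locking pattern expected from callers (illustrative only; the real call
 * sites live in {@link HStore}):
 *
 * <pre>{@code
 * engine.readLock();
 * try {
 *   // look at the current set of store files, e.g. to create scanners
 * } finally {
 *   engine.readUnlock();
 * }
 * }</pre>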
 * <p/>
 * It is a bit confusing that we have a StoreFileManager (SFM) and then a StoreFileTracker (SFT).
 * As its name says, the SFT is used to track the store file list. The reason why we have an SFT
 * besides the SFM is that, when introducing stripe compaction, we introduced the StoreEngine and
 * also the SFM, but the SFM is not a general 'Manager': it is only designed to manage the
 * in-memory 'stripes', so we can select different store files when scanning or compacting. The
 * 'tracking' of store files was actually done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore} before we had
 * the SFT. And since the SFM is designed to hold only in-memory state, we hold the write lock when
 * updating it, and the same lock is also used to protect normal read/write requests. This means we
 * should not add IO operations to the SFM. Also, no matter what the in-memory state is, stripe or
 * not, it does not affect how we track the store files. Considering all these facts, we introduce
 * a separate SFT to track the store files.
 * <p/>
 * Since we always need to update the SFM and the SFT at more or less the same time, we provide
 * methods on StoreEngine that update them both, so the upper layer only needs to call StoreEngine
 * once, which reduces the chance of misuse.
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is
   * used to manage and compact HBase store files.
   */
  public static final String STORE_ENGINE_CLASS_KEY =
    ConfigKey.CLASS("hbase.hstore.engine.class", StoreEngine.class);

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire the read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release the read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire the write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release the write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns the compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns the compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns the store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns the store flusher to use. */
  StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select
   * or start a compaction. See the CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null;
  }

  /**
   * Create a writer for writing new store files.
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
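   * <p/>
   * If {@code hbase.hstore.validate.read_fully} is set to {@code true} (it defaults to
   * {@code false}), validation additionally seeks to the first cell of every block, which is more
   * expensive but catches unreadable blocks before the file is committed. (This describes the
   * behaviour of the code below rather than a formal contract.)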
   * @param path         the path to the store file
   * @param isCompaction whether this is called from the context of a compaction
   */
  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
        if (storeFile.getFirstKey().isEmpty()) {
          LOG.debug("'{}=true' but storefile does not contain any data. Skipping validation.",
            READ_FULLY_ON_VALIDATE_KEY);
          return;
        }
        LOG.debug("Validating the store file by reading the first cell from each block: {}", path);
        StoreFileReader reader = storeFile.getReader();
        try (StoreFileScanner scanner =
          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
          assert hasNext : "StoreFile contains no data";
          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
            if (nextIndexedKey == null) {
              break;
            }
            scanner.seek(nextIndexedKey);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to open store file: {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-"
        + ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

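    // Each file is opened by its own task below; results are collected in completion order, so
    // the returned list does not necessarily preserve the iteration order of the input collection.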
    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from the result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader().close(
            storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
        file, storeFileTracker));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the ones that have not been opened, and removes the
   * store file readers for store files that are no longer available. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
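   * <p/>
   * The refresh is computed as a set difference on {@link StoreFileInfo}: new files that have not
   * already been compacted away are opened, while files that have disappeared have their readers
   * dropped via {@link #replaceStoreFiles}.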
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
      toBeAddedFiles, toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
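   * <p/>
   * A rough sketch of the flush commit sequence as seen from this class (illustrative only; the
   * actual orchestration lives in {@link HStore}):
   *
   * <pre>{@code
   * List<HStoreFile> sfs = engine.commitStoreFiles(tempFiles, false, true);
   * engine.addStoreFiles(sfs, () -> {
   *   // e.g. clear the memstore snapshot under the store write lock
   * });
   * }</pre>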
   * @param files        the files to commit
   * @param isCompaction whether this is called from the context of a compaction
   * @param validate     whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
    throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file, isCompaction);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, we need to check whether
        // the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting, as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data, which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under the lock protection. Usually this is for clearing the memstore snapshot.
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continue to hold
      // the lock.
      writeUnlock();
    }
  }

  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the implementation of the used storefile tracker requires you to write to the temp
   * directory first, i.e., does not allow broken store files under the actual data directory.
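   * <p/>
   * For example, the default file-system based tracker keeps requiring the temp directory, while
   * trackers that record the committed file list elsewhere may allow writing straight into the
   * data directory (illustrative; see {@link StoreFileTrackerFactory} for the shipped
   * implementations).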
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}