/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.keymeta.ManagedKeyDataCache;
import org.apache.hadoop.hbase.keymeta.SystemKeyCache;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine-s.
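 * <p/>
 * Which engine is used is controlled by {@link #STORE_ENGINE_CLASS_KEY}; when nothing is
 * configured, {@link DefaultStoreEngine} is used. An illustrative snippet (the stripe engine is
 * named here only as an example of an alternate engine):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
 * </pre>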
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
 * <p/>
 * It can be confusing that we have both a StoreFileManager (SFM) and a StoreFileTracker (SFT). As
 * its name says, the SFT is used to track the list of store files. The reason we have an SFT
 * besides the SFM is that, when introducing stripe compaction, we introduced the StoreEngine along
 * with the SFM, but the SFM is not a general 'manager': it is only designed to manage the
 * in-memory 'stripes', so we can select different store files when scanning or compacting. Before
 * the SFT existed, the 'tracking' of store files was actually done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore}. Since the SFM
 * is designed to only hold in-memory state, we hold the write lock when updating it, and the same
 * lock is also used to protect normal read/write requests; this means we should not add IO
 * operations to the SFM. Also, no matter what the in-memory state is, stripe or not, it does not
 * affect how we track the store files. Considering all these facts, we introduce a separate SFT to
 * track the store files.
 * <p/>
 * Since we almost always need to update the SFM and the SFT at the same time, StoreEngine provides
 * methods that update both, so the upper layer only needs to call StoreEngine once, reducing the
 * possibility of misuse.
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  private ManagedKeyDataCache managedKeyDataCache;

  private SystemKeyCache systemKeyCache;

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is
   * used to manage and compact HBase store files.
   */
  public static final String STORE_ENGINE_CLASS_KEY =
    ConfigKey.CLASS("hbase.hstore.engine.class", StoreEngine.class);

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
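   * <p/>
   * The lock methods are expected to be paired in try/finally, as in this illustrative sketch:
   *
   * <pre>
   * engine.readLock();
   * try {
   *   // examine the current store file list, e.g. via getStoreFileManager()
   * } finally {
   *   engine.readUnlock();
   * }
   * </pre>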
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns Compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns Compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns Store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns Store flusher to use. */
  StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select
   * or start a compaction. See CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    this.managedKeyDataCache = store.getHRegion().getManagedKeyDataCache();
    this.systemKeyCache = store.getHRegion().getSystemKeyCache();
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null;
  }

  /**
   * Create a writer for writing new store files.
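   * <p/>
   * An illustrative call, assuming the builder-style {@code CreateStoreFileWriterParams} (the
   * {@code cellCount} variable is hypothetical):
   *
   * <pre>
   * StoreFileWriter writer =
   *   engine.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(cellCount));
   * </pre>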
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics, null, // keyNamespace - not yet implemented
      systemKeyCache, managedKeyDataCache);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path         the path to the store file
   * @param isCompaction whether this is called from the context of a compaction
   */
  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
        if (storeFile.getFirstKey().isEmpty()) {
          LOG.debug("'{}=true' but storefile does not contain any data. skipping validation.",
            READ_FULLY_ON_VALIDATE_KEY);
          return;
        }
        LOG.debug("Validating the store file by reading the first cell from each block : {}", path);
        StoreFileReader reader = storeFile.getReader();
        try (StoreFileScanner scanner =
          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
          assert hasNext : "StoreFile contains no data";
          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
            if (nextIndexedKey == null) {
              break;
            }
            scanner.seek(nextIndexedKey);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
        + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from the result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader().close(
            storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        storeFileTracker.removeStoreFiles(filesToRemove);
      }
    }

    return results;
  }

  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
        file, storeFileTracker));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens any that have not been opened, and removes the store
   * file readers for store files that are no longer available. Mainly used by secondary region
   * replicas to keep up to date with the primary region files.
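   * <p/>
   * A condensed sketch of the reconciliation performed below (variable names simplified):
   *
   * <pre>
   * newFiles = Sets.difference(newFiles, compactedFiles); // ignore already-compacted files
   * toBeAdded = Sets.difference(newFiles, currentFiles);
   * toBeRemoved = Sets.difference(currentFiles, newFiles);
   * </pre>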
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
      toBeAddedFiles, toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
   * @param files        the files to commit
   * @param isCompaction whether this is called from the context of a compaction
   * @param validate     whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
    throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file, isCompaction);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, here we need to check
        // whether the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting, as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data, which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under lock protection. Usually this is for clearing the memstore snapshot.
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued to
      // hold the lock.
      writeUnlock();
    }
  }

  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the implementation of the used storefile tracker requires you to write to the temp
   * directory first, i.e., does not allow broken store files under the actual data directory.
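   * <p/>
   * Callers typically use this to decide where to write new files before committing them
   * (illustrative sketch; {@code tmpDir} and {@code dataDir} are hypothetical locals):
   *
   * <pre>
   * Path writeDir = engine.requireWritingToTmpDirFirst() ? tmpDir : dataDir;
   * </pre>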
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}