/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine-s.
 * <p/>
 * We expose read write lock methods to upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
 * <p/>
 * It is a bit confusing that we have a StoreFileManager(SFM) and then a StoreFileTracker(SFT). As
 * its name says, SFT is used to track the store files list. The reason why we have a SFT beside SFM
 * is that, when introducing stripe compaction, we introduced the StoreEngine and also the SFM, but
 * actually, the SFM here is not a general 'Manager', it is only designed to manage the in memory
 * 'stripes', so we can select different store files when scanning or compacting. The 'tracking' of
 * store files is actually done in {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem}
 * and {@link HStore} before we have SFT. And since SFM is designed to only hold in memory states,
 * we will hold write lock when updating it, the lock is also used to protect the normal read/write
 * requests. This means we'd better not add IO operations to SFM. And also, no matter what the in
 * memory state is, stripe or not, it does not affect how we track the store files. So consider all
 * these facts, here we introduce a separated SFT to track the store files.
 * <p/>
 * Here, since we always need to update SFM and SFT almost at the same time, we introduce methods in
 * StoreEngine directly to update them both, so upper layer just needs to update StoreEngine once,
 * to reduce the possible misuse.
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  // The four components below are created by the concrete engine in createComponents().
  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  // The fields below are assigned exactly once, in createComponentsOnce().
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is used
   * to manage and compact HBase store files.
   */
  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns Compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns Compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns Store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns Store flusher to use. */
  public StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  /**
   * Build the store file tracker for the given store through {@link StoreFileTrackerFactory}.
   */
  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
   * start a compaction. See CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  /**
   * Create the engine components via {@link #createComponents} and then capture the configuration,
   * store context, coprocessor host, thread pool creator and store file tracker from the given
   * store. The assertions check that nothing was set before the call and everything is set after.
   */
  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null && storeFileTracker != null;
  }

  /**
   * Create a writer for writing new store files.
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  /**
   * Create a {@link StoreFileInfo} for the given path and open the store file it describes.
   * @param p the path to the store file
   * @return the store file, with its reader initialized
   */
  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
      ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  /**
   * Create the store file for the given info and initialize its reader. The coprocessor host of
   * this engine is set on {@code info} before the store file is created.
   * @param info the info of the store file to open
   * @return the store file, with its reader initialized
   */
  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path the path to the store file
   */
  public void validateStoreFile(Path path) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  /**
   * Open the given store files in parallel.
   * <p/>
   * If opening any file fails, all files opened so far are closed and the first failure is
   * rethrown. Otherwise, unless {@code warmup} is set, files which are listed as compacted by
   * another opened file are closed, dropped from the result and, on the primary replica, handed to
   * the region file system for removal.
   * @param files  the store files to open, may be null or empty
   * @param warmup whether this is a region warmup, in which case compacted files are left alone
   * @return the opened store files
   */
  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel..
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
        + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    List<HStoreFile> results = new ArrayList<>(files.size());
    // Only the first failure is kept; we still drain every submitted task so that the files which
    // were opened successfully can be closed below.
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            // Keep the original exception as the cause so that its stack trace is not lost.
            InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
            iioe.initCause(e);
            ioe = iioe;
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files when region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader()
            .close(storeFile.getCacheConf() != null
              ? storeFile.getCacheConf().shouldEvictOnClose()
              : true);
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

  /**
   * Load the store file list from the store file tracker, open the files, and load them into the
   * store file manager.
   * @param warmup whether this is a region warmup, in which case compacted store files found while
   *               opening are neither dropped nor archived
   */
  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  /**
   * Refresh the store files against the list currently returned by the store file tracker.
   */
  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  /**
   * Refresh the store files against the given list of file names, which are resolved to
   * {@link StoreFileInfo}s through the region file system.
   * @param newFiles the names of the store files which should make up the store
   */
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles
        .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, and opens the files that have not been opened, and removes
   * the store file readers for store files no longer available. Mainly used by secondary region
   * replicas to keep up to date with the primary region files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    // Index the opened files by their StoreFileInfo so they can be diffed against newFiles.
    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
      toBeAddedFiles, toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the file into data directory, and open it.
   * @param files    the files want to commit
   * @param validate whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file);
        }
        Path committedPath;
        // As we want to support writing to data directory directly, here we need to check whether
        // the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to store file manager, and also record it in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
   * the lock protection. Usually this is for clear the memstore snapshot.
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    // The tracker is updated before the write lock is taken.
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      writeUnlock();
    }
  }

  /**
   * Replace {@code compactedFiles} with {@code newFiles} in both the store file tracker and the
   * store file manager.
   * <p/>
   * The tracker is updated first, then {@code walMarkerWriter} is run, and finally, under the write
   * lock, the store file manager is updated and {@code actionUnderLock} is run.
   * @param compactedFiles  the store files to be replaced
   * @param newFiles        the store files which replace them
   * @param walMarkerWriter action run after the tracker update and before taking the write lock
   * @param actionUnderLock action run under the write lock after the store file manager update
   */
  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

  /**
   * Remove the given compacted files from the store file manager, under the write lock.
   * @param compactedFiles the compacted store files to remove
   */
  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the implementation of the used storefile tracker requires you to write to temp
   * directory first, i.e, does not allow broken store files under the actual data directory.
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  /** Returns the bloom filter metrics instance passed to every store file this engine creates. */
  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}