/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine-s.
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
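 * <p/>
 * For illustration, a caller that needs a stable view of the store file list would follow this
 * pattern (a minimal sketch; what is done while holding the shared lock is up to the caller):
 *
 * <pre>
 * storeEngine.readLock();
 * try {
 *   Collection&lt;HStoreFile&gt; files = storeEngine.getStoreFileManager().getStorefiles();
 *   // read from the files while the list cannot change underneath us
 * } finally {
 *   storeEngine.readUnlock();
 * }
 * </pre>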
 * <p/>
 * It is a bit confusing that we have both a StoreFileManager (SFM) and a StoreFileTracker (SFT).
 * As its name says, the SFT is used to track the store file list. The reason why we have an SFT
 * besides the SFM is that, when introducing stripe compaction, we introduced the StoreEngine and
 * also the SFM, but the SFM there is not a general 'Manager': it is only designed to manage the
 * in-memory 'stripes', so that we can select different store files when scanning or compacting.
 * Before the SFT existed, the 'tracking' of store files was actually done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore}. And since
 * the SFM is designed to only hold in-memory state, we hold the write lock when updating it, and
 * the same lock is also used to protect normal read/write requests. This means we had better not
 * add IO operations to the SFM. Also, no matter what the in-memory state is, stripe or not, it
 * does not affect how we track the store files. Considering all these facts, we introduce a
 * separate SFT to track the store files.
 * <p/>
 * Since we almost always need to update the SFM and the SFT at the same time, we introduce
 * methods on StoreEngine directly to update them both, so the upper layer only needs to update
 * the StoreEngine once, reducing possible misuse.
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is
   * used to manage and compact HBase store files.
   */
  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire the read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release the read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire the write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release the write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns the compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns the compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns the store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }
  /** Returns the store flusher to use. */
  public StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select
   * or start a compaction. See the CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null && storeFileTracker != null;
  }

  /**
   * Create a writer for writing new store files.
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
      ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path the path to the store file
   */
  public void validateStoreFile(Path path) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file: {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // Initialize the thread pool for opening store files in parallel.
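    // The pool is supplied by HRegion::getStoreFileOpenAndCloseThreadPool (wired up in
    // createComponentsOnce), so its sizing follows the region's store file open/close thread
    // settings rather than anything configured on this store alone.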
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-"
        + ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry the store configuration down to HFile, so we need to set it
      // to our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // Open each store file in parallel.
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // Close the StoreFile readers that were opened successfully.
      boolean evictOnClose =
        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from the result.
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
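          // Close the reader for the already-compacted file, evicting its cached blocks when the
          // cache configuration asks for eviction on close (and by default when there is none).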
          storeFile.getReader().close(
            storeFile.getCacheConf() != null ? storeFile.getCacheConf().shouldEvictOnClose() : true);
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles
        .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not been opened, and removes the
   * store file readers for store files that are no longer available. Mainly used by secondary
   * region replicas to keep up to date with the primary region's files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted.
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles
      + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // Try to open the files.
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // Propagate the file changes to the underlying store file manager.
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
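   * <p/>
   * For example, a flusher that has written its output to a temporary location could commit the
   * new files with validation enabled (a minimal sketch; {@code flushedPaths} is a hypothetical
   * list of paths produced by the flusher):
   *
   * <pre>
   * List&lt;HStoreFile&gt; committed = storeEngine.commitStoreFiles(flushedPaths, true);
   * </pre>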
   * @param files    the files to commit
   * @param validate whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, here we need to check
        // whether the store file is already in the right place.
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // Already in the right place, skip renaming.
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot.
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting, as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  @FunctionalInterface
  public interface IOExceptionRunnable {
    void run() throws IOException;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file
   * tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under the lock's protection. Usually this is used to clear the memstore snapshot.
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock as long as we are updating the storeFiles or changing the memstore.
      // Let us release it before calling notifyChangeReadersObservers. See HBASE-4485 for a
      // possible deadlock scenario that could have happened if we continued to hold the lock.
      writeUnlock();
    }
  }
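  /**
   * Replace the compacted files with the new store files. The store file tracker is updated
   * first, then the WAL marker is written, and only then is the store file manager mutated under
   * the write lock, so concurrent readers never observe a partially applied compaction.
   */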
  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the store file tracker implementation in use requires writing to a temp directory
   * first, i.e., it does not allow broken store files under the actual data directory.
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}