/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.keymeta.ManagedKeyDataCache;
import org.apache.hadoop.hbase.keymeta.SystemKeyCache;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that creates the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine implementations.
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
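 * <p/>
 * A typical usage pattern from an upper layer looks like the following illustrative sketch,
 * where the {@code scan} call stands in for any operation that reads the store file list:
 *
 * <pre>{@code
 * storeEngine.readLock();
 * try {
 *   // look at the list of component store files, e.g. to create scanners over them
 *   scan(storeEngine.getStoreFileManager().getStoreFiles());
 * } finally {
 *   storeEngine.readUnlock();
 * }
 * }</pre>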
 * <p/>
 * It is a bit confusing that we have a StoreFileManager (SFM) and then a StoreFileTracker (SFT).
 * As its name says, SFT is used to track the store file list. The reason why we have a SFT
 * besides SFM is that, when introducing stripe compaction, we introduced the StoreEngine and also
 * the SFM, but actually, the SFM here is not a general 'Manager'; it is only designed to manage
 * the in-memory 'stripes', so we can select different store files when scanning or compacting.
 * The 'tracking' of store files was actually done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore} before we had
 * SFT. And since SFM is designed to only hold in-memory state, we hold the write lock when
 * updating it; the lock is also used to protect normal read/write requests. This means we had
 * better not add IO operations to SFM. Also, no matter what the in-memory state is, stripe or
 * not, it does not affect how we track the store files. Considering all these facts, we
 * introduce a separate SFT to track the store files.
 * <p/>
 * Since we almost always need to update SFM and SFT at the same time, we introduce methods in
 * StoreEngine directly to update them both, so the upper layer only needs to call StoreEngine
 * once, reducing possible misuse.
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  private ManagedKeyDataCache managedKeyDataCache;

  private SystemKeyCache systemKeyCache;

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is used
   * to manage and compact HBase store files.
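   * <p/>
   * For example, to switch to the stripe store engine (an illustrative snippet; in practice this
   * is usually set in hbase-site.xml or in the table/column family configuration):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
   *   "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");
   * }</pre>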
   */
  public static final String STORE_ENGINE_CLASS_KEY =
    ConfigKey.CLASS("hbase.hstore.engine.class", StoreEngine.class);

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns Compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns Compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns Store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns Store flusher to use. */
  StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
   * start a compaction. See CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    this.managedKeyDataCache = store.getHRegion().getManagedKeyDataCache();
    this.systemKeyCache = store.getHRegion().getSystemKeyCache();
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null;
  }

  /**
   * Create a writer for writing new store files.
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics, null, // keyNamespace - not yet implemented
      systemKeyCache, managedKeyDataCache);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path         the path to the store file
   * @param isCompaction whether this is called from the context of a compaction
   */
  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
        if (storeFile.getFirstKey().isEmpty()) {
          LOG.debug("'{}=true' but storefile does not contain any data. Skipping validation.",
            READ_FULLY_ON_VALIDATE_KEY);
          return;
        }
        LOG.debug("Validating the store file by reading the first cell from each block: {}", path);
        StoreFileReader reader = storeFile.getReader();
        try (StoreFileScanner scanner =
          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
          assert hasNext : "StoreFile contains no data";
          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
            if (nextIndexedKey == null) {
              break;
            }
            scanner.seek(nextIndexedKey);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to open store file: {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // Initialize the thread pool for opening store files in parallel.
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
        + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, so we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from the result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader().close(
            storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        storeFileTracker.removeStoreFiles(filesToRemove);
      }
    }

    return results;
  }

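  /**
   * Load the store files recorded by the store file tracker, open them, and hand them to the
   * store file manager. When {@code warmup} is true, compacted store files found during loading
   * are not archived; see HBASE-22163.
   */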
  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
        file, storeFileTracker));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens any that have not been opened, and removes the store
   * file readers for store files that are no longer available. Mainly used by secondary region
   * replicas to keep up to date with the primary region files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
      toBeAddedFiles, toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
   * @param files        the files to commit
   * @param isCompaction whether this is called from the context of a compaction
   * @param validate     whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
    throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file, isCompaction);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, we need to check whether
        // the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting, as leaving the files there does not cause any data
        // corruption; it just introduces some duplicated data, which may slightly impact read
        // performance until the next compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under the lock protection. Usually this is used to clear the memstore snapshot.
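   * <p/>
   * A hypothetical flush-side caller might look like the following sketch, where
   * {@code memstore.clearSnapshot(snapshotId)} stands in for whatever cleanup the caller needs to
   * perform under the lock:
   *
   * <pre>{@code
   * storeEngine.addStoreFiles(flushedStoreFiles, () -> memstore.clearSnapshot(snapshotId));
   * }</pre>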
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued to
      // hold the lock.
      writeUnlock();
    }
  }

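  /**
   * Replace the given {@code compactedFiles} with the {@code newFiles}, in both the store file
   * tracker and the store file manager. The {@code walMarkerWriter} runs after the tracker has
   * been updated but before the write lock is taken, while {@code actionUnderLock} runs with the
   * write lock held.
   */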
  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

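  /**
   * Remove the given compacted files from the store file manager, under write lock protection.
   */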
  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the store file tracker implementation in use requires writing to a temporary
   * directory first, i.e., it does not allow broken store files under the actual data directory.
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}