/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine implementations.
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
 * <p/>
 * It is a bit confusing that we have both a StoreFileManager(SFM) and a StoreFileTracker(SFT). As
 * its name suggests, the SFT is used to track the store file list. The reason we have an SFT
 * besides the SFM is that, when introducing stripe compaction, we introduced the StoreEngine and
 * also the SFM, but the SFM is not a general 'Manager': it is only designed to manage the
 * in-memory 'stripes', so we can select different store files when scanning or compacting. Before
 * the SFT, the 'tracking' of store files was actually done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore}. And since the
 * SFM is designed to hold only in-memory state, we hold the write lock when updating it; the lock
 * is also used to protect normal read/write requests. This means we should not add IO operations
 * to the SFM. Also, no matter what the in-memory state is, stripe or not, it does not affect how
 * we track the store files. Considering all these facts, we introduce a separate SFT to track the
 * store files.
 * <p/>
 * Since we almost always need to update the SFM and the SFT at the same time, we introduce methods
 * in StoreEngine to update them both, so the upper layer just needs to update the StoreEngine
 * once, reducing possible misuse.
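 * <p/>
 * A minimal sketch of the intended lock usage, assuming a {@code storeEngine} reference held by
 * the caller (the surrounding names are illustrative only):
 *
 * <pre>{@code
 * storeEngine.readLock();
 * try {
 *   // look at the current store file list, e.g. to build scanners
 *   Collection<HStoreFile> files = storeEngine.getStoreFileManager().getStoreFiles();
 * } finally {
 *   storeEngine.readUnlock();
 * }
 * }</pre>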
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is
   * used to manage and compact HBase store files.
   */
  public static final String STORE_ENGINE_CLASS_KEY =
    ConfigKey.CLASS("hbase.hstore.engine.class", StoreEngine.class);

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns Compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns Compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns Store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns Store flusher to use. */
  StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select
   * or start a compaction. See the CompactionContext class comment.
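   * <p/>
   * A sketch of how a caller typically drives the returned context (the exact {@code select}
   * signature is that of {@link CompactionContext}; parameter values here are illustrative only):
   *
   * <pre>{@code
   * CompactionContext compaction = storeEngine.createCompaction();
   * // Ask the engine's policy to choose files; returns false if nothing was selected.
   * if (compaction.select(filesCompacting, false, false, false)) {
   *   // ... hand the context to the compaction runner, which calls compaction.compact(...) ...
   * }
   * }</pre>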
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null;
  }

  /**
   * Create a writer for writing new store files.
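   * <p/>
   * A minimal sketch of a call site, assuming the builder-style
   * {@link CreateStoreFileWriterParams} used elsewhere in this package (the parameter values are
   * illustrative only):
   *
   * <pre>{@code
   * StoreFileWriter writer = storeEngine.createWriter(
   *   CreateStoreFileWriterParams.create().maxKeyCount(cellCount).isCompaction(false));
   * }</pre>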
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
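   * <p/>
   * If {@code hbase.hstore.validate.read_fully} is set to {@code true}, validation additionally
   * seeks the first cell of every block, e.g.:
   *
   * <pre>{@code
   * conf.setBoolean("hbase.hstore.validate.read_fully", true);
   * }</pre>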
   * @param path         the path to the store file
   * @param isCompaction whether this is called from the context of a compaction
   */
  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
        if (storeFile.getFirstKey().isEmpty()) {
          LOG.debug("'{}=true' but storefile does not contain any data. Skipping validation.",
            READ_FULLY_ON_VALIDATE_KEY);
          return;
        }
        LOG.debug("Validating the store file by reading the first cell from each block: {}", path);
        StoreFileReader reader = storeFile.getReader();
        try (StoreFileScanner scanner =
          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
          assert hasNext : "StoreFile contains no data";
          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
            if (nextIndexedKey == null) {
              break;
            }
            scanner.seek(nextIndexedKey);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to open store file: {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // Initialize the thread pool for opening store files in parallel.
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
        + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, so we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader().close(
            storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

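  /**
   * Load the initial store file list from the store file tracker and populate the store file
   * manager. Pass {@code warmup=true} when pre-warming a region, so compacted store files are not
   * archived (see HBASE-22163).
   */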
  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

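  /**
   * Refresh the store files against the given set of file names. Like
   * {@link #refreshStoreFiles()}, this is mainly used by secondary region replicas to keep up to
   * date with the primary region files.
   */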
  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
        file, storeFileTracker));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not been opened, and removes the
   * store file readers for store files that are no longer available. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
      toBeAddedFiles, toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
   * @param files        the files to commit
   * @param isCompaction whether this is called from the context of a compaction
   * @param validate     whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
    throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file, isCompaction);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, here we need to check
        // whether the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under the lock protection. Usually this is for clearing the memstore snapshot.
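   * <p/>
   * A minimal sketch of the typical flush-commit usage, where the hypothetical {@code newFiles}
   * and {@code snapshotId} come from the caller's flush context:
   *
   * <pre>{@code
   * storeEngine.addStoreFiles(newFiles, () -> memstore.clearSnapshot(snapshotId));
   * }</pre>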
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued
      // to hold the lock.
      writeUnlock();
    }
  }

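  /**
   * Replace the compacted files with the new store files: the store file tracker is updated
   * first, then the given WAL marker writer runs, and finally the store file manager's in-memory
   * state is swapped under the write lock, followed by {@code actionUnderLock}.
   */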
  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

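  /**
   * Remove the given compacted files from the store file manager's in-memory state, under the
   * write lock protection.
   */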
  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
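   * <p/>
   * The engine class is read from {@link #STORE_ENGINE_CLASS_KEY}; for example, to use stripe
   * compaction (a configuration sketch):
   *
   * <pre>{@code
   * conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
   *   "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");
   * }</pre>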
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the implementation of the used store file tracker requires you to write to the temp
   * directory first, i.e., it does not allow broken store files under the actual data directory.
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}