001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.CompletionService;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorCompletionService;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.locks.ReadWriteLock;
035import java.util.concurrent.locks.ReentrantReadWriteLock;
036import java.util.function.Function;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.CellComparator;
040import org.apache.hadoop.hbase.ExtendedCell;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.conf.ConfigKey;
043import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
044import org.apache.hadoop.hbase.log.HBaseMarkers;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
047import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
048import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
049import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
050import org.apache.hadoop.hbase.util.IOExceptionRunnable;
051import org.apache.hadoop.hbase.util.ReflectionUtils;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
057import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
058
059/**
060 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
061 * all compaction policies, compactors and store file managers are compatible, they are tied
062 * together and replaced together via StoreEngine-s.
063 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
065 * <ul>
066 * <li>Locked in shared mode when the list of component stores is looked at:
067 * <ul>
068 * <li>all reads/writes to table data</li>
069 * <li>checking for split</li>
070 * </ul>
071 * </li>
072 * <li>Locked in exclusive mode when the list of component stores is modified:
073 * <ul>
074 * <li>closing</li>
075 * <li>completing a compaction</li>
076 * </ul>
077 * </li>
078 * </ul>
079 * <p/>
 * It is a bit confusing that we have both a StoreFileManager (SFM) and a StoreFileTracker (SFT).
 * As its name says, SFT is used to track the list of store files. The reason we have an SFT
 * besides the SFM is historical. When stripe compaction was introduced, we introduced the
 * StoreEngine and also the SFM. However, the SFM here is not a general 'Manager': it is only
 * designed to manage the in-memory 'stripes', so that we can select different store files when
 * scanning or compacting. Before SFT existed, the 'tracking' of store files was done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore}. Since the SFM
 * is designed to hold only in-memory state, we hold the write lock when updating it, and the same
 * lock also protects the normal read/write requests. This means we should avoid adding IO
 * operations to the SFM. Also, whatever the in-memory state is, stripe or not, it does not affect
 * how we track the store files. Considering all these facts, we introduce a separate SFT to track
 * the store files.
091 * <p/>
 * Since we almost always need to update the SFM and the SFT at the same time, StoreEngine provides
 * methods that update both. The upper layer then only needs to update the StoreEngine once, which
 * reduces the chance of misuse.
095 */
096@InterfaceAudience.Private
097public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
098  C extends Compactor<?>, SFM extends StoreFileManager> {
099
100  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
101
102  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
103  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;
104
105  protected SF storeFlusher;
106  protected CP compactionPolicy;
107  protected C compactor;
108  protected SFM storeFileManager;
109
110  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
111  private Configuration conf;
112  private StoreContext ctx;
113  private RegionCoprocessorHost coprocessorHost;
114  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
115  private StoreFileTracker storeFileTracker;
116
117  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
118
119  /**
120   * The name of the configuration parameter that specifies the class of a store engine that is used
121   * to manage and compact HBase store files.
122   */
123  public static final String STORE_ENGINE_CLASS_KEY =
124    ConfigKey.CLASS("hbase.hstore.engine.class", StoreEngine.class);
125
126  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
127    DefaultStoreEngine.class;
128
129  /**
130   * Acquire read lock of this store.
131   */
132  public void readLock() {
133    storeLock.readLock().lock();
134  }
135
136  /**
137   * Release read lock of this store.
138   */
139  public void readUnlock() {
140    storeLock.readLock().unlock();
141  }
142
143  /**
144   * Acquire write lock of this store.
145   */
146  public void writeLock() {
147    storeLock.writeLock().lock();
148  }
149
150  /**
151   * Release write lock of this store.
152   */
153  public void writeUnlock() {
154    storeLock.writeLock().unlock();
155  }
156
157  /** Returns Compaction policy to use. */
158  public CompactionPolicy getCompactionPolicy() {
159    return this.compactionPolicy;
160  }
161
162  /** Returns Compactor to use. */
163  public Compactor<?> getCompactor() {
164    return this.compactor;
165  }
166
167  /** Returns Store file manager to use. */
168  public StoreFileManager getStoreFileManager() {
169    return this.storeFileManager;
170  }
171
172  /** Returns Store flusher to use. */
173  StoreFlusher getStoreFlusher() {
174    return this.storeFlusher;
175  }
176
177  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
178    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
179      store.getStoreContext());
180  }
181
182  /**
183   * @param filesCompacting Files currently compacting
184   * @return whether a compaction selection is possible
185   */
186  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);
187
188  /**
189   * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
190   * start a compaction. See CompactionContext class comment.
191   * @return New CompactionContext object.
192   */
193  public abstract CompactionContext createCompaction() throws IOException;
194
195  /**
196   * Create the StoreEngine's components.
197   */
198  protected abstract void createComponents(Configuration conf, HStore store,
199    CellComparator cellComparator) throws IOException;
200
201  protected final void createComponentsOnce(Configuration conf, HStore store,
202    CellComparator cellComparator) throws IOException {
203    assert compactor == null && compactionPolicy == null && storeFileManager == null
204      && storeFlusher == null && storeFileTracker == null;
205    createComponents(conf, store, cellComparator);
206    this.conf = conf;
207    this.ctx = store.getStoreContext();
208    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
209    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
210    this.storeFileTracker = createStoreFileTracker(conf, store);
211    assert compactor != null && compactionPolicy != null && storeFileManager != null
212      && storeFlusher != null;
213  }
214
215  /**
216   * Create a writer for writing new store files.
217   * @return Writer for a new StoreFile
218   */
219  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
220    return storeFileTracker.createWriter(params);
221  }
222
223  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
224    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
225    return createStoreFileAndReader(info);
226  }
227
228  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
229    info.setRegionCoprocessorHost(coprocessorHost);
230    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
231      ctx.getCacheConf(), bloomFilterMetrics);
232    storeFile.initReader();
233    return storeFile;
234  }
235
236  /**
237   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
238   * operation.
239   * @param path         the path to the store file
240   * @param isCompaction whether this is called from the context of a compaction
241   */
242  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
243    HStoreFile storeFile = null;
244    try {
245      storeFile = createStoreFileAndReader(path);
246      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
247        if (storeFile.getFirstKey().isEmpty()) {
248          LOG.debug("'{}=true' but storefile does not contain any data. skipping validation.",
249            READ_FULLY_ON_VALIDATE_KEY);
250          return;
251        }
252        LOG.debug("Validating the store file by reading the first cell from each block : {}", path);
253        StoreFileReader reader = storeFile.getReader();
254        try (StoreFileScanner scanner =
255          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
256          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
257          assert hasNext : "StoreFile contains no data";
258          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
259            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
260            if (nextIndexedKey == null) {
261              break;
262            }
263            scanner.seek(nextIndexedKey);
264          }
265        }
266      }
267    } catch (IOException e) {
268      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
269      throw e;
270    } finally {
271      if (storeFile != null) {
272        storeFile.closeStoreFile(false);
273      }
274    }
275  }
276
277  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
278    throws IOException {
279    if (CollectionUtils.isEmpty(files)) {
280      return Collections.emptyList();
281    }
282    // initialize the thread pool for opening store files in parallel..
283    ExecutorService storeFileOpenerThreadPool =
284      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
285        + "-" + ctx.getFamily().getNameAsString());
286    CompletionService<HStoreFile> completionService =
287      new ExecutorCompletionService<>(storeFileOpenerThreadPool);
288
289    int totalValidStoreFile = 0;
290    for (StoreFileInfo storeFileInfo : files) {
291      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
292      // our store's CompoundConfiguration here.
293      storeFileInfo.setConf(conf);
294      // open each store file in parallel
295      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
296      totalValidStoreFile++;
297    }
298
299    Set<String> compactedStoreFiles = new HashSet<>();
300    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
301    IOException ioe = null;
302    try {
303      for (int i = 0; i < totalValidStoreFile; i++) {
304        try {
305          HStoreFile storeFile = completionService.take().get();
306          if (storeFile != null) {
307            LOG.debug("loaded {}", storeFile);
308            results.add(storeFile);
309            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
310          }
311        } catch (InterruptedException e) {
312          if (ioe == null) {
313            ioe = new InterruptedIOException(e.getMessage());
314          }
315        } catch (ExecutionException e) {
316          if (ioe == null) {
317            ioe = new IOException(e.getCause());
318          }
319        }
320      }
321    } finally {
322      storeFileOpenerThreadPool.shutdownNow();
323    }
324    if (ioe != null) {
325      // close StoreFile readers
326      boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
327      for (HStoreFile file : results) {
328        try {
329          if (file != null) {
330            file.closeStoreFile(evictOnClose);
331          }
332        } catch (IOException e) {
333          LOG.warn("Could not close store file {}", file, e);
334        }
335      }
336      throw ioe;
337    }
338
339    // Should not archive the compacted store files when region warmup. See HBASE-22163.
340    if (!warmup) {
341      // Remove the compacted files from result
342      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
343      for (HStoreFile storeFile : results) {
344        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
345          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
346          storeFile.getReader().close(
347            storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
348          filesToRemove.add(storeFile);
349        }
350      }
351      results.removeAll(filesToRemove);
352      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
353        LOG.debug("Moving the files {} to archive", filesToRemove);
354        storeFileTracker.removeStoreFiles(filesToRemove);
355      }
356    }
357
358    return results;
359  }
360
361  public void initialize(boolean warmup) throws IOException {
362    List<StoreFileInfo> fileInfos = storeFileTracker.load();
363    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
364    storeFileManager.loadFiles(files);
365  }
366
367  public void refreshStoreFiles() throws IOException {
368    List<StoreFileInfo> fileInfos = storeFileTracker.load();
369    refreshStoreFilesInternal(fileInfos);
370  }
371
372  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
373    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
374    for (String file : newFiles) {
375      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
376        file, storeFileTracker));
377    }
378    refreshStoreFilesInternal(storeFiles);
379  }
380
381  /**
382   * Checks the underlying store files, and opens the files that have not been opened, and removes
383   * the store file readers for store files no longer available. Mainly used by secondary region
384   * replicas to keep up to date with the primary region files.
385   */
386  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
387    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
388    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
389    if (currentFiles == null) {
390      currentFiles = Collections.emptySet();
391    }
392    if (newFiles == null) {
393      newFiles = Collections.emptySet();
394    }
395    if (compactedFiles == null) {
396      compactedFiles = Collections.emptySet();
397    }
398
399    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
400    for (HStoreFile sf : currentFiles) {
401      currentFilesSet.put(sf.getFileInfo(), sf);
402    }
403    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
404    for (HStoreFile sf : compactedFiles) {
405      compactedFilesSet.put(sf.getFileInfo(), sf);
406    }
407
408    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
409    // Exclude the files that have already been compacted
410    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
411    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
412    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
413
414    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
415      return;
416    }
417
418    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
419      toBeAddedFiles, toBeRemovedFiles);
420
421    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
422    for (StoreFileInfo sfi : toBeRemovedFiles) {
423      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
424    }
425
426    // try to open the files
427    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
428
429    // propagate the file changes to the underlying store file manager
430    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
431    }, () -> {
432    }); // won't throw an exception
433  }
434
435  /**
436   * Commit the given {@code files}.
437   * <p/>
438   * We will move the file into data directory, and open it.
439   * @param files        the files want to commit
440   * @param isCompaction whether this is called from the context of a compaction
441   * @param validate     whether to validate the store files
442   * @return the committed store files
443   */
444  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
445    throws IOException {
446    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
447    HRegionFileSystem hfs = ctx.getRegionFileSystem();
448    String familyName = ctx.getFamily().getNameAsString();
449    Path storeDir = hfs.getStoreDir(familyName);
450    for (Path file : files) {
451      try {
452        if (validate) {
453          validateStoreFile(file, isCompaction);
454        }
455        Path committedPath;
456        // As we want to support writing to data directory directly, here we need to check whether
457        // the store file is already in the right place
458        if (file.getParent() != null && file.getParent().equals(storeDir)) {
459          // already in the right place, skip renaming
460          committedPath = file;
461        } else {
462          // Write-out finished successfully, move into the right spot
463          committedPath = hfs.commitStoreFile(familyName, file);
464        }
465        HStoreFile sf = createStoreFileAndReader(committedPath);
466        committedFiles.add(sf);
467      } catch (IOException e) {
468        LOG.error("Failed to commit store file {}", file, e);
469        // Try to delete the files we have committed before.
470        // It is OK to fail when deleting as leaving the file there does not cause any data
471        // corruption problem. It just introduces some duplicated data which may impact read
472        // performance a little when reading before compaction.
473        for (HStoreFile sf : committedFiles) {
474          Path pathToDelete = sf.getPath();
475          try {
476            sf.deleteStoreFile();
477          } catch (IOException deleteEx) {
478            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
479              deleteEx);
480          }
481        }
482        throw new IOException("Failed to commit the flush", e);
483      }
484    }
485    return committedFiles;
486  }
487
488  /**
489   * Add the store files to store file manager, and also record it in the store file tracker.
490   * <p/>
491   * The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
492   * the lock protection. Usually this is for clear the memstore snapshot.
493   */
494  public void addStoreFiles(Collection<HStoreFile> storeFiles,
495    IOExceptionRunnable actionAfterAdding) throws IOException {
496    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
497    writeLock();
498    try {
499      storeFileManager.insertNewFiles(storeFiles);
500      actionAfterAdding.run();
501    } finally {
502      // We need the lock, as long as we are updating the storeFiles
503      // or changing the memstore. Let us release it before calling
504      // notifyChangeReadersObservers. See HBASE-4485 for a possible
505      // deadlock scenario that could have happened if continue to hold
506      // the lock.
507      writeUnlock();
508    }
509  }
510
511  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
512    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
513    throws IOException {
514    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
515      StoreUtils.toStoreFileInfo(newFiles));
516    walMarkerWriter.run();
517    writeLock();
518    try {
519      storeFileManager.addCompactionResults(compactedFiles, newFiles);
520      actionUnderLock.run();
521    } finally {
522      writeUnlock();
523    }
524  }
525
526  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
527    writeLock();
528    try {
529      storeFileManager.removeCompactedFiles(compactedFiles);
530    } finally {
531      writeUnlock();
532    }
533  }
534
535  /**
536   * Create the StoreEngine configured for the given Store.
537   * @param store          The store. An unfortunate dependency needed due to it being passed to
538   *                       coprocessors via the compactor.
539   * @param conf           Store configuration.
540   * @param cellComparator CellComparator for storeFileManager.
541   * @return StoreEngine to use.
542   */
543  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
544    CellComparator cellComparator) throws IOException {
545    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
546    try {
547      StoreEngine<?, ?, ?, ?> se =
548        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
549      se.createComponentsOnce(conf, store, cellComparator);
550      return se;
551    } catch (Exception e) {
552      throw new IOException("Unable to load configured store engine '" + className + "'", e);
553    }
554  }
555
556  /**
557   * Whether the implementation of the used storefile tracker requires you to write to temp
558   * directory first, i.e, does not allow broken store files under the actual data directory.
559   */
560  public boolean requireWritingToTmpDirFirst() {
561    return storeFileTracker.requireWritingToTmpDirFirst();
562  }
563
564  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
565      allowedOnPath = ".*/TestHStore.java")
566  ReadWriteLock getLock() {
567    return storeLock;
568  }
569
570  public BloomFilterMetrics getBloomFilterMetrics() {
571    return bloomFilterMetrics;
572  }
573}