/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that creates the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible with each other,
 * they are tied together and replaced together via StoreEngine implementations.
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component store files is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component store files is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
 * <p/>
 * It can be confusing that we have both a StoreFileManager (SFM) and a StoreFileTracker (SFT). As
 * its name says, the SFT is used to track the store file list. The reason we have an SFT besides
 * the SFM is that, when introducing stripe compaction, we introduced the StoreEngine along with
 * the SFM, but the SFM is not a general 'Manager': it is only designed to manage the in-memory
 * 'stripes', so we can select different store files when scanning or compacting. Before the SFT
 * existed, the 'tracking' of store files was done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore}. Since the
 * SFM is designed to hold only in-memory state, we hold the write lock while updating it, and the
 * same lock also protects normal read/write requests; this means we should not add IO operations
 * to the SFM. Also, no matter what the in-memory state is, stripe or not, it does not affect how
 * we track the store files. Considering all these facts, we introduced a separate SFT to track
 * the store files.
 * <p/>
 * Since we almost always need to update the SFM and the SFT at the same time, StoreEngine exposes
 * methods that update both, so the upper layer only needs to call StoreEngine once, reducing
 * possible misuse.
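 * <p/>
 * A minimal sketch of the expected read-lock pattern for callers that examine the store file list
 * (illustrative only; the methods used are the ones declared below):
 *
 * <pre>{@code
 * storeEngine.readLock();
 * try {
 *   Collection<HStoreFile> files = storeEngine.getStoreFileManager().getStoreFiles();
 *   // ... look at the current store files ...
 * } finally {
 *   storeEngine.readUnlock();
 * }
 * }</pre>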
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is
   * used to manage and compact HBase store files.
   */
  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns Compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns Compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns Store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns Store flusher to use. */
  public StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select
   * or start a compaction. See CompactionContext class comment.
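   * <p/>
   * A hedged usage sketch (the parameters shown follow the {@code select} method declared on
   * CompactionContext; the variables are illustrative):
   *
   * <pre>{@code
   * CompactionContext compaction = storeEngine.createCompaction();
   * compaction.select(filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
   * }</pre>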
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null && storeFileTracker != null;
  }

  /**
   * Create a writer for writing new store files.
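   * <p/>
   * A hedged sketch, assuming the builder-style factory methods on CreateStoreFileWriterParams
   * ({@code cellCount} is an illustrative variable):
   *
   * <pre>{@code
   * StoreFileWriter writer =
   *   storeEngine.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(cellCount));
   * }</pre>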
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path the path to the store file
   */
  public void validateStoreFile(Path path) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file: {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
        + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry the store configuration down to HFile, so we need to set it
      // to our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from the result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader()
            .close(storeFile.getCacheConf() != null
              ? storeFile.getCacheConf().shouldEvictOnClose()
              : true);
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

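  /**
   * Load the store file list from the store file tracker and hand the opened files to the store
   * file manager. During warmup ({@code warmup == true}) compacted files discovered on load are
   * kept in place rather than archived; see HBASE-22163.
   */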
  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

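  /**
   * Reload the store file list from the store file tracker and reconcile it with the in-memory
   * state, opening files we do not have readers for yet and dropping readers for files that are
   * gone.
   */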
  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
        file, storeFileTracker));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not been opened yet, and removes
   * the store file readers for store files that are no longer available. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles
      + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
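   * <p/>
   * A minimal usage sketch of the flush commit path (illustrative only; {@code tmpPaths} stands
   * for the files written by the store flusher, and the memstore call is a stand-in for the real
   * cleanup action):
   *
   * <pre>{@code
   * List<HStoreFile> committed = storeEngine.commitStoreFiles(tmpPaths, true);
   * storeEngine.addStoreFiles(committed, () -> memstore.clearSnapshot(snapshotId));
   * }</pre>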
   * @param files    the files to commit
   * @param validate whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, here we need to check
        // whether the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting, as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data, which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under the lock protection. Usually this is for clearing the memstore snapshot.
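   * <p/>
   * Note that the store file tracker is updated first, before the write lock is taken; only the
   * in-memory insertion and {@code actionAfterAdding} run under the lock.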
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock as long as we are updating the storeFiles or changing the memstore.
      // Let us release it before calling notifyChangeReadersObservers. See HBASE-4485 for a
      // possible deadlock scenario that could have happened if we continued to hold the lock.
      writeUnlock();
    }
  }

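  /**
   * Replace the compacted files with the new files in both the store file tracker and the store
   * file manager. The tracker is updated and the WAL marker is written before taking the write
   * lock; the in-memory swap and {@code actionUnderLock} then run while the write lock is held.
   */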
  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
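   * <p/>
   * A minimal sketch of selecting a non-default engine through configuration (StripeStoreEngine
   * ships with HBase; whether to use it depends on the workload):
   *
   * <pre>{@code
   * conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
   *   "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");
   * StoreEngine<?, ?, ?, ?> engine = StoreEngine.create(store, conf, cellComparator);
   * }</pre>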
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the implementation of the used store file tracker requires you to write to a temp
   * directory first, i.e., it does not allow broken store files under the actual data directory.
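   * <p/>
   * A hedged sketch of how a caller might use this flag when choosing where a new file is first
   * written ({@code tmpDir} and {@code dataDir} are illustrative names):
   *
   * <pre>{@code
   * Path writeDir = storeEngine.requireWritingToTmpDirFirst() ? tmpDir : dataDir;
   * }</pre>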
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}