/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine implementations.
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations, as shown in the
 * sketch after this list:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
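 * <p/>
 * A minimal sketch of the expected locking pattern for a read operation (hypothetical caller
 * code; {@code engine} is assumed to be this store's StoreEngine):
 *
 * <pre>{@code
 * engine.readLock();
 * try {
 *   // safe to look at the current list of store files, e.g. for a scan or a split check
 *   Collection<HStoreFile> files = engine.getStoreFileManager().getStorefiles();
 *   // ... use the files ...
 * } finally {
 *   engine.readUnlock();
 * }
 * }</pre>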
 * <p/>
 * It can be confusing that we have both a StoreFileManager (SFM) and a StoreFileTracker (SFT). As
 * its name says, the SFT is used to track the store file list. The reason we have an SFT besides
 * the SFM is historical: when stripe compaction was introduced, we added the StoreEngine and the
 * SFM, but the SFM is not a general 'manager'; it is only designed to manage the in-memory
 * 'stripes', so we can select different store files when scanning or compacting. Before the SFT
 * existed, the 'tracking' of store files was done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore}. Since the
 * SFM only holds in-memory state, we hold the write lock when updating it, and the same lock also
 * protects normal read/write requests; this means we should not add IO operations to the SFM.
 * Also, no matter what the in-memory state is, striped or not, it does not affect how we track
 * the store files. Considering all these facts, we introduced a separate SFT to track the store
 * files.
 * <p/>
 * Since we almost always need to update the SFM and the SFT at the same time, StoreEngine exposes
 * methods that update both, so the upper layer only needs to update the StoreEngine once,
 * reducing the chance of misuse.
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is
   * used to manage and compact HBase store files.
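   * <p/>
   * For example, a hypothetical hbase-site.xml snippet selecting the stripe engine (which ships
   * with HBase) instead of the default:
   *
   * <pre>
   * &lt;property&gt;
   *   &lt;name&gt;hbase.hstore.engine.class&lt;/name&gt;
   *   &lt;value&gt;org.apache.hadoop.hbase.regionserver.StripeStoreEngine&lt;/value&gt;
   * &lt;/property&gt;
   * </pre>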
   */
  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns the compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns the compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns the store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns the store flusher to use. */
  public StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select
   * or start a compaction. See CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

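  /**
   * Create the components via {@link #createComponents(Configuration, HStore, CellComparator)},
   * then initialize the remaining shared state. Expected to be called exactly once per engine
   * instance, as the surrounding asserts enforce.
   */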
  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null && storeFileTracker != null;
  }

  /**
   * Create a writer for writing new store files.
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
      ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path the path to the store file
   */
  public void validateStoreFile(Path path) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
        + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, so we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from the result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader()
            .close(storeFile.getCacheConf() != null
              ? storeFile.getCacheConf().shouldEvictOnClose()
              : true);
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

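  /**
   * Load the initial set of store files from the store file tracker, open them, and hand them to
   * the store file manager.
   * @param warmup whether this is a region warmup, in which case compacted store files are
   *               neither cleared from the result nor archived (see HBASE-22163)
   */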
  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

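  /**
   * Reload the store file list from the store file tracker and update the store file manager to
   * match it, opening files that are new and dropping readers for files that are gone.
   */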
  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles
        .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not been opened, and removes the
   * store file readers for store files that are no longer available. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles
      + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
   * @param files    the files to commit
   * @param validate whether to validate the store files
   * @return the committed store files
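   * <p/>
   * A minimal sketch of the expected call pattern (hypothetical caller code; the real caller is
   * the flush path in HStore):
   *
   * <pre>{@code
   * List<Path> flushedPaths = ...; // paths just written under the tmp directory
   * List<HStoreFile> committed = engine.commitStoreFiles(flushedPaths, true);
   * }</pre>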
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, here we need to check
        // whether the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

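  /**
   * A {@link Runnable} variant whose {@code run} method may throw {@link IOException}.
   */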
  @FunctionalInterface
  public interface IOExceptionRunnable {
    void run() throws IOException;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file
   * tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under the lock protection. Usually this is for clearing the memstore snapshot.
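   * <p/>
   * A minimal sketch of typical usage (hypothetical caller code; in HStore the action clears the
   * flushed memstore snapshot):
   *
   * <pre>{@code
   * engine.addStoreFiles(newFlushedFiles, () -> memstore.clearSnapshot(snapshotId));
   * }</pre>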
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued to
      // hold the lock.
      writeUnlock();
    }
  }

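  /**
   * Replace the compacted files with the new files in both the store file tracker and the store
   * file manager.
   * @param compactedFiles  the store files that were compacted away
   * @param newFiles        the store files produced by the compaction
   * @param walMarkerWriter writes the compaction marker to the WAL, after the tracker has been
   *                        updated but before the store file manager is updated
   * @param actionUnderLock an extra action to run while still holding the store write lock
   */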
  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

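  /**
   * Remove the given compacted store files from the store file manager, under the store write
   * lock.
   */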
  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the implementation of the used storefile tracker requires you to write to the temp
   * directory first, i.e., does not allow broken store files under the actual data directory.
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}