/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine-s.
 * <p/>
 * We expose read/write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
 * <p/>
 * It can be a bit confusing that we have both a StoreFileManager (SFM) and a StoreFileTracker
 * (SFT). As its name says, the SFT is used to track the list of store files. The reason we have
 * an SFT besides the SFM is that, when introducing stripe compaction, we introduced the
 * StoreEngine and also the SFM, but the SFM is not a general 'Manager': it is only designed to
 * manage the in-memory 'stripes', so we can select different store files when scanning or
 * compacting. Before the SFT existed, the 'tracking' of store files was actually done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore}. Since the
 * SFM is designed to only hold in-memory state, we hold the write lock when updating it, and the
 * same lock also protects normal read/write requests, which means we'd better not add IO
 * operations to the SFM. Also, no matter what the in-memory state is, stripe or not, it does not
 * affect how we track the store files. Considering all these facts, we introduce a separate SFT
 * to track the store files.
 * <p/>
 * Since we almost always need to update the SFM and the SFT at the same time, StoreEngine exposes
 * methods that update them both, so the upper layer only needs to call StoreEngine once, reducing
 * the possibility of misuse.
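 * <p/>
 * A typical read-side use of the lock methods looks like the following sketch, where
 * {@code storeEngine} is simply an instance of this class:
 *
 * <pre>{@code
 * storeEngine.readLock();
 * try {
 *   // look at the current store file list, e.g. to create scanners over it
 * } finally {
 *   storeEngine.readUnlock();
 * }
 * }</pre>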
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is used
   * to manage and compact HBase store files.
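   * <p/>
   * For example, to switch to the stripe store engine instead of the default one (a sketch; in
   * practice this is usually set in table or column family configuration rather than globally):
   *
   * <pre>{@code
   * conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
   *   "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");
   * }</pre>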
   */
  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }

  /** Returns Compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns Compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns Store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns Store flusher to use. */
  public StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
   * start a compaction. See CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null && storeFileTracker != null;
  }

  /**
   * Create a writer for writing new store files.
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
      ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path the path to the store file
   */
  public void validateStoreFile(Path path) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel.
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
        + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, so we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files during region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader()
            .close(storeFile.getCacheConf() != null
              ? storeFile.getCacheConf().shouldEvictOnClose()
              : true);
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles
        .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
    }
    refreshStoreFilesInternal(storeFiles);
  }

  /**
   * Checks the underlying store files, opens the files that have not been opened yet, and removes
   * the store file readers for store files that are no longer available. Mainly used by secondary
   * region replicas to keep up to date with the primary region's files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles
      + " files to remove: " + toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the files into the data directory and open them.
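   * <p/>
   * A minimal sketch of a typical call, where {@code tmpFiles} stands for paths that a flush or
   * compaction has just written (for example under the tmp directory):
   *
   * <pre>{@code
   * // validate the new files, move them into the store's data directory and open readers on them
   * List<HStoreFile> committed = storeEngine.commitStoreFiles(tmpFiles, true);
   * }</pre>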
   * @param files    the files we want to commit
   * @param validate whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, here we need to check
        // whether the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK if the deletion fails, as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion into the store file
   * manager, under the lock protection. Usually this is for clearing the memstore snapshot.
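   * <p/>
   * A minimal sketch of the intended usage, where {@code memstore} and {@code snapshotId} are
   * hypothetical names for the memstore whose snapshot produced the files:
   *
   * <pre>{@code
   * storeEngine.addStoreFiles(committedFiles, () -> memstore.clearSnapshot(snapshotId));
   * }</pre>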
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued to
      // hold the lock.
      writeUnlock();
    }
  }

  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
    throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }

  /**
   * Whether the implementation of the used storefile tracker requires you to write to the temp
   * directory first, i.e., it does not allow broken store files under the actual data directory.
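   * <p/>
   * A caller-side sketch of how this flag may be consulted when deciding where to write a new
   * file (the directory variables here are only illustrative):
   *
   * <pre>{@code
   * Path targetDir = storeEngine.requireWritingToTmpDirFirst()
   *   ? tmpDir // write to tmp first, then commit (rename) into the data directory
   *   : storeDir; // the tracker tolerates writing directly into the data directory
   * }</pre>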
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}