001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.shouldEnableHistoricalCompactionFiles;
021
022import edu.umd.cs.findbugs.annotations.Nullable;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Comparator;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Optional;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparator;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
041import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
042import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
043
044/**
045 * Default implementation of StoreFileManager. Not thread-safe.
046 */
047@InterfaceAudience.Private
048class DefaultStoreFileManager implements StoreFileManager {
049  private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFileManager.class);
050
051  private final CellComparator cellComparator;
052  private final CompactionConfiguration comConf;
053  private final int blockingFileCount;
054  private final Comparator<HStoreFile> storeFileComparator;
055
056  static class StoreFileList {
057    /**
058     * List of store files inside this store. This is an immutable list that is atomically replaced
059     * when its contents change.
060     */
061    final ImmutableList<HStoreFile> all;
062    /**
063     * List of store files that include the latest cells inside this store. This is an immutable
064     * list that is atomically replaced when its contents change.
065     */
066    @Nullable
067    final ImmutableList<HStoreFile> live;
068
069    StoreFileList(ImmutableList<HStoreFile> storeFiles, ImmutableList<HStoreFile> liveStoreFiles) {
070      this.all = storeFiles;
071      this.live = liveStoreFiles;
072    }
073  }
074
075  private volatile StoreFileList storeFiles;
076
077  /**
078   * List of compacted files inside this store that needs to be excluded in reads because further
079   * new reads will be using only the newly created files out of compaction. These compacted files
080   * will be deleted/cleared once all the existing readers on these compacted files are done.
081   */
082  private volatile ImmutableList<HStoreFile> compactedfiles = ImmutableList.of();
083  private final boolean enableLiveFileTracking;
084
085  public DefaultStoreFileManager(CellComparator cellComparator,
086    Comparator<HStoreFile> storeFileComparator, Configuration conf,
087    CompactionConfiguration comConf) {
088    this.cellComparator = cellComparator;
089    this.storeFileComparator = storeFileComparator;
090    this.comConf = comConf;
091    blockingFileCount =
092      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
093    enableLiveFileTracking = shouldEnableHistoricalCompactionFiles(conf);
094    storeFiles =
095      new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
096  }
097
098  private List<HStoreFile> getLiveFiles(Collection<HStoreFile> storeFiles) throws IOException {
099    List<HStoreFile> liveFiles = new ArrayList<>(storeFiles.size());
100    for (HStoreFile file : storeFiles) {
101      file.initReader();
102      if (!file.isHistorical()) {
103        liveFiles.add(file);
104      }
105    }
106    return liveFiles;
107  }
108
109  @Override
110  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
111    this.storeFiles = new StoreFileList(ImmutableList.sortedCopyOf(storeFileComparator, storeFiles),
112      enableLiveFileTracking
113        ? ImmutableList.sortedCopyOf(storeFileComparator, getLiveFiles(storeFiles))
114        : null);
115  }
116
117  @Override
118  public final Collection<HStoreFile> getStoreFiles() {
119    return storeFiles.all;
120  }
121
122  @Override
123  public Collection<HStoreFile> getCompactedfiles() {
124    return compactedfiles;
125  }
126
127  @Override
128  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
129    storeFiles = new StoreFileList(
130      ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(storeFiles.all, sfs)),
131      enableLiveFileTracking
132        ? ImmutableList.sortedCopyOf(storeFileComparator,
133          Iterables.concat(storeFiles.live, getLiveFiles(sfs)))
134        : null);
135  }
136
137  @Override
138  public ImmutableCollection<HStoreFile> clearFiles() {
139    ImmutableList<HStoreFile> result = storeFiles.all;
140    storeFiles =
141      new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
142    return result;
143  }
144
145  @Override
146  public Collection<HStoreFile> clearCompactedFiles() {
147    List<HStoreFile> result = compactedfiles;
148    compactedfiles = ImmutableList.of();
149    return result;
150  }
151
152  @Override
153  public final int getStorefileCount() {
154    return storeFiles.all.size();
155  }
156
157  @Override
158  public final int getCompactedFilesCount() {
159    return compactedfiles.size();
160  }
161
162  @Override
163  public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
164    Collection<HStoreFile> results) throws IOException {
165    ImmutableList<HStoreFile> liveStoreFiles = null;
166    if (enableLiveFileTracking) {
167      liveStoreFiles = ImmutableList.sortedCopyOf(storeFileComparator,
168        Iterables.concat(Iterables.filter(storeFiles.live, sf -> !newCompactedfiles.contains(sf)),
169          getLiveFiles(results)));
170    }
171    storeFiles =
172      new StoreFileList(
173        ImmutableList
174          .sortedCopyOf(storeFileComparator,
175            Iterables.concat(
176              Iterables.filter(storeFiles.all, sf -> !newCompactedfiles.contains(sf)), results)),
177        liveStoreFiles);
178    // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
179    // Let a background thread close the actual reader on these compacted files and also
180    // ensure to evict the blocks from block cache so that they are no longer in
181    // cache
182    newCompactedfiles.forEach(HStoreFile::markCompactedAway);
183    compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
184      Iterables.concat(compactedfiles, newCompactedfiles));
185  }
186
187  @Override
188  public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) {
189    compactedfiles = compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
190      .sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
191  }
192
193  @Override
194  public final Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(KeyValue targetKey) {
195    return storeFiles.all.reverse().iterator();
196  }
197
198  @Override
199  public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
200    Iterator<HStoreFile> candidateFiles, KeyValue targetKey, Cell candidate) {
201    // Default store has nothing useful to do here.
202    // TODO: move this comment when implementing Level:
203    // Level store can trim the list by range, removing all the files which cannot have
204    // any useful candidates less than "candidate".
205    return candidateFiles;
206  }
207
208  @Override
209  public final Optional<byte[]> getSplitPoint() throws IOException {
210    return StoreUtils.getSplitPoint(storeFiles.all, cellComparator);
211  }
212
213  @Override
214  public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
215    byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) {
216    if (onlyLatestVersion && enableLiveFileTracking) {
217      return storeFiles.live;
218    }
219    // We cannot provide any useful input and already have the files sorted by seqNum.
220    return getStoreFiles();
221  }
222
223  @Override
224  public int getStoreCompactionPriority() {
225    int priority = blockingFileCount - storeFiles.all.size();
226    return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
227  }
228
229  @Override
230  public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
231    ImmutableList<HStoreFile> files = storeFiles.all;
232    // 1) We can never get rid of the last file which has the maximum seqid.
233    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
234    return files.stream().limit(Math.max(0, files.size() - 1)).filter(sf -> {
235      long fileTs = sf.getReader().getMaxTimestamp();
236      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
237        LOG.info("Found an expired store file {} whose maxTimestamp is {}, which is below {}",
238          sf.getPath(), fileTs, maxTs);
239        return true;
240      } else {
241        return false;
242      }
243    }).collect(Collectors.toList());
244  }
245
246  @Override
247  public double getCompactionPressure() {
248    int storefileCount = getStorefileCount();
249    int minFilesToCompact = comConf.getMinFilesToCompact();
250    if (storefileCount <= minFilesToCompact) {
251      return 0.0;
252    }
253    return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
254  }
255
256  @Override
257  public Comparator<HStoreFile> getStoreFileComparator() {
258    return storeFileComparator;
259  }
260}