/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.shouldEnableHistoricalCompactionFiles;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Default implementation of StoreFileManager. Not thread-safe.
 * <p>
 * The manager keeps two pieces of state, each held in a volatile field that is swapped for a
 * freshly built immutable list on every change: the current store files (optionally with a
 * separate "live" subset) and the files that have been compacted away.
 */
@InterfaceAudience.Private
class DefaultStoreFileManager implements StoreFileManager {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFileManager.class);

  private final CellComparator cellComparator;
  private final CompactionConfiguration comConf;
  private final int blockingFileCount;
  private final Comparator<HStoreFile> storeFileComparator;

  /**
   * Immutable pairing of the complete store file list with the live-file list. A new instance is
   * created whenever either list has to change.
   */
  static class StoreFileList {
    /**
     * Every store file of this store. The list itself is immutable; a change is published by
     * replacing the enclosing {@link StoreFileList}.
     */
    final ImmutableList<HStoreFile> all;
    /**
     * The store files that hold the latest cells of this store, or {@code null} when live file
     * tracking is disabled. The list itself is immutable; a change is published by replacing the
     * enclosing {@link StoreFileList}.
     */
    @Nullable
    final ImmutableList<HStoreFile> live;

    /**
     * @param storeFiles     all store files
     * @param liveStoreFiles the live subset, or {@code null} when live files are not tracked
     */
    StoreFileList(ImmutableList<HStoreFile> storeFiles, ImmutableList<HStoreFile> liveStoreFiles) {
      this.all = storeFiles;
      this.live = liveStoreFiles;
    }
  }

  /** Current snapshot of the store files; replaced wholesale on every modification. */
  private volatile StoreFileList storeFiles;

  /**
   * Files of this store that were already compacted and therefore have to be left out of new
   * reads, which only use the files produced by the compaction. They are deleted/cleared once
   * every reader that still uses them has finished.
   */
  private volatile ImmutableList<HStoreFile> compactedfiles = ImmutableList.of();
  private final boolean enableLiveFileTracking;

  /**
   * Creates a manager that starts out without any store files.
   * @param cellComparator      comparator handed to the split point computation
   * @param storeFileComparator ordering applied to every file list this manager builds
   * @param conf                source of the blocking store file count and of the live file
   *                            tracking switch
   * @param comConf             compaction settings, consulted for the compaction pressure
   */
  public DefaultStoreFileManager(CellComparator cellComparator,
    Comparator<HStoreFile> storeFileComparator, Configuration conf,
    CompactionConfiguration comConf) {
    this.cellComparator = cellComparator;
    this.storeFileComparator = storeFileComparator;
    this.comConf = comConf;
    this.blockingFileCount =
      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.enableLiveFileTracking = shouldEnableHistoricalCompactionFiles(conf);
    this.storeFiles = emptyStoreFileList(this.enableLiveFileTracking);
  }

  /**
   * Builds a {@link StoreFileList} without files.
   * @param trackLiveFiles whether the live list should be an empty list rather than {@code null}
   */
  private static StoreFileList emptyStoreFileList(boolean trackLiveFiles) {
    ImmutableList<HStoreFile> noLiveFiles = null;
    if (trackLiveFiles) {
      noLiveFiles = ImmutableList.of();
    }
    return new StoreFileList(ImmutableList.of(), noLiveFiles);
  }

  /**
   * Lazily filtered view of {@code files} that skips every element contained in {@code removed}.
   */
  private static Iterable<HStoreFile> excluding(Iterable<HStoreFile> files,
    Collection<HStoreFile> removed) {
    return Iterables.filter(files, sf -> !removed.contains(sf));
  }

  /**
   * Initializes the reader of every given file and collects the ones that are not historical.
   * @param candidates files to inspect
   * @return the non-historical files, in iteration order of {@code candidates}
   * @throws IOException if initializing a reader fails
   */
  private List<HStoreFile> getLiveFiles(Collection<HStoreFile> candidates) throws IOException {
    List<HStoreFile> nonHistorical = new ArrayList<>(candidates.size());
    for (HStoreFile candidate : candidates) {
      // The reader is initialized before the historical flag is consulted.
      candidate.initReader();
      if (candidate.isHistorical()) {
        continue;
      }
      nonHistorical.add(candidate);
    }
    return nonHistorical;
  }

  /**
   * Replaces the current store files with a sorted copy of the given ones.
   * @param storeFiles the files to manage from now on
   * @throws IOException if live files are tracked and a reader cannot be initialized
   */
  @Override
  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
    ImmutableList<HStoreFile> sortedAll =
      ImmutableList.sortedCopyOf(storeFileComparator, storeFiles);
    ImmutableList<HStoreFile> sortedLive = null;
    if (enableLiveFileTracking) {
      sortedLive = ImmutableList.sortedCopyOf(storeFileComparator, getLiveFiles(storeFiles));
    }
    this.storeFiles = new StoreFileList(sortedAll, sortedLive);
  }

  /** Returns the immutable list of all current store files. */
  @Override
  public final Collection<HStoreFile> getStoreFiles() {
    StoreFileList current = storeFiles;
    return current.all;
  }

  /** Returns the immutable list of files that were compacted away and not yet removed. */
  @Override
  public Collection<HStoreFile> getCompactedfiles() {
    return compactedfiles;
  }

  /**
   * Merges the given files into the current store files, keeping the lists sorted.
   * @param sfs the files to add
   * @throws IOException if live files are tracked and a reader cannot be initialized
   */
  @Override
  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
    ImmutableList<HStoreFile> mergedAll =
      ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(storeFiles.all, sfs));
    ImmutableList<HStoreFile> mergedLive = null;
    if (enableLiveFileTracking) {
      mergedLive = ImmutableList.sortedCopyOf(storeFileComparator,
        Iterables.concat(storeFiles.live, getLiveFiles(sfs)));
    }
    storeFiles = new StoreFileList(mergedAll, mergedLive);
  }

  /**
   * Drops all store files from this manager.
   * @return the files that were managed before the call
   */
  @Override
  public ImmutableCollection<HStoreFile> clearFiles() {
    ImmutableList<HStoreFile> previous = storeFiles.all;
    storeFiles = emptyStoreFileList(enableLiveFileTracking);
    return previous;
  }

  /**
   * Drops all compacted files from this manager.
   * @return the compacted files that were tracked before the call
   */
  @Override
  public Collection<HStoreFile> clearCompactedFiles() {
    List<HStoreFile> previous = compactedfiles;
    compactedfiles = ImmutableList.of();
    return previous;
  }

  /** Returns the number of current store files. */
  @Override
  public final int getStorefileCount() {
    StoreFileList current = storeFiles;
    return current.all.size();
  }

  /** Returns the number of compacted files that are still tracked. */
  @Override
  public final int getCompactedFilesCount() {
    ImmutableList<HStoreFile> current = compactedfiles;
    return current.size();
  }

  /**
   * Applies the outcome of a compaction: the compacted inputs leave the store file lists, the
   * compaction outputs join them, and the inputs are recorded as compacted files.
   * @param newCompactedfiles the files that were compacted
   * @param results           the files the compaction produced
   * @throws IOException if live files are tracked and a reader cannot be initialized
   */
  @Override
  public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
    Collection<HStoreFile> results) throws IOException {
    ImmutableList<HStoreFile> remainingLive = null;
    if (enableLiveFileTracking) {
      Iterable<HStoreFile> keptLive = excluding(storeFiles.live, newCompactedfiles);
      remainingLive = ImmutableList.sortedCopyOf(storeFileComparator,
        Iterables.concat(keptLive, getLiveFiles(results)));
    }
    Iterable<HStoreFile> keptAll = excluding(storeFiles.all, newCompactedfiles);
    ImmutableList<HStoreFile> remainingAll =
      ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(keptAll, results));
    storeFiles = new StoreFileList(remainingAll, remainingLive);
    // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
    // Let a background thread close the actual reader on these compacted files and also
    // ensure to evict the blocks from block cache so that they are no longer in
    // cache
    newCompactedfiles.forEach(HStoreFile::markCompactedAway);
    Iterable<HStoreFile> allCompacted = Iterables.concat(compactedfiles, newCompactedfiles);
    compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator, allCompacted);
  }

  /**
   * Stops tracking the given compacted files.
   * @param removedCompactedfiles the compacted files to forget
   */
  @Override
  public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) {
    compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
      excluding(compactedfiles, removedCompactedfiles));
  }

  /**
   * Returns an iterator over all store files in reverse list order; {@code targetKey} is not
   * consulted.
   */
  @Override
  public final Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(KeyValue targetKey) {
    ImmutableList<HStoreFile> lastToFirst = storeFiles.all.reverse();
    return lastToFirst.iterator();
  }

  /** Returns {@code candidateFiles} unchanged. */
  @Override
  public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
    Iterator<HStoreFile> candidateFiles, KeyValue targetKey, Cell candidate) {
    // Default store has nothing useful to do here.
    // TODO: move this comment when implementing Level:
    // Level store can trim the list by range, removing all the files which cannot have
    // any useful candidates less than "candidate".
    return candidateFiles;
  }

  /**
   * Delegates the split point computation to {@link StoreUtils} over all current store files.
   * @throws IOException if the delegate fails
   */
  @Override
  public final Optional<byte[]> getSplitPoint() throws IOException {
    ImmutableList<HStoreFile> current = storeFiles.all;
    return StoreUtils.getSplitPoint(current, cellComparator);
  }

  /**
   * Selects the files a scan has to read. The row range arguments are not used for the
   * selection.
   * @return the live files when only the latest version is requested and live files are tracked,
   *         otherwise all store files
   */
  @Override
  public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
    byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) {
    if (!onlyLatestVersion || !enableLiveFileTracking) {
      // We cannot provide any useful input and already have the files sorted by seqNum.
      return getStoreFiles();
    }
    return storeFiles.live;
  }

  /**
   * Computes the compaction priority as the number of files still missing to reach the blocking
   * file count, stepping over the value reserved by {@link HStore#PRIORITY_USER}.
   */
  @Override
  public int getStoreCompactionPriority() {
    int headroom = blockingFileCount - storeFiles.all.size();
    if (headroom == HStore.PRIORITY_USER) {
      return headroom + 1;
    }
    return headroom;
  }

  /**
   * Finds store files whose newest cell is older than {@code maxTs} and that are not being
   * compacted. The last file of the list is never considered.
   * @param maxTs           timestamp below which a file counts as expired
   * @param filesCompacting files that are currently part of a compaction
   * @return the expired files
   */
  @Override
  public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
    ImmutableList<HStoreFile> snapshot = storeFiles.all;
    // 1) We can never get rid of the last file which has the maximum seqid.
    // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
    int candidateCount = Math.max(0, snapshot.size() - 1);
    return snapshot.subList(0, candidateCount).stream()
      .filter(sf -> isExpiredAndIdle(sf, maxTs, filesCompacting)).collect(Collectors.toList());
  }

  /**
   * Tells whether the maximum timestamp of {@code sf} is below {@code maxTs} while the file is
   * not in {@code filesCompacting}; logs the file when that is the case.
   */
  private static boolean isExpiredAndIdle(HStoreFile sf, long maxTs,
    List<HStoreFile> filesCompacting) {
    long fileTs = sf.getReader().getMaxTimestamp();
    if (fileTs >= maxTs || filesCompacting.contains(sf)) {
      return false;
    }
    LOG.info("Found an expired store file {} whose maxTimestamp is {}, which is below {}",
      sf.getPath(), fileTs, maxTs);
    return true;
  }

  /**
   * Relates the number of store files beyond the compaction minimum to the distance between that
   * minimum and the blocking file count.
   * @return {@code 0.0} while the store holds no more files than the compaction minimum,
   *         otherwise the ratio described above
   */
  @Override
  public double getCompactionPressure() {
    int fileCount = getStorefileCount();
    int minFiles = comConf.getMinFilesToCompact();
    if (fileCount <= minFiles) {
      return 0.0;
    }
    double filesOverMinimum = fileCount - minFiles;
    int blockingOverMinimum = blockingFileCount - minFiles;
    return filesOverMinimum / blockingOverMinimum;
  }

  /** Returns the comparator used to order every file list of this manager. */
  @Override
  public Comparator<HStoreFile> getStoreFileComparator() {
    return storeFileComparator;
  }
}