/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
 * To fully avoid listing, here we use two files for tracking. When loading, we will try to read
 * both files. If only one exists, we will trust that one; if both exist, we will compare the
 * timestamps to see which one is newer and trust that one. We record in memory which one is
 * trusted, and when we need to update the store file list, we write to the other file.
 * <p/>
 * So in this way, we could avoid listing when we want to load the store file list file.
 * <p/>
 * To prevent loading a partial file, we use the first 4 bytes as the file length, and also append
 * a 4 bytes crc32 checksum at the end. This is because the protobuf message parser sometimes can
 * return without error on partial bytes if you stop at some special points, but the returned
 * message will have incorrect field values. We should try our best to prevent this from happening
 * because loading an incorrect store file list file usually leads to data loss.
 * <p/>
 * To prevent failing silently while downgrading, where we may miss some newly introduced fields in
 * {@link StoreFileList} which are necessary, we introduce a 'version' field in
 * {@link StoreFileList}. If we find out that we are reading a {@link StoreFileList} with a higher
 * version, we will fail immediately and tell users that extra steps are needed while downgrading,
 * to prevent potential data loss.
 */
@InterfaceAudience.Private
class StoreFileListFile {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);

  // the current version for StoreFileList
  static final long VERSION = 1;

  // name of the directory, under the family store directory, which holds the track files
  static final String TRACK_FILE_DIR = ".filelist";

  // name prefix of the first track file
  static final String TRACK_FILE_PREFIX = "f1";

  // name prefix of the second track file
  private static final String TRACK_FILE_ROTATE_PREFIX = "f2";

  // separates the prefix from the sequence id in a track file name
  static final char TRACK_FILE_SEPARATOR = '.';

  // a valid track file name is one of the two prefixes, the separator, and then only digits
  static final Pattern TRACK_FILE_PATTERN = Pattern.compile("^f(1|2)\\.\\d+$");

  // 16 MB, which is big enough for a tracker file
  private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;

  private final StoreContext ctx;

  private final Path trackFileDir;

  // the two track files we write to, filled in by initializeTrackFiles
  private final Path[] trackFiles = new Path[2];

  // this is used to make sure that we do not go backwards
  private long prevTimestamp = -1;

  // index into trackFiles of the file the next update will write, -1 means load(false) has not
  // been called yet
  private int nextTrackFile = -1;

  StoreFileListFile(StoreContext ctx) {
    this.ctx = ctx;
    trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
  }

  /**
   * Read and validate a single track file. The expected layout is a 4 bytes length, the serialized
   * {@link StoreFileList} of that length, and then a 4 bytes crc32 checksum of the serialized
   * bytes.
   * @param fs   the file system to read from
   * @param path the track file to read
   * @return the parsed store file list
   * @throws IOException if the file can not be read, the recorded length is not in the range
   *                     (0, MAX_FILE_SIZE], the checksum does not match, or the version of the
   *                     store file list is higher than {@link #VERSION}
   */
  static StoreFileList load(FileSystem fs, Path path) throws IOException {
    byte[] data;
    int expectedChecksum;
    try (FSDataInputStream in = fs.open(path)) {
      int length = in.readInt();
      if (length <= 0 || length > MAX_FILE_SIZE) {
        throw new IOException("Invalid file length " + length
          + ", either not positive or greater than max allowed size " + MAX_FILE_SIZE);
      }
      data = new byte[length];
      in.readFully(data);
      expectedChecksum = in.readInt();
    }
    CRC32 crc32 = new CRC32();
    crc32.update(data);
    int calculatedChecksum = (int) crc32.getValue();
    if (expectedChecksum != calculatedChecksum) {
      throw new IOException(
        "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
    }
    StoreFileList storeFileList = StoreFileList.parseFrom(data);
    if (storeFileList.getVersion() > VERSION) {
      LOG.error(
        "The loaded store file list is in version {}, which is higher than expected"
          + " version {}. Stop loading to prevent potential data loss. This usually because your"
          + " cluster is downgraded from a newer version. You need extra steps before downgrading,"
          + " like switching back to default store file tracker.",
        storeFileList.getVersion(), VERSION);
      throw new IOException("Higher store file list version detected, expected " + VERSION
        + ", got " + storeFileList.getVersion());
    }
    return storeFileList;
  }

  /**
   * Same as {@link #load(FileSystem, Path)}, but uses the file system of the region this store
   * belongs to.
   */
  StoreFileList load(Path path) throws IOException {
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    return load(fs, path);
  }

  /**
   * Choose which of the two loaded lists should be trusted.
   * @return the index of the only non-null list, or the index of the list with the greater
   *         timestamp if both are present (0 on a tie). Returns 1 if both are null.
   */
  private int select(StoreFileList[] lists) {
    if (lists[0] == null) {
      return 1;
    }
    if (lists[1] == null) {
      return 0;
    }
    return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
  }

  /**
   * List the track file directory and group the valid track files by the sequence id in their
   * names.
   * @return a map from file sequence id to the track files with that sequence id, iterated from
   *         the greatest sequence id to the smallest. Empty if the directory does not exist or has
   *         no entries.
   */
  // file sequence id to path
  private NavigableMap<Long, List<Path>> listFiles() throws IOException {
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    FileStatus[] statuses;
    try {
      statuses = fs.listStatus(trackFileDir);
    } catch (FileNotFoundException e) {
      LOG.debug("Track file directory {} does not exist", trackFileDir, e);
      return Collections.emptyNavigableMap();
    }
    if (statuses == null || statuses.length == 0) {
      return Collections.emptyNavigableMap();
    }
    // reverse order, so iteration starts from the greatest sequence id
    TreeMap<Long, List<Path>> map = new TreeMap<>(Comparator.reverseOrder());
    for (FileStatus status : statuses) {
      Path file = status.getPath();
      if (!status.isFile()) {
        LOG.warn("Found invalid track file {}, which is not a file", file);
        continue;
      }
      if (!TRACK_FILE_PATTERN.matcher(file.getName()).matches()) {
        LOG.warn("Found invalid track file {}, skip", file);
        continue;
      }
      List<String> parts = Splitter.on(TRACK_FILE_SEPARATOR).splitToList(file.getName());
      long seqId;
      try {
        seqId = Long.parseLong(parts.get(1));
      } catch (NumberFormatException e) {
        // the name pattern only guarantees digits, a too long digit string still overflows a
        // long, so treat it like any other invalid track file instead of failing the listing
        LOG.warn("Found invalid track file {}, sequence id is out of range, skip", file, e);
        continue;
      }
      map.computeIfAbsent(seqId, k -> new ArrayList<>()).add(file);
    }
    return map;
  }

  /**
   * Point {@link #trackFiles} at the two track files for the given sequence id. Nothing is
   * created on the file system here.
   */
  private void initializeTrackFiles(long seqId) {
    trackFiles[0] = new Path(trackFileDir, TRACK_FILE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
    trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
    LOG.info("Initialized track files: {}, {}", trackFiles[0], trackFiles[1]);
  }

  /**
   * Schedule deletion of the track files which are older than the ones we have loaded. The
   * deletions are submitted to the common fork join pool and a failed deletion is only logged.
   * @param loadedSeqId      the sequence id of the loaded track files, or a negative value if
   *                         nothing was loaded, in which case all the given files are deleted
   * @param seqId2TrackFiles the track files found by {@link #listFiles()}
   */
  private void cleanUpTrackFiles(long loadedSeqId,
    NavigableMap<Long, List<Path>> seqId2TrackFiles) {
    LOG.info("Cleanup track file with sequence id < {}", loadedSeqId);
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    // the map is in reverse order, so the exclusive tail map holds the smaller sequence ids
    NavigableMap<Long, List<Path>> toDelete =
      loadedSeqId >= 0 ? seqId2TrackFiles.tailMap(loadedSeqId, false) : seqId2TrackFiles;
    toDelete.values().stream().flatMap(List::stream).forEach(file -> {
      ForkJoinPool.commonPool().execute(() -> {
        LOG.info("Deleting track file {}", file);
        try {
          fs.delete(file, false);
        } catch (IOException e) {
          LOG.warn("failed to delete unused track file {}", file, e);
        }
      });
    });
  }

  /**
   * Load the most recent store file list. Track files are tried from the greatest sequence id
   * down, and the first sequence id for which at least one file can be loaded wins. A file which
   * hits an {@link EOFException} is ignored, any other failure is propagated.
   * <p/>
   * When not read only, this also schedules the clean up of older track files, picks the track
   * files for later updates, and records which of them to write next and the loaded timestamp.
   * @param readOnly if true, only read, do not clean up files or change the state of this object
   * @return the trusted store file list, or null if no track file could be loaded
   * @throws DoNotRetryIOException if there are more than 2 track files for a sequence id
   */
  StoreFileList load(boolean readOnly) throws IOException {
    NavigableMap<Long, List<Path>> seqId2TrackFiles = listFiles();
    long seqId = -1L;
    StoreFileList[] lists = new StoreFileList[2];
    for (Map.Entry<Long, List<Path>> entry : seqId2TrackFiles.entrySet()) {
      List<Path> files = entry.getValue();
      // should not have more than 2 files, if not, it means that the track files are broken, just
      // throw exception out and fail the region open.
      if (files.size() > 2) {
        throw new DoNotRetryIOException("Should only have at most 2 track files for sequence id "
          + entry.getKey() + ", but got " + files.size() + " files: " + files);
      }
      boolean loaded = false;
      for (int i = 0; i < files.size(); i++) {
        try {
          lists[i] = load(files.get(i));
          loaded = true;
        } catch (EOFException e) {
          // this is normal case, so just log at debug. Log the file we were actually reading,
          // trackFiles is not initialized yet at this point.
          LOG.debug("EOF loading track file {}, ignoring the exception", files.get(i), e);
        }
      }
      if (loaded) {
        seqId = entry.getKey();
        break;
      }
    }
    if (readOnly) {
      // select returns 1 when nothing is loaded, and lists[1] is null then
      return lists[select(lists)];
    }

    cleanUpTrackFiles(seqId, seqId2TrackFiles);

    if (seqId < 0) {
      // nothing loaded, start from scratch
      initializeTrackFiles(EnvironmentEdgeManager.currentTime());
      nextTrackFile = 0;
      return null;
    }

    // always use a sequence id greater than the loaded one for the files we are going to write
    initializeTrackFiles(Math.max(EnvironmentEdgeManager.currentTime(), seqId + 1));
    int winnerIndex = select(lists);
    nextTrackFile = 1 - winnerIndex;
    prevTimestamp = lists[winnerIndex].getTimestamp();
    return lists[winnerIndex];
  }

  /**
   * Write the given store file list to the given file, overwriting it if it already exists. The
   * layout is a 4 bytes length, the serialized store file list, and a 4 bytes crc32 checksum, which
   * is what {@link #load(FileSystem, Path)} expects.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/StoreFileListFile.java|.*/src/test/.*")
  static void write(FileSystem fs, Path file, StoreFileList storeFileList) throws IOException {
    byte[] data = storeFileList.toByteArray();
    CRC32 crc32 = new CRC32();
    crc32.update(data);
    int checksum = (int) crc32.getValue();
    // 4 bytes length at the beginning, plus 4 bytes checksum
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.writeInt(data.length);
      out.write(data);
      out.writeInt(checksum);
    }
  }

  /**
   * Persist a new store file list to the track file which is not the currently trusted one, and
   * then switch to the other file for the next update.
   * <p/>
   * We will set the timestamp and version in this method so just pass the builder in. The
   * timestamp is always greater than the previous one we loaded or wrote.
   */
  void update(StoreFileList.Builder builder) throws IOException {
    if (nextTrackFile < 0) {
      // we need to call load first to load the prevTimestamp and also the next file
      // we are already in the update method, which is not read only, so pass false
      load(false);
    }
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
    write(fs, trackFiles[nextTrackFile],
      builder.setTimestamp(timestamp).setVersion(VERSION).build());
    // record timestamp
    prevTimestamp = timestamp;
    // rotate the file
    nextTrackFile = 1 - nextTrackFile;

    try {
      fs.delete(trackFiles[nextTrackFile], false);
    } catch (IOException e) {
      // we will create new file with overwrite = true, so not a big deal here, only for speed up
      // loading as we do not need to read this file when loading
      LOG.debug("Failed to delete old track file {}, ignoring the exception",
        trackFiles[nextTrackFile], e);
    }
  }
}