1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class HFileV1Detector extends Configured implements Tool {
73 private FileSystem fs;
74 private static final Log LOG = LogFactory.getLog(HFileV1Detector.class);
75 private static final int DEFAULT_NUM_OF_THREADS = 10;
76
77
78
79 private static final String PRE_NS_DOT_ARCHIVE = ".archive";
80
81
82
83 private static final String PRE_NS_DOT_TMP = ".tmp";
84 private int numOfThreads;
85
86
87
88 private Path targetDirPath;
89
90
91
92 private ExecutorService exec;
93
94
95
96
97 private final Set<Path> processedTables = new HashSet<Path>();
98
99
100
101 private final Set<Path> corruptedHFiles = Collections
102 .newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
103
104
105
106 private final Set<Path> hFileV1Set = Collections
107 .newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
108
109 private Options options = new Options();
110
111
112
113 private Path defaultNamespace;
114
115 public HFileV1Detector() {
116 Option pathOption = new Option("p", "path", true, "Path to a table, or hbase installation");
117 pathOption.setRequired(false);
118 options.addOption(pathOption);
119 Option threadOption = new Option("n", "numberOfThreads", true,
120 "Number of threads to use while processing HFiles.");
121 threadOption.setRequired(false);
122 options.addOption(threadOption);
123 options.addOption("h", "help", false, "Help");
124 }
125
126 private boolean parseOption(String[] args) throws ParseException, IOException {
127 if (args.length == 0) {
128 return true;
129 }
130 CommandLineParser parser = new GnuParser();
131 CommandLine cmd = parser.parse(options, args);
132 if (cmd.hasOption("h")) {
133 HelpFormatter formatter = new HelpFormatter();
134 formatter.printHelp("HFileV1Detector", options, true);
135 System.out
136 .println("In case no option is provided, it processes hbase.rootdir using 10 threads.");
137 System.out.println("Example:");
138 System.out.println(" To detect any HFileV1 in a given hbase installation '/myhbase':");
139 System.out.println(" $ $HBASE_HOME/bin/hbase " + this.getClass().getName() + " -p /myhbase");
140 System.out.println();
141 return false;
142 }
143
144 if (cmd.hasOption("p")) {
145 this.targetDirPath = new Path(FSUtils.getRootDir(getConf()), cmd.getOptionValue("p"));
146 }
147 try {
148 if (cmd.hasOption("n")) {
149 int n = Integer.parseInt(cmd.getOptionValue("n"));
150 if (n < 0 || n > 100) {
151 LOG.warn("Please use a positive number <= 100 for number of threads."
152 + " Continuing with default value " + DEFAULT_NUM_OF_THREADS);
153 return true;
154 }
155 this.numOfThreads = n;
156 }
157 } catch (NumberFormatException nfe) {
158 LOG.error("Please select a valid number for threads");
159 return false;
160 }
161 return true;
162 }
163
164
165
166
167
168
169
170
171 @Override
172 public int run(String args[]) throws IOException, ParseException {
173 FSUtils.setFsDefault(getConf(), new Path(FSUtils.getRootDir(getConf()).toUri()));
174 fs = FileSystem.get(getConf());
175 numOfThreads = DEFAULT_NUM_OF_THREADS;
176 targetDirPath = FSUtils.getRootDir(getConf());
177 if (!parseOption(args)) {
178 System.exit(-1);
179 }
180 this.exec = Executors.newFixedThreadPool(numOfThreads);
181 try {
182 return processResult(checkForV1Files(targetDirPath));
183 } catch (Exception e) {
184 LOG.error(e);
185 } finally {
186 exec.shutdown();
187 fs.close();
188 }
189 return -1;
190 }
191
192 private void setDefaultNamespaceDir() throws IOException {
193 Path dataDir = new Path(FSUtils.getRootDir(getConf()), HConstants.BASE_NAMESPACE_DIR);
194 defaultNamespace = new Path(dataDir, NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR);
195 }
196
197 private int processResult(Set<Path> regionsWithHFileV1) {
198 LOG.info("Result: \n");
199 printSet(processedTables, "Tables Processed: ");
200
201 int count = hFileV1Set.size();
202 LOG.info("Count of HFileV1: " + count);
203 if (count > 0) printSet(hFileV1Set, "HFileV1:");
204
205 count = corruptedHFiles.size();
206 LOG.info("Count of corrupted files: " + count);
207 if (count > 0) printSet(corruptedHFiles, "Corrupted Files: ");
208
209 count = regionsWithHFileV1.size();
210 LOG.info("Count of Regions with HFileV1: " + count);
211 if (count > 0) printSet(regionsWithHFileV1, "Regions to Major Compact: ");
212
213 return (hFileV1Set.isEmpty() && corruptedHFiles.isEmpty()) ? 0 : 1;
214 }
215
216 private void printSet(Set<Path> result, String msg) {
217 LOG.info(msg);
218 for (Path p : result) {
219 LOG.info(p);
220 }
221 }
222
223
224
225
226
227
228
229 private Set<Path> checkForV1Files(Path targetDir) throws IOException {
230 LOG.info("Target dir is: " + targetDir);
231 if (!fs.exists(targetDir)) {
232 throw new IOException("The given path does not exist: " + targetDir);
233 }
234 if (isTableDir(fs, targetDir)) {
235 processedTables.add(targetDir);
236 return processTable(targetDir);
237 }
238 Set<Path> regionsWithHFileV1 = new HashSet<Path>();
239 FileStatus[] fsStats = fs.listStatus(targetDir);
240 for (FileStatus fsStat : fsStats) {
241 if (isTableDir(fs, fsStat.getPath()) && !isRootTable(fsStat.getPath())) {
242 processedTables.add(fsStat.getPath());
243
244 regionsWithHFileV1.addAll(processTable(fsStat.getPath()));
245 } else {
246 LOG.info("Ignoring path: " + fsStat.getPath());
247 }
248 }
249 return regionsWithHFileV1;
250 }
251
252
253
254
255
256 private boolean isRootTable(Path path) {
257 if (path != null && path.toString().endsWith("-ROOT-")) return true;
258 return false;
259 }
260
261
262
263
264
265
266
267 private Set<Path> processTable(Path tableDir) throws IOException {
268
269 LOG.debug("processing table: " + tableDir);
270 List<Future<Path>> regionLevelResults = new ArrayList<Future<Path>>();
271 Set<Path> regionsWithHFileV1 = new HashSet<Path>();
272
273 FileStatus[] fsStats = fs.listStatus(tableDir);
274 for (FileStatus fsStat : fsStats) {
275
276 if (isRegionDir(fs, fsStat.getPath())) {
277 regionLevelResults.add(processRegion(fsStat.getPath()));
278 }
279 }
280 for (Future<Path> f : regionLevelResults) {
281 try {
282 if (f.get() != null) {
283 regionsWithHFileV1.add(f.get());
284 }
285 } catch (InterruptedException e) {
286 LOG.error(e);
287 } catch (ExecutionException e) {
288 LOG.error(e);
289 }
290 }
291 return regionsWithHFileV1;
292 }
293
294
295
296
297
298
299
300 private Future<Path> processRegion(final Path regionDir) {
301 LOG.debug("processing region: " + regionDir);
302 Callable<Path> regionCallable = new Callable<Path>() {
303 @Override
304 public Path call() throws Exception {
305 for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
306 FileStatus[] storeFiles = FSUtils.listStatus(fs, familyDir);
307 if (storeFiles == null || storeFiles.length == 0) continue;
308 for (FileStatus storeFile : storeFiles) {
309 Path storeFilePath = storeFile.getPath();
310 FSDataInputStream fsdis = null;
311 long lenToRead = 0;
312 try {
313
314 if (StoreFileInfo.isReference(storeFilePath)) continue;
315
316 else if (HFileLink.isHFileLink(storeFilePath)) {
317 FileLink fLink = getFileLinkWithPreNSPath(storeFilePath);
318 fsdis = fLink.open(fs);
319 lenToRead = fLink.getFileStatus(fs).getLen();
320 } else {
321
322 fsdis = fs.open(storeFilePath);
323 lenToRead = storeFile.getLen();
324 }
325 int majorVersion = computeMajorVersion(fsdis, lenToRead);
326 if (majorVersion == 1) {
327 hFileV1Set.add(storeFilePath);
328
329 return regionDir;
330 }
331 if (majorVersion > 2 || majorVersion < 1) throw new IllegalArgumentException(
332 "Incorrect major version: " + majorVersion);
333 } catch (Exception iae) {
334 corruptedHFiles.add(storeFilePath);
335 LOG.error("Got exception while reading trailer for file: "+ storeFilePath, iae);
336 } finally {
337 if (fsdis != null) fsdis.close();
338 }
339 }
340 }
341 return null;
342 }
343
344 private int computeMajorVersion(FSDataInputStream istream, long fileSize)
345 throws IOException {
346
347 long seekPoint = fileSize - Bytes.SIZEOF_INT;
348 if (seekPoint < 0)
349 throw new IllegalArgumentException("File too small, no major version found");
350
351
352 istream.seek(seekPoint);
353 int version = istream.readInt();
354
355 return version & 0x00ffffff;
356 }
357 };
358 Future<Path> f = exec.submit(regionCallable);
359 return f;
360 }
361
362
363
364
365
366
367
368
369 public FileLink getFileLinkWithPreNSPath(Path storeFilePath) throws IOException {
370 HFileLink link = HFileLink.buildFromHFileLinkPattern(getConf(), storeFilePath);
371 List<Path> pathsToProcess = getPreNSPathsForHFileLink(link);
372 pathsToProcess.addAll(Arrays.asList(link.getLocations()));
373 return new FileLink(pathsToProcess);
374 }
375
376 private List<Path> getPreNSPathsForHFileLink(HFileLink fileLink) throws IOException {
377 if (defaultNamespace == null) setDefaultNamespaceDir();
378 List<Path> p = new ArrayList<Path>();
379 String relativeTablePath = removeDefaultNSPath(fileLink.getOriginPath());
380 p.add(getPreNSPath(PRE_NS_DOT_ARCHIVE, relativeTablePath));
381 p.add(getPreNSPath(PRE_NS_DOT_TMP, relativeTablePath));
382 p.add(getPreNSPath(null, relativeTablePath));
383 return p;
384 }
385
386
387
388
389
390 private String removeDefaultNSPath(Path originalPath) {
391 String pathStr = originalPath.toString();
392 if (!pathStr.startsWith(defaultNamespace.toString())) return pathStr;
393 return pathStr.substring(defaultNamespace.toString().length() + 1);
394 }
395
396 private Path getPreNSPath(String prefix, String relativeTablePath) throws IOException {
397 String relativePath = (prefix == null ? relativeTablePath : prefix + Path.SEPARATOR
398 + relativeTablePath);
399 return new Path(FSUtils.getRootDir(getConf()), relativePath);
400 }
401
402 private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
403
404
405 if (fs.isFile(path)) return false;
406 return (FSTableDescriptors.getTableInfoPath(fs, path) != null || FSTableDescriptors
407 .getCurrentTableInfoStatus(fs, path, false) != null) || path.toString().endsWith(".META.");
408 }
409
410 private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
411 if (fs.isFile(path)) return false;
412 Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
413 return fs.exists(regionInfo);
414
415 }
416
417 public static void main(String args[]) throws Exception {
418 System.exit(ToolRunner.run(HBaseConfiguration.create(), new HFileV1Detector(), args));
419 }
420
421 }