1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Set;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.conf.Configured;
33 import org.apache.hadoop.fs.FileStatus;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.FSDataOutputStream;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.io.LongWritable;
38 import org.apache.hadoop.io.NullWritable;
39 import org.apache.hadoop.io.Text;
40 import org.apache.hadoop.util.LineReader;
41 import org.apache.hadoop.util.Tool;
42 import org.apache.hadoop.util.ToolRunner;
43 import org.apache.hadoop.mapreduce.InputSplit;
44 import org.apache.hadoop.mapreduce.Job;
45 import org.apache.hadoop.mapreduce.JobContext;
46 import org.apache.hadoop.mapreduce.Mapper;
47 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
48 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
49 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
50 import org.apache.hadoop.hbase.HBaseConfiguration;
51 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
52 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
53 import org.apache.hadoop.hbase.HTableDescriptor;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.regionserver.HRegion;
56 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
57 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
58 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
59 import org.apache.hadoop.hbase.mapreduce.JobUtil;
60 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63 import org.apache.hadoop.hbase.util.FSTableDescriptors;
64 import org.apache.hadoop.hbase.util.FSUtils;
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
75 public class CompactionTool extends Configured implements Tool {
76 private static final Log LOG = LogFactory.getLog(CompactionTool.class);
77
78 private final static String CONF_TMP_DIR = "hbase.tmp.dir";
79 private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
80 private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
81 private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
82 private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
83
84
85
86
87
88 private static class CompactionWorker {
89 private final boolean keepCompactedFiles;
90 private final boolean deleteCompacted;
91 private final Configuration conf;
92 private final FileSystem fs;
93 private final Path tmpDir;
94
95 public CompactionWorker(final FileSystem fs, final Configuration conf) {
96 this.conf = conf;
97 this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
98 this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
99 this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
100 this.fs = fs;
101 }
102
103
104
105
106
107
108
109
110 public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException {
111 if (isFamilyDir(fs, path)) {
112 Path regionDir = path.getParent();
113 Path tableDir = regionDir.getParent();
114 HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
115 HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
116 compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
117 } else if (isRegionDir(fs, path)) {
118 Path tableDir = path.getParent();
119 HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
120 compactRegion(tableDir, htd, path, compactOnce, major);
121 } else if (isTableDir(fs, path)) {
122 compactTable(path, compactOnce, major);
123 } else {
124 throw new IOException(
125 "Specified path is not a table, region or family directory. path=" + path);
126 }
127 }
128
129 private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
130 throws IOException {
131 HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
132 for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
133 compactRegion(tableDir, htd, regionDir, compactOnce, major);
134 }
135 }
136
137 private void compactRegion(final Path tableDir, final HTableDescriptor htd,
138 final Path regionDir, final boolean compactOnce, final boolean major)
139 throws IOException {
140 HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
141 for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
142 compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
143 }
144 }
145
146
147
148
149
150
151 private void compactStoreFiles(final Path tableDir, final HTableDescriptor htd,
152 final HRegionInfo hri, final String familyName, final boolean compactOnce,
153 final boolean major) throws IOException {
154 HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
155 LOG.info("Compact table=" + htd.getTableName() +
156 " region=" + hri.getRegionNameAsString() +
157 " family=" + familyName);
158 if (major) {
159 store.triggerMajorCompaction();
160 }
161 do {
162 CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
163 if (compaction == null) break;
164 List<StoreFile> storeFiles =
165 store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE);
166 if (storeFiles != null && !storeFiles.isEmpty()) {
167 if (keepCompactedFiles && deleteCompacted) {
168 for (StoreFile storeFile: storeFiles) {
169 fs.delete(storeFile.getPath(), false);
170 }
171 }
172 }
173 } while (store.needsCompaction() && !compactOnce);
174 }
175
176
177
178
179
180 private static HStore getStore(final Configuration conf, final FileSystem fs,
181 final Path tableDir, final HTableDescriptor htd, final HRegionInfo hri,
182 final String familyName, final Path tempDir) throws IOException {
183 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
184 @Override
185 public Path getTempDir() {
186 return tempDir;
187 }
188 };
189 HRegion region = new HRegion(regionFs, null, conf, htd, null);
190 return new HStore(region, htd.getFamily(Bytes.toBytes(familyName)), conf);
191 }
192 }
193
194 private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
195 Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
196 return fs.exists(regionInfo);
197 }
198
199 private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
200 return FSTableDescriptors.getTableInfoPath(fs, path) != null;
201 }
202
203 private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
204 return isRegionDir(fs, path.getParent());
205 }
206
207 private static class CompactionMapper
208 extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
209 private CompactionWorker compactor = null;
210 private boolean compactOnce = false;
211 private boolean major = false;
212
213 @Override
214 public void setup(Context context) {
215 Configuration conf = context.getConfiguration();
216 compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
217 major = conf.getBoolean(CONF_COMPACT_MAJOR, false);
218
219 try {
220 FileSystem fs = FileSystem.get(conf);
221 this.compactor = new CompactionWorker(fs, conf);
222 } catch (IOException e) {
223 throw new RuntimeException("Could not get the input FileSystem", e);
224 }
225 }
226
227 @Override
228 public void map(LongWritable key, Text value, Context context)
229 throws InterruptedException, IOException {
230 Path path = new Path(value.toString());
231 this.compactor.compact(path, compactOnce, major);
232 }
233 }
234
235
236
237
238 private static class CompactionInputFormat extends TextInputFormat {
239 @Override
240 protected boolean isSplitable(JobContext context, Path file) {
241 return true;
242 }
243
244
245
246
247
248 @Override
249 public List<InputSplit> getSplits(JobContext job) throws IOException {
250 List<InputSplit> splits = new ArrayList<InputSplit>();
251 List<FileStatus> files = listStatus(job);
252
253 Text key = new Text();
254 for (FileStatus file: files) {
255 Path path = file.getPath();
256 FileSystem fs = path.getFileSystem(job.getConfiguration());
257 LineReader reader = new LineReader(fs.open(path));
258 long pos = 0;
259 int n;
260 try {
261 while ((n = reader.readLine(key)) > 0) {
262 String[] hosts = getStoreDirHosts(fs, path);
263 splits.add(new FileSplit(path, pos, n, hosts));
264 pos += n;
265 }
266 } finally {
267 reader.close();
268 }
269 }
270
271 return splits;
272 }
273
274
275
276
277 private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
278 throws IOException {
279 FileStatus[] files = FSUtils.listStatus(fs, path);
280 if (files == null) {
281 return new String[] {};
282 }
283
284 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
285 for (FileStatus hfileStatus: files) {
286 HDFSBlocksDistribution storeFileBlocksDistribution =
287 FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
288 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
289 }
290
291 List<String> hosts = hdfsBlocksDistribution.getTopHosts();
292 return hosts.toArray(new String[hosts.size()]);
293 }
294
295
296
297
298
299
300 public static void createInputFile(final FileSystem fs, final Path path,
301 final Set<Path> toCompactDirs) throws IOException {
302
303 List<Path> storeDirs = new LinkedList<Path>();
304 for (Path compactDir: toCompactDirs) {
305 if (isFamilyDir(fs, compactDir)) {
306 storeDirs.add(compactDir);
307 } else if (isRegionDir(fs, compactDir)) {
308 for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
309 storeDirs.add(familyDir);
310 }
311 } else if (isTableDir(fs, compactDir)) {
312
313 for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
314 for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
315 storeDirs.add(familyDir);
316 }
317 }
318 } else {
319 throw new IOException(
320 "Specified path is not a table, region or family directory. path=" + compactDir);
321 }
322 }
323
324
325 FSDataOutputStream stream = fs.create(path);
326 LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
327 try {
328 final byte[] newLine = Bytes.toBytes("\n");
329 for (Path storeDir: storeDirs) {
330 stream.write(Bytes.toBytes(storeDir.toString()));
331 stream.write(newLine);
332 }
333 } finally {
334 stream.close();
335 }
336 }
337 }
338
339
340
341
342 private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
343 final boolean compactOnce, final boolean major) throws Exception {
344 Configuration conf = getConf();
345 conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
346 conf.setBoolean(CONF_COMPACT_MAJOR, major);
347
348 Job job = new Job(conf);
349 job.setJobName("CompactionTool");
350 job.setJarByClass(CompactionTool.class);
351 job.setMapperClass(CompactionMapper.class);
352 job.setInputFormatClass(CompactionInputFormat.class);
353 job.setOutputFormatClass(NullOutputFormat.class);
354 job.setMapSpeculativeExecution(false);
355 job.setNumReduceTasks(0);
356
357
358 TableMapReduceUtil.addDependencyJars(job);
359
360 Path stagingDir = JobUtil.getStagingDir(conf);
361 try {
362
363 Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
364 CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
365 CompactionInputFormat.addInputPath(job, inputPath);
366
367
368 TableMapReduceUtil.initCredentials(job);
369
370
371 return job.waitForCompletion(true) ? 0 : 1;
372 } finally {
373 fs.delete(stagingDir, true);
374 }
375 }
376
377
378
379
380 private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
381 final boolean compactOnce, final boolean major) throws IOException {
382 CompactionWorker worker = new CompactionWorker(fs, getConf());
383 for (Path path: toCompactDirs) {
384 worker.compact(path, compactOnce, major);
385 }
386 return 0;
387 }
388
389 @Override
390 public int run(String[] args) throws Exception {
391 Set<Path> toCompactDirs = new HashSet<Path>();
392 boolean compactOnce = false;
393 boolean major = false;
394 boolean mapred = false;
395
396 Configuration conf = getConf();
397 FileSystem fs = FileSystem.get(conf);
398
399 try {
400 for (int i = 0; i < args.length; ++i) {
401 String opt = args[i];
402 if (opt.equals("-compactOnce")) {
403 compactOnce = true;
404 } else if (opt.equals("-major")) {
405 major = true;
406 } else if (opt.equals("-mapred")) {
407 mapred = true;
408 } else if (!opt.startsWith("-")) {
409 Path path = new Path(opt);
410 FileStatus status = fs.getFileStatus(path);
411 if (!status.isDirectory()) {
412 printUsage("Specified path is not a directory. path=" + path);
413 return 1;
414 }
415 toCompactDirs.add(path);
416 } else {
417 printUsage();
418 }
419 }
420 } catch (Exception e) {
421 printUsage(e.getMessage());
422 return 1;
423 }
424
425 if (toCompactDirs.size() == 0) {
426 printUsage("No directories to compact specified.");
427 return 1;
428 }
429
430
431 if (mapred) {
432 return doMapReduce(fs, toCompactDirs, compactOnce, major);
433 } else {
434 return doClient(fs, toCompactDirs, compactOnce, major);
435 }
436 }
437
438 private void printUsage() {
439 printUsage(null);
440 }
441
442 private void printUsage(final String message) {
443 if (message != null && message.length() > 0) {
444 System.err.println(message);
445 }
446 System.err.println("Usage: java " + this.getClass().getName() + " \\");
447 System.err.println(" [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
448 System.err.println();
449 System.err.println("Options:");
450 System.err.println(" mapred Use MapReduce to run compaction.");
451 System.err.println(" compactOnce Execute just one compaction step. (default: while needed)");
452 System.err.println(" major Trigger major compaction.");
453 System.err.println();
454 System.err.println("Note: -D properties will be applied to the conf used. ");
455 System.err.println("For example: ");
456 System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
457 System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
458 System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
459 System.err.println();
460 System.err.println("Examples:");
461 System.err.println(" To compact the full 'TestTable' using MapReduce:");
462 System.err.println(" $ bin/hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable");
463 System.err.println();
464 System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
465 System.err.println(" $ bin/hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x");
466 }
467
468 public static void main(String[] args) throws Exception {
469 System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
470 }
471 }