/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * The CompactionTool allows you to execute a compaction, specifying a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
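 * <p>
 * For example, to compact the whole 'TestTable' table using MapReduce
 * (the same invocation printed by this tool's usage text):
 * <pre>
 *   $ bin/hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
 *       -mapred hdfs:///hbase/data/default/TestTable
 * </pre>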
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CompactionTool.class);

  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
  private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
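  // CONF_COMPLETE_COMPACTION maps to the general "hbase.hstore.compaction.complete" store
  // setting; the remaining keys are private to this tool and are read by CompactionWorker
  // and CompactionMapper below.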

  /**
   * Class responsible for executing the compaction on the specified path.
   * The path can be a table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean keepCompactedFiles;
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path tmpDir;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     *
     * @param path Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
        throws IOException {
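      // Probe the directory layout to decide what was passed in: a family dir's parent contains
      // a .regioninfo file, a region dir contains one directly, and a table dir is identified
      // by its table descriptor (see the is*Dir helpers below).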
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
        throws IOException {
      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final HTableDescriptor htd,
        final Path regionDir, final boolean compactOnce, final boolean major)
        throws IOException {
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job.
     * If the compact once flag is not specified, execute the compaction until
     * no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final HTableDescriptor htd,
        final HRegionInfo hri, final String familyName, final boolean compactOnce,
        final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
      LOG.info("Compact table=" + htd.getTableName() +
        " region=" + hri.getRegionNameAsString() +
        " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
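      // Keep requesting and running compactions until the store no longer needs one;
      // stop after a single pass when compactOnce is set.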
      do {
        CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
        if (compaction == null) break;
        List<StoreFile> storeFiles =
            store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE);
        if (storeFiles != null && !storeFiles.isEmpty()) {
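          // Deletion applies only when the compaction was run without completing it
          // (CONF_COMPLETE_COMPACTION=false) and CONF_DELETE_COMPACTED was set.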
          if (keepCompactedFiles && deleteCompacted) {
            for (StoreFile storeFile: storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
    }

    /**
     * Create a "mock" HStore that uses the tmpDir specified by the user and
     * the store dir to compact as source.
     */
    private static HStore getStore(final Configuration conf, final FileSystem fs,
        final Path tableDir, final HTableDescriptor htd, final HRegionInfo hri,
        final String familyName, final Path tempDir) throws IOException {
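      // Override getTempDir() so compaction output is written under the user-specified tmp dir
      // instead of the region's default temporary directory.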
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
        @Override
        public Path getTempDir() {
          return tempDir;
        }
      };
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getFamily(Bytes.toBytes(familyName)), conf);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = FileSystem.get(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws InterruptedException, IOException {
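      // Each input line, produced by CompactionInputFormat.createInputFile(), is the path of
      // one store (family) directory to compact.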
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses the store files' block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store file directory, using the block locations
     * of its files as the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<InputSplit>();
      List<FileStatus> files = listStatus(job);
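
      // One split per input line (i.e. per store directory), placed on the hosts that hold
      // most of that store's HDFS blocks so each map task runs close to its data.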
      Text key = new Text();
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Locality must come from the store directory named on the line,
            // not from the input text file itself.
            String[] hosts = getStoreDirHosts(fs, new Path(key.toString()));
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Returns the top hosts of the store files, used by the Split.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
        throws IOException {
      FileStatus[] files = FSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

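      // Aggregate the block distribution of every store file so the split can be placed on
      // the hosts that hold the largest share of the store's data.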
      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus: files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact.
     * The file is a TextFile with each line corresponding to a
     * store files directory to compact.
     */
    public static void createInputFile(final FileSystem fs, final Path path,
        final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<Path>();
      for (Path compactDir: toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
            storeDirs.add(familyDir);
          }
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
              storeDirs.add(familyDir);
            }
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write Input File
      FSDataOutputStream stream = fs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir: storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
    }
  }

  /**
   * Execute compaction, using a Map-Reduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = new Job(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
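    // Disable speculative execution: a speculative duplicate of a map task could attempt to
    // compact the same store concurrently.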
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getStagingDir(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credential for secure cluster
      TableMapReduceUtil.initCredentials(job);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
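      // Always clean up the staging directory (and the generated input file).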
      fs.delete(stagingDir, true);
    }
  }

  /**
   * Execute compaction, from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path: toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<Path>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage();
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.size() == 0) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" mapred         Use MapReduce to run compaction.");
    System.err.println(" compactOnce    Execute just one compaction step. (default: while needed)");
    System.err.println(" major          Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err.println(" To preserve input files, pass -D" + CONF_COMPLETE_COMPACTION + "=false");
    System.err.println(" To stop deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println(" To set the tmp dir, pass -D" + CONF_TMP_DIR + "=ALTERNATE_DIR");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(" $ bin/hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(" $ bin/hbase " + this.getClass().getName() + " hdfs:///hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}