/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;

/**
 * The CompactionTool allows you to execute a compaction on a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
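 * <p>
 * Example invocation (illustrative; matches the examples printed by
 * printUsage below):
 * <pre>
 *   $ bin/hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
 *       -mapred hdfs:///hbase/data/default/TestTable
 * </pre>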
 */
@InterfaceAudience.Public
public class CompactionTool extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CompactionTool.class);

  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
  private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";

  /**
   * Class responsible for executing the compaction on the specified path.
   * The path can be a table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean keepCompactedFiles;
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path tmpDir;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
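      // When hbase.hstore.compaction.complete=false, HStore leaves the compacted
      // output in the tmp dir instead of promoting it into the store, which is
      // how the original input files are preserved (see printUsage).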
      this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     *
     * @param path Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
        throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd.getHTableDescriptor(), hri,
            path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd.getHTableDescriptor(), path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
        throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd.getHTableDescriptor(), regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final HTableDescriptor htd,
        final Path regionDir, final boolean compactOnce, final boolean major)
        throws IOException {
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job.
     * Unless the compact-once flag is set, keep compacting until the store
     * no longer needs compaction. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final HTableDescriptor htd,
        final HRegionInfo hri, final String familyName, final boolean compactOnce,
        final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
      LOG.info("Compact table=" + htd.getTableName() +
        " region=" + hri.getRegionNameAsString() +
        " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
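      // Keep requesting and running compactions until the store reports no
      // further work, or stop after a single pass when compactOnce is set.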
      do {
        CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
        if (compaction == null) break;
        List<StoreFile> storeFiles = store.compact(compaction);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (keepCompactedFiles && deleteCompacted) {
            for (StoreFile storeFile: storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
    }

    /**
     * Create a "mock" HStore that uses the user-supplied tmpDir, with the
     * store directory to compact as its source.
     */
    private static HStore getStore(final Configuration conf, final FileSystem fs,
        final Path tableDir, final HTableDescriptor htd, final HRegionInfo hri,
        final String familyName, final Path tempDir) throws IOException {
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
        @Override
        public Path getTempDir() {
          return tempDir;
        }
      };
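      // The region is built without a WAL or region-server services: just
      // enough context for the store to run compactions on the on-disk files.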
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getFamily(Bytes.toBytes(familyName)), conf);
    }
  }

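  // The helpers below classify a path by HBase's on-disk layout: a region
  // directory contains a .regioninfo file, a table directory contains table
  // descriptor info (see FSTableDescriptors), and a family directory is a
  // direct child of a region directory.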
  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

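  // Each mapper consumes lines from the generated input file; every line names
  // one store directory, which is compacted with the same CompactionWorker used
  // in client mode.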
  private static class CompactionMapper
      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = FileSystem.get(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses store file block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store file directory, using the block locations
     * of its files as the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<InputSplit>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
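      // The input files are those written by createInputFile(); each line names
      // a store directory and becomes its own split, placed on the hosts that
      // hold most of that store's HDFS blocks.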
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Compute locality from the store directory named on the line,
            // not from the input file itself.
            String[] hosts = getStoreDirHosts(fs, new Path(key.toString()));
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Return the hosts holding most of the blocks of the given store directory;
     * used as the locality hint for the split.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
        throws IOException {
      FileStatus[] files = FSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus: files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact.
     * The file is a text file with each line corresponding to a
     * store file directory to compact.
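     * <p>
     * Example contents (illustrative paths):
     * <pre>
     *   hdfs://nn/hbase/data/default/TestTable/abc/x
     *   hdfs://nn/hbase/data/default/TestTable/def/x
     * </pre>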
     */
    public static void createInputFile(final FileSystem fs, final Path path,
        final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<Path>();
      for (Path compactDir: toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
            storeDirs.add(familyDir);
          }
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
              storeDirs.add(familyDir);
            }
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write Input File
      FSDataOutputStream stream = fs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir: storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
    }
  }

  /**
   * Execute compaction, using a MapReduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = new Job(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
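    // Compactions rewrite store directories in place, so a speculative
    // duplicate attempt on the same store would be unsafe; reducers are
    // unnecessary since the mappers produce no output.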
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getStagingDir(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credential for secure cluster
      TableMapReduceUtil.initCredentials(job);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      fs.delete(stagingDir, true);
    }
  }

  /**
   * Execute compaction, from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path: toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<Path>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage();
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" -mapred        Use MapReduce to run compaction.");
    System.err.println(" -compactOnce   Execute just one compaction step. (default: while needed)");
    System.err.println(" -major         Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err.println(" To preserve input files, pass -D" + CONF_COMPLETE_COMPACTION + "=false");
    System.err.println(" To stop deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println(" To set the tmp dir, pass -D" + CONF_TMP_DIR + "=ALTERNATE_DIR");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(" $ bin/hbase " + this.getClass().getName() +
      " -mapred hdfs:///hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(" $ bin/hbase " + this.getClass().getName() +
      " hdfs:///hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}