/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;

/**
 * The CompactionTool allows you to execute a compaction, specifying a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
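 * <p>
 * For example, to major-compact a whole table from the command line
 * (a sketch with a hypothetical table name; the path depends on hbase.rootdir):
 * <pre>
 *  $ bin/hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
 *      -major hdfs:///hbase/TestTable
 * </pre>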
 */
@InterfaceAudience.Public
public class CompactionTool extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CompactionTool.class);

  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
  private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";

  /**
   * Class responsible for executing the compaction on the specified path.
   * The path can be a table, region or family directory.
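   * <p>
   * A minimal sketch of direct use, assuming a hypothetical family directory
   * (fs and conf are an initialized FileSystem and Configuration):
   * <pre>
   *   CompactionWorker worker = new CompactionWorker(fs, conf);
   *   worker.compact(new Path("hdfs:///hbase/TestTable/abc/x"), false, true);
   * </pre>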
   */
  private static class CompactionWorker {
    private final boolean keepCompactedFiles;
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path tmpDir;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     *
     * @param path Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
        throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
        HRegion region = loadRegion(fs, conf, htd, regionDir);
        compactStoreFiles(region, path, compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
        compactRegion(htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
        throws IOException {
      HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, tableDir);
      LOG.info("Compact table=" + htd.getNameAsString());
      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final HTableDescriptor htd, final Path regionDir,
        final boolean compactOnce, final boolean major) throws IOException {
      HRegion region = loadRegion(fs, conf, htd, regionDir);
      LOG.info("Compact table=" + htd.getNameAsString() +
        " region=" + region.getRegionNameAsString());
      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(region, familyDir, compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job.
     * If the compactOnce flag is not specified, compaction is repeated until
     * no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final HRegion region, final Path familyDir,
        final boolean compactOnce, final boolean major) throws IOException {
      LOG.info("Compact table=" + region.getTableDesc().getNameAsString() +
        " region=" + region.getRegionNameAsString() +
        " family=" + familyDir.getName());
      Store store = getStore(region, familyDir);
      if (major) {
        store.triggerMajorCompaction();
      }
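      // Request and execute compactions one step at a time: a single step
      // when compactOnce is set, otherwise loop while the store still needs it.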
      do {
        CompactionRequest cr = store.requestCompaction(Store.PRIORITY_USER, null);
        StoreFile storeFile = store.compact(cr);
        if (storeFile != null) {
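          // If input files were kept (hbase.hstore.compaction.complete=false)
          // and deletion was requested, remove the newly compacted file.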
          if (keepCompactedFiles && deleteCompacted) {
            fs.delete(storeFile.getPath(), false);
          }
        }
      } while (store.needsCompaction() && !compactOnce);
    }

    /**
     * Create a "mock" HStore that uses the tmpDir specified by the user and
     * the store dir to compact as source.
     */
    private Store getStore(final HRegion region, final Path storeDir) throws IOException {
      byte[] familyName = Bytes.toBytes(storeDir.getName());
      HColumnDescriptor hcd = region.getTableDesc().getFamily(familyName);
      // Create a Store with the check of hbase.rootdir blanked out and return our
      // list of files instead of having Store search its home dir.
      return new Store(tmpDir, region, hcd, fs, conf) {
        @Override
        public FileStatus[] getStoreFiles() throws IOException {
          return this.fs.listStatus(getHomedir());
        }

        @Override
        Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException {
          return storeDir;
        }
      };
    }

    private static HRegion loadRegion(final FileSystem fs, final Configuration conf,
        final HTableDescriptor htd, final Path regionDir) throws IOException {
      Path rootDir = regionDir.getParent().getParent();
      HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
      return HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegion.REGIONINFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = FileSystem.get(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses the store files' block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store files directory, using the block locations
     * of its files as the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<InputSplit>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
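      // Each input file lists one store directory per line; emit one split
      // per line so that every map task compacts a single store.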
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Locality comes from the store directory named on this line,
            // not from the input file that lists it.
            String[] hosts = getStoreDirHosts(fs, new Path(key.toString()));
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Return the top hosts of the store files, used as split locations.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
        throws IOException {
      FileStatus[] files = FSUtils.listStatus(fs, path, null);
      if (files == null) {
        return new String[] {};
      }

      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus: files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact.
     * The file is a text file with each line corresponding to a
     * store-file directory to compact.
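     * <p>
     * For example, with hypothetical region and family names the generated
     * file may look like:
     * <pre>
     *   hdfs:///hbase/TestTable/abc/x
     *   hdfs:///hbase/TestTable/def/x
     * </pre>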
     */
    public static void createInputFile(final FileSystem fs, final Path path,
        final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<Path>();
      for (Path compactDir: toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
            storeDirs.add(familyDir);
          }
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
              storeDirs.add(familyDir);
            }
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write Input File
      FSDataOutputStream stream = fs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir: storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
    }
  }

  /**
   * Execute the compaction using a MapReduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

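    // Map-only job: each map task compacts the store directories named in its
    // input split, and the job itself writes no output (NullOutputFormat).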
    Job job = new Job(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    String stagingName = "compact-" + EnvironmentEdgeManager.currentTimeMillis();
    Path stagingDir = new Path(conf.get(CONF_TMP_DIR), stagingName);
    fs.mkdirs(stagingDir);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, stagingName);
      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credential for secure cluster
      TableMapReduceUtil.initCredentials(job);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      fs.delete(stagingDir, true);
    }
  }

  /**
   * Execute the compaction from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path: toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<Path>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDir()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage();
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.size() == 0) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" mapred         Use MapReduce to run compaction.");
    System.err.println(" compactOnce    Execute just one compaction step. (default: while needed)");
    System.err.println(" major          Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
    System.err.println(" To delete the compacted files, pass -D"+CONF_DELETE_COMPACTED+"=true");
    System.err.println(" To set the tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(" $ bin/hbase " + this.getClass().getName() + " -mapred hdfs:///hbase/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(" $ bin/hbase " + this.getClass().getName() + " hdfs:///hbase/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}