/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The CompactionTool allows you to execute a compaction, specifying a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);

  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
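  // The delete and tmp-dir properties are meant to be overridden with -D on the
  // command line (see printUsage below); an illustrative invocation:
  //   hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
  //     -Dhbase.compactiontool.delete=false -mapred <path>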

  /**
   * Class responsible for executing the compaction on the specified path.
   * The path can be a table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path tmpDir;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
      this.fs = fs;
    }

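    // The dispatch in compact() recognizes the three supported path shapes,
    // e.g. (illustrative paths, matching the examples in printUsage):
    //   table:  hdfs://hbase/data/default/TestTable
    //   region: hdfs://hbase/data/default/TestTable/abc
    //   family: hdfs://hbase/data/default/TestTable/abc/x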
    /**
     * Execute the compaction on the specified path.
     *
     * @param path Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major Request a major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
        throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
        throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final TableDescriptor htd,
        final Path regionDir, final boolean compactOnce, final boolean major)
        throws IOException {
      RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job.
     * If the compact-once flag is not specified, keep compacting until
     * no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
        final RegionInfo hri, final String familyName, final boolean compactOnce,
        final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
      LOG.info("Compact table=" + htd.getTableName() +
        " region=" + hri.getRegionNameAsString() +
        " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
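      // Request and run compactions until the store reports nothing left to
      // compact, or after a single pass when compactOnce is set. If no
      // compaction is selected, requestCompaction() returns an empty Optional
      // and we stop.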
      do {
        Optional<CompactionContext> compaction =
            store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
        if (!compaction.isPresent()) {
          break;
        }
        List<HStoreFile> storeFiles =
            store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (deleteCompacted) {
            for (HStoreFile storeFile: storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
      // We need to close the store properly, to make sure it will archive the compacted files.
      store.close();
    }

    /**
     * Create a "mock" HStore that uses the tmpDir specified by the user and
     * the store directory to compact as its source.
     */
    private static HStore getStore(final Configuration conf, final FileSystem fs,
        final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
        final String familyName, final Path tempDir) throws IOException {
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
        @Override
        public Path getTempDir() {
          return tempDir;
        }
      };
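      // Note: the HRegion here is just a lightweight context holder (conf,
      // table descriptor, region info) for the HStore; it is never opened.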
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
    }
  }

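  // A region directory is recognized by the presence of the .regioninfo file,
  // and a table directory by its table descriptor file; a family directory is
  // any direct child of a region directory.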
  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = FileSystem.get(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

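    // Each input value is a single line of the input file created by
    // CompactionInputFormat.createInputFile(): the path of one store (family)
    // directory to compact.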
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses the store files' block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store file directory, using the block locations
     * of each file as the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<>();
      List<FileStatus> files = listStatus(job);

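      // One split per line of the input file; each split spans exactly that
      // line and is placed on the hosts holding the store's blocks, so the
      // map task runs close to the data it compacts.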
      Text key = new Text();
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Locality is taken from the store directory named on this line,
            // not from the input file itself.
            String[] hosts = getStoreDirHosts(fs, new Path(key.toString()));
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Returns the top hosts of the store files; used by the split.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
        throws IOException {
      FileStatus[] files = FSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

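      // Sum the block distribution of every hfile in the store and pick the
      // hosts that hold the most bytes overall.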
      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus: files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

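    // The generated input file holds one absolute store-directory path per
    // line, e.g. (illustrative): hdfs://hbase/data/default/TestTable/abc/x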
    /**
     * Create the input file for the given directories to compact.
     * The file is a text file with each line corresponding to a
     * store files directory to compact.
     */
    public static void createInputFile(final FileSystem fs, final Path path,
        final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<>();
      for (Path compactDir: toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
            storeDirs.add(familyDir);
          }
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
              storeDirs.add(familyDir);
            }
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write the input file
      FSDataOutputStream stream = fs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir: storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
    }
  }

  /**
   * Execute the compaction using a MapReduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = Job.getInstance(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
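    // Map-only job; speculative execution is disabled so that two task
    // attempts never compact the same store concurrently.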
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getStagingDir(conf);
    try {
      // Create the input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credentials for secure clusters
      TableMapReduceUtil.initCredentials(job);

      // Start the MR job and wait for completion
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      fs.delete(stagingDir, true);
    }
  }

  /**
   * Execute the compaction from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path: toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage("Unknown option: " + opt);
          return 1;
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" -mapred        Use MapReduce to run the compaction.");
    System.err.println(" -compactOnce   Execute just one compaction step. (default: while needed)");
    System.err.println(" -major         Trigger a major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err.println(" To stop the deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println(" To set the tmp dir, pass -D" + CONF_TMP_DIR + "=ALTERNATE_DIR");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(" $ hbase " + this.getClass().getName() +
      " -mapred hdfs://hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(" $ hbase " + this.getClass().getName() +
      " hdfs://hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}