/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The CompactionTool allows you to run a compaction on a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
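 * <p>
 * Example invocation, mirroring the usage text printed by the tool (the path is illustrative):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
 *     -mapred hdfs://hbase/data/default/TestTable
 * </pre>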
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);

  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";

  /**
   * Class responsible for executing the compaction on the specified path. The path can be a
   * table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     * @param path        Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major       Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
      throws IOException {
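      // Dispatch on the most specific directory type first: a family dir is
      // recognized by its parent being a region dir, a region dir by the
      // presence of the .regioninfo file, and a table dir by its descriptor.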
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
      throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final TableDescriptor htd, final Path regionDir,
      final boolean compactOnce, final boolean major) throws IOException {
      RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job. If the compact once flag is not specified, execute the
     * compaction until no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
      final RegionInfo hri, final String familyName, final boolean compactOnce, final boolean major)
      throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
      LOG.info("Compact table=" + htd.getTableName() + " region=" + hri.getRegionNameAsString()
        + " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
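      // Repeatedly request and run compactions until the store reports no
      // more work, unless a single pass was requested via -compactOnce.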
      do {
        Optional<CompactionContext> compaction =
          store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
        if (!compaction.isPresent()) {
          break;
        }
        List<HStoreFile> storeFiles =
          store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (deleteCompacted) {
            for (HStoreFile storeFile : storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
      // We need to close the store properly, to make sure it will archive compacted files
      store.close();
    }

    private static HStore getStore(final Configuration conf, final FileSystem fs,
      final Path tableDir, final TableDescriptor htd, final RegionInfo hri, final String familyName)
      throws IOException {
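      // Open the region and store directly on top of the filesystem: the null
      // arguments are the WAL and the RegionServerServices, which do not exist
      // when running outside a region server.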
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.isTableDir(fs, path);
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
      throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses the store files' block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
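    // Each line of the input file names one store directory and must become
    // its own split, so the file is always splittable.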
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store directory, using the block locations of its files as the
     * locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
      for (FileStatus file : files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
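        // Each line names a store directory: emit a single-line FileSplit
        // whose preferred hosts are the ones holding that store's blocks.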
        try {
          while ((n = reader.readLine(key)) > 0) {
            String[] hosts = getStoreDirHosts(fs, new Path(key.toString()));
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Return the hosts holding most of the blocks of the store files; used as the split locality.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
      throws IOException {
      FileStatus[] files = CommonFSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

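      // Merge the block distributions of all hfiles in the store to find the
      // hosts holding most of its data.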
      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus : files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact. The file is a text file with
     * each line corresponding to a store directory to compact.
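     * <p>
     * For example (hypothetical paths), the generated file might contain:
     *
     * <pre>
     * hdfs://hbase/data/default/TestTable/abc/x
     * hdfs://hbase/data/default/TestTable/def/y
     * </pre>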
     */
    public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
      final Path path, final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<>();
      for (Path compactDir : toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir : FSUtils.getRegionDirs(fs, compactDir)) {
            storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write Input File
      FSDataOutputStream stream = stagingFs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir : storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
      return storeDirs;
    }
  }

  /**
   * Execute compaction, using a Map-Reduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = Job.getInstance(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
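    // The mappers mutate HDFS directly, so duplicate speculative attempts must
    // not run against the same store directory; the job is also map-only.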
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
    FileSystem stagingFs = stagingDir.getFileSystem(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      List<Path> storeDirs =
        CompactionInputFormat.createInputFile(fs, stagingFs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credential for secure cluster
      TableMapReduceUtil.initCredentials(job);
      // Despite the method name this will get delegation tokens for the filesystems
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), storeDirs.toArray(new Path[0]),
        conf);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
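      // Clean up the staging directory, including the generated input file.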
      stagingFs.delete(stagingDir, true);
    }
  }

  /**
   * Execute compaction, from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path : toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage();
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" -mapred        Use MapReduce to run compaction.");
    System.err.println(" -compactOnce   Execute just one compaction step. (default: while needed)");
    System.err.println(" -major         Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err
      .println(" To stop deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}