/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The CompactionTool allows executing a compaction on a specified path, which can be a:
 * <ul>
 *  <li>table directory (all regions and families will be compacted)
 *  <li>region directory (all families in the region will be compacted)
 *  <li>family directory (the store files will be compacted)
 * </ul>
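 * <p>
 * For example, to major-compact the column family 'x' of region 'abc' of table 'TestTable'
 * from the command line (the path below is illustrative; see the printed usage for details):
 * <pre>
 *  $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
 *      -major hdfs://hbase/data/default/TestTable/abc/x
 * </pre>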
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);

  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";

  /**
   * Class responsible for executing the compaction on the specified path.
   * The path can be a table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     *
     * @param path Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
        throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri,
            path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
        throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final TableDescriptor htd,
        final Path regionDir, final boolean compactOnce, final boolean major)
        throws IOException {
      RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job.
     * If the compact once flag is not specified, execute the compaction until
     * no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
        final RegionInfo hri, final String familyName, final boolean compactOnce,
        final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
      LOG.info("Compact table=" + htd.getTableName() +
        " region=" + hri.getRegionNameAsString() +
        " family=" + familyName);
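      // Force the next compaction selection to pick all store files (major compaction).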
      if (major) {
        store.triggerMajorCompaction();
      }
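      // Keep requesting and running compactions until the store no longer needs one,
      // or stop after a single pass if compactOnce was requested.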
      do {
        Optional<CompactionContext> compaction =
            store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
        if (!compaction.isPresent()) {
          break;
        }
        List<HStoreFile> storeFiles =
            store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (deleteCompacted) {
            for (HStoreFile storeFile: storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
      // We need to close the store properly, to make sure it will archive the compacted files.
      store.close();
    }

    private static HStore getStore(final Configuration conf, final FileSystem fs,
        final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
        final String familyName) throws IOException {
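      // Build a region and store instance directly on top of the filesystem; no WAL or
      // RegionServer services are needed to run the compaction offline.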
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses the store files' block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store file directory, using the block locations
     * of each file as the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
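      // Each line of the input file names one store directory; emit one split per line,
      // using the store directory's HDFS block distribution as the split locality.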
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Compute the split locality from the store directory named on this line,
            // rather than from the input file itself.
            Path storeDir = new Path(key.toString());
            FileSystem storeFs = storeDir.getFileSystem(job.getConfiguration());
            String[] hosts = getStoreDirHosts(storeFs, storeDir);
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
259          }
260        } finally {
261          reader.close();
262        }
263      }
264
265      return splits;
266    }
267
    /**
     * Return the hosts that hold most of the store files' blocks; used as the split locality.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
        throws IOException {
      FileStatus[] files = CommonFSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

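      // Merge the block distributions of all the hfiles under the store directory and
      // pick the hosts holding the most data.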
      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus: files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact.
     * The file is a text file with each line corresponding to a
     * store file directory to compact.
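     * <p>
     * Each line is the full path of a store directory, for example (illustrative path):
     * {@code hdfs://hbase/data/default/TestTable/abc/x}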
     */
    public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
        final Path path, final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<>();
      for (Path compactDir: toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
            storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write Input File
      FSDataOutputStream stream = stagingFs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir: storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
      return storeDirs;
    }
  }

  /**
   * Execute the compaction using a MapReduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = Job.getInstance(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
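    // Disable speculative execution: the same store directory must not be compacted by
    // two map task attempts at the same time.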
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
    FileSystem stagingFs = stagingDir.getFileSystem(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      List<Path> storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs,
          inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credential for secure cluster
      TableMapReduceUtil.initCredentials(job);
      // Despite the method name this will get delegation tokens for the filesystem
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        storeDirs.toArray(new Path[0]), conf);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      fs.delete(stagingDir, true);
    }
  }

  /**
   * Execute the compaction from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path: toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage("Unknown option: " + opt);
          return 1;
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
446    System.err.println(" mapred         Use MapReduce to run compaction.");
447    System.err.println(" compactOnce    Execute just one compaction step. (default: while needed)");
448    System.err.println(" major          Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
452    System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(" $ hbase " + this.getClass().getName() +
      " -mapred hdfs://hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(" $ hbase " + this.getClass().getName() +
      " hdfs://hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}