/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * The CompactionTool allows you to execute a compaction, specifying a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);
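
  // Usage sketch, mirroring the examples printed by printUsage() below
  // (the table name and paths are illustrative only):
  //
  //   $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
  //       -mapred hdfs://hbase/data/default/TestTable
  //   $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
  //       -compactOnce -major hdfs://hbase/data/default/TestTable/abc/x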

  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";

  /**
   * Class responsible for executing the compaction on the specified path.
   * The path can be a table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path tmpDir;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     *
     * @param path Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
        throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
        throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final TableDescriptor htd,
        final Path regionDir, final boolean compactOnce, final boolean major)
        throws IOException {
      RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }
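
    // For reference: the dispatch in compact() above assumes the standard
    // HBase on-filesystem layout (names illustrative):
    //   .../data/<namespace>/<table>                    table dir
    //   .../data/<namespace>/<table>/<region>           region dir (contains the .regioninfo file)
    //   .../data/<namespace>/<table>/<region>/<family>  family/store dir (contains the HFiles)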

    /**
     * Execute the actual compaction job.
     * If the compact once flag is not specified, execute the compaction until
     * no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
        final RegionInfo hri, final String familyName, final boolean compactOnce,
        final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
      LOG.info("Compact table=" + htd.getTableName() +
        " region=" + hri.getRegionNameAsString() +
        " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
      do {
        Optional<CompactionContext> compaction =
          store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
        if (!compaction.isPresent()) {
          break;
        }
        List<HStoreFile> storeFiles =
          store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (deleteCompacted) {
            for (HStoreFile storeFile: storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
      // We need to close the store properly, to make sure it will archive compacted files
      store.close();
    }

    /**
     * Create a "mock" HStore that uses the tmpDir specified by the user and
     * the store dir to compact as source.
     */
    private static HStore getStore(final Configuration conf, final FileSystem fs,
        final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
        final String familyName, final Path tempDir) throws IOException {
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
        @Override
        public Path getTempDir() {
          return tempDir;
        }
      };
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
      extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = FileSystem.get(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }
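
  // Each map() call above receives one line of the job input file produced by
  // CompactionInputFormat.createInputFile() below, i.e. the path of a single
  // store (family) directory to compact, for example (illustrative):
  //   hdfs://hbase/data/default/TestTable/abc/x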

  /**
   * Input format that uses the store files' block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store files directory, using the block locations
     * of the files it contains as the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
      for (FileStatus file: files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Use the block locations of the store directory named on this line
            // (rather than of the input file itself) as the split's locality hint.
            String[] hosts = getStoreDirHosts(fs, new Path(key.toString()));
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Return the top hosts of the store files, used by the split.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
        throws IOException {
      FileStatus[] files = FSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus: files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact.
     * The file is a TextFile with each line corresponding to a
     * store files directory to compact.
     */
    public static void createInputFile(final FileSystem fs, final Path path,
        final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<>();
      for (Path compactDir: toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
            storeDirs.add(familyDir);
          }
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
              storeDirs.add(familyDir);
            }
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write Input File
      FSDataOutputStream stream = fs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir: storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
    }
  }
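
  // Each split generated above covers a single line of the input file (one
  // store directory) and advertises the hosts that hold most of that store's
  // blocks, so the MapReduce scheduler can try to place each map task
  // data-local to the store it compacts.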

  /**
   * Execute compaction, using a Map-Reduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = Job.getInstance(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getStagingDir(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credential for secure cluster
      TableMapReduceUtil.initCredentials(job);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      fs.delete(stagingDir, true);
    }
  }

  /**
   * Execute compaction, from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
      final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path: toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }
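
  // Note: generic options such as -D<property=value> are consumed by
  // ToolRunner/GenericOptionsParser before run() is invoked, so the loop
  // below only sees the tool-specific flags and the paths to compact.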

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage();
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" mapred        Use MapReduce to run compaction.");
    System.err.println(" compactOnce   Execute just one compaction step. (default: while needed)");
    System.err.println(" major         Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err.println(" To have compacted files deleted, pass -D" + CONF_DELETE_COMPACTED + "=true (default: false)");
    System.err.println(" To set the tmp dir, pass -D" + CONF_TMP_DIR + "=ALTERNATE_DIR");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(" $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}