/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * The CompactionTool allows you to execute a compaction, specifying a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);

  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";

  /**
   * Class responsible for executing the compaction on the specified path. The path can be a
   * table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     * @param path        Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major       Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
      throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
      throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final TableDescriptor htd,
      final Path regionDir, final boolean compactOnce, final boolean major) throws IOException {
      RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job. If the compact once flag is not specified, execute the
     * compaction until no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
      final RegionInfo hri, final String familyName, final boolean compactOnce,
      final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
      LOG.info("Compact table=" + htd.getTableName() + " region=" + hri.getRegionNameAsString()
        + " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
      do {
        Optional<CompactionContext> compaction =
          store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
        if (!compaction.isPresent()) {
          break;
        }
        List<HStoreFile> storeFiles =
          store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (deleteCompacted) {
            for (HStoreFile storeFile : storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
      // We need to close the store properly, to make sure it will archive compacted files
      store.close();
    }

    private static HStore getStore(final Configuration conf, final FileSystem fs,
      final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
      final String familyName) throws IOException {
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.isTableDir(fs, path);
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
      // Disable the MemStoreLAB, as the MemStore is not used during compaction
      conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
      throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses store file block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store file directory, using the block locations of each file as
     * the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
      for (FileStatus file : files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            String[] hosts = getStoreDirHosts(fs, path);
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Returns the top hosts of the store files, used by the Split.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
      throws IOException {
      FileStatus[] files = CommonFSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus : files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact. The file is a TextFile with
     * each line corresponding to a store file directory to compact.
     */
    public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
      final Path path, final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<>();
      for (Path compactDir : toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir : FSUtils.getRegionDirs(fs, compactDir)) {
            storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write Input File
      FSDataOutputStream stream = stagingFs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir : storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
      return storeDirs;
    }
  }

  /**
   * Execute compaction, using a Map-Reduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = Job.getInstance(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
    FileSystem stagingFs = stagingDir.getFileSystem(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      List<Path> storeDirs =
        CompactionInputFormat.createInputFile(fs, stagingFs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credentials for secure cluster
      TableMapReduceUtil.initCredentials(job);
      // Despite the method name, this will obtain delegation tokens for the filesystem
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), storeDirs.toArray(new Path[0]),
        conf);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      fs.delete(stagingDir, true);
    }
  }

  /**
   * Execute compaction, from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws IOException {
    // Disable the MemStoreLAB, as the MemStore is not used during compaction
    getConf().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path : toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage();
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" mapred        Use MapReduce to run compaction.");
    System.err.println(" compactOnce   Execute just one compaction step. (default: while needed)");
    System.err.println(" major         Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err
      .println(" To stop deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}