/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The CompactionTool allows one to execute a compaction, specifying one of:
 * <ul>
 * <li>a table folder (all regions and families will be compacted)
 * <li>a region folder (all families in the region will be compacted)
 * <li>a family folder (the store files will be compacted)
 * </ul>
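 * <p>
 * Example invocations (the table name and paths below are illustrative; they mirror the examples
 * printed by the usage text of this tool):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool -mapred hdfs://hbase/data/default/TestTable
 * $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool -compactOnce -major hdfs://hbase/data/default/TestTable/abc/x
 * </pre>
 */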
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);

  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";

  /**
   * Class responsible for executing the compaction on the specified path. The path can be a table,
   * region or family directory.
   */
  private static class CompactionWorker {
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     * @param path        Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major       Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
      throws IOException {
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
      throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final TableDescriptor htd,
      final Path regionDir, final boolean compactOnce, final boolean major) throws IOException {
      RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job. If the compact-once flag is not specified, keep
     * compacting until no more compactions are needed. Uses the Configuration settings provided.
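     * <p>
     * In outline, the compaction loop below does the following (a sketch of the existing logic,
     * not additional behavior):
     *
     * <pre>
     * do {
     *   compaction = store.requestCompaction(...)   // ask the store for work
     *   if (none granted) break                     // nothing (more) to compact
     *   files = store.compact(compaction, ...)      // run the compaction
     *   if (deleteCompacted) delete(files)          // optionally remove the replaced files
     * } while (store.needsCompaction() and not compactOnce)
     * </pre>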
     */
    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
      final RegionInfo hri, final String familyName, final boolean compactOnce,
      final boolean major) throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
      LOG.info("Compact table=" + htd.getTableName() + " region=" + hri.getRegionNameAsString()
        + " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
      do {
        Optional<CompactionContext> compaction =
          store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
        if (!compaction.isPresent()) {
          break;
        }
        List<HStoreFile> storeFiles =
          store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (deleteCompacted) {
            for (HStoreFile storeFile : storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
      // We need to close the store properly, to make sure it will archive the compacted files.
      store.close();
    }

    private static HStore getStore(final Configuration conf, final FileSystem fs,
      final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
      final String familyName) throws IOException {
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.isTableDir(fs, path);
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
      throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses store file block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return true;
    }

    /**
     * Returns a split for each store file directory, using the block locations of its files as a
     * locality reference.
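     * <p>
     * For example, an input file containing the two lines (illustrative paths)
     *
     * <pre>
     * hdfs://hbase/data/default/TestTable/abc/x
     * hdfs://hbase/data/default/TestTable/abc/y
     * </pre>
     *
     * yields two single-line FileSplits, each annotated with the hosts that serve most of the
     * blocks of the corresponding store directory.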
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<>();
      List<FileStatus> files = listStatus(job);

      Text key = new Text();
      for (FileStatus file : files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Compute locality from the store directory named on this line, not from the input
            // file itself, so each split is scheduled near the blocks it will compact.
            Path storeDir = new Path(key.toString());
            String[] hosts =
              getStoreDirHosts(storeDir.getFileSystem(job.getConfiguration()), storeDir);
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Return the top hosts of the store files, used by the split.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
      throws IOException {
      FileStatus[] files = CommonFSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus : files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact. The file is a text file with
     * each line corresponding to a store file directory to compact.
     */
    public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
      final Path path, final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<>();
      for (Path compactDir : toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir : FSUtils.getRegionDirs(fs, compactDir)) {
            storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write the input file
      FSDataOutputStream stream = stagingFs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir : storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
      return storeDirs;
    }
  }

  /**
   * Execute compaction, using a Map-Reduce job.
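   * <p>
   * One map task is spawned per store directory line of the generated input file. The job runs
   * with zero reducers, and map speculative execution is disabled so the same store is never
   * compacted by two task attempts at once.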
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = Job.getInstance(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
    FileSystem stagingFs = stagingDir.getFileSystem(conf);
    try {
      // Create the input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      List<Path> storeDirs =
        CompactionInputFormat.createInputFile(fs, stagingFs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credentials for a secure cluster
      TableMapReduceUtil.initCredentials(job);
      // Despite the method name, this obtains delegation tokens for the filesystems
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), storeDirs.toArray(new Path[0]),
        conf);

      // Start the MR job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      // The input file was created on the staging filesystem, so clean up there
      stagingFs.delete(stagingDir, true);
    }
  }

  /**
   * Execute compaction from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws IOException {
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path : toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage("Unknown option: " + opt);
          return 1;
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
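    // With -mapred the work is distributed as one map task per store directory;
    // otherwise every directory is compacted sequentially in this client process.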
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" mapred         Use MapReduce to run compaction.");
    System.err.println(" compactOnce    Execute just one compaction step. (default: while needed)");
    System.err.println(" major          Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the conf used.");
    System.err.println("For example:");
    System.err.println(
      "  To disable deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the entire 'TestTable' using MapReduce:");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}