/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;

/**
 * Command-line tool that runs a MapReduce job over a table, hashing batches of cells and writing
 * the (batch start key, batch hash) pairs as MapFiles under {@code <outputpath>/hashes}. A
 * partitions file and a manifest describing the scan parameters are written next to the data.
 */
@InterfaceAudience.Private
public class HashTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(HashTable.class);

  private static final int DEFAULT_BATCH_SIZE = 8000;

  /**
   * Default hash algorithm. Kept as MD5 so that manifests produced by older versions, which did not
   * record an algorithm, remain readable.
   */
  static final String DEFAULT_HASH_ALGORITHM = "MD5";

  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
  final static String HASH_ALGORITHM_CONF_KEY = "hash.algorithm";
  final static String PARTITIONS_FILE_NAME = "partitions";
  final static String MANIFEST_FILE_NAME = "manifest";
  final static String HASH_DATA_DIR = "hashes";
  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
  final static String IGNORE_TIMESTAMPS = "ignoreTimestamps";
  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";

  TableHash tableHash = new TableHash();
  Path destPath;

  public HashTable(Configuration conf) {
    super(conf);
  }

  /**
   * Parameters of one hashing run plus the partition keys that separate its output files. An
   * instance is either filled in from the command line (when writing) or loaded from an existing
   * output directory through {@link #read(Configuration, Path)}.
   */
  public static class TableHash {

    Path hashDir;

    String tableName;
    String families = null;
    long batchSize = DEFAULT_BATCH_SIZE;
    int numHashFiles = 0;
    byte[] startRow = HConstants.EMPTY_START_ROW;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    int scanBatch = 0;
    int versions = -1;
    long startTime = 0;
    long endTime = 0;
    boolean ignoreTimestamps;
    boolean rawScan;
    String hashAlgorithm = DEFAULT_HASH_ALGORITHM;

    List<ImmutableBytesWritable> partitions;

    /**
     * Load the manifest and the partitions file found directly under {@code hashDir}.
     * @param conf    configuration used to resolve the file system and read the partitions file
     * @param hashDir directory holding the {@value #MANIFEST_FILE_NAME} and
     *                {@value #PARTITIONS_FILE_NAME} files
     * @return a populated {@code TableHash}
     * @throws IOException if either file cannot be read or the partitions are not ordered
     */
    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
      TableHash tableHash = new TableHash();
      FileSystem fs = hashDir.getFileSystem(conf);
      tableHash.hashDir = hashDir;
      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
      return tableHash;
    }

    /**
     * Store the run parameters as a UTF-8 {@link Properties} file. Optional settings are only
     * written when they differ from their "unset" value.
     */
    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      p.setProperty("table", tableName);
      if (families != null) {
        p.setProperty("columnFamilies", families);
      }
      p.setProperty("targetBatchSize", Long.toString(batchSize));
      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
      if (!isTableStartRow(startRow)) {
        p.setProperty("startRowHex", Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        p.setProperty("scanBatch", Integer.toString(scanBatch));
      }
      if (versions >= 0) {
        p.setProperty("versions", Integer.toString(versions));
      }
      if (startTime != 0) {
        p.setProperty("startTimestamp", Long.toString(startTime));
      }
      if (endTime != 0) {
        p.setProperty("endTimestamp", Long.toString(endTime));
      }
      p.setProperty("rawScan", Boolean.toString(rawScan));
      p.setProperty("hashAlgorithm", hashAlgorithm);

      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
        p.store(osw, null);
      }
    }

    /**
     * Populate this instance from a properties file written by
     * {@link #writePropertiesFile(FileSystem, Path)}. Optional keys that are absent leave the
     * corresponding field untouched; a missing {@code hashAlgorithm} falls back to
     * {@link #DEFAULT_HASH_ALGORITHM}.
     */
    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      try (FSDataInputStream in = fs.open(path);
        InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
        p.load(isr);
      }
      tableName = p.getProperty("table");
      families = p.getProperty("columnFamilies");
      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));

      String startRowHex = p.getProperty("startRowHex");
      if (startRowHex != null) {
        startRow = Bytes.fromHex(startRowHex);
      }
      String stopRowHex = p.getProperty("stopRowHex");
      if (stopRowHex != null) {
        stopRow = Bytes.fromHex(stopRowHex);
      }

      String scanBatchString = p.getProperty("scanBatch");
      if (scanBatchString != null) {
        scanBatch = Integer.parseInt(scanBatchString);
      }

      String versionString = p.getProperty("versions");
      if (versionString != null) {
        versions = Integer.parseInt(versionString);
      }

      String rawScanString = p.getProperty("rawScan");
      if (rawScanString != null) {
        rawScan = Boolean.parseBoolean(rawScanString);
      }

      String startTimeString = p.getProperty("startTimestamp");
      if (startTimeString != null) {
        startTime = Long.parseLong(startTimeString);
      }

      String endTimeString = p.getProperty("endTimestamp");
      if (endTimeString != null) {
        endTime = Long.parseLong(endTimeString);
      }

      hashAlgorithm = p.getProperty("hashAlgorithm", DEFAULT_HASH_ALGORITHM);
    }

    /**
     * Build the {@link Scan} described by this instance's fields. Block caching is always
     * disabled; an end time of 0 is treated as {@link HConstants#LATEST_TIMESTAMP}.
     */
    Scan initScan() throws IOException {
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      if (startTime != 0 || endTime != 0) {
        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
      }
      if (scanBatch > 0) {
        scan.setBatch(scanBatch);
      }
      if (versions >= 0) {
        scan.readVersions(versions);
      }
      if (!isTableStartRow(startRow)) {
        scan.withStartRow(startRow);
      }
      if (!isTableEndRow(stopRow)) {
        scan.withStopRow(stopRow);
      }
      if (families != null) {
        for (String fam : families.split(",")) {
          scan.addFamily(Bytes.toBytes(fam));
        }
      }
      scan.setRaw(rawScan);

      return scan;
    }

    /**
     * Choose partitions between row ranges to hash to a single output file. Selects region
     * boundaries that fall within the scan range, and groups them into the desired number of
     * partitions. May adjust {@link #numHashFiles}: 0 becomes 1/100 of the selected regions (at
     * least 1), and the value is capped at the number of selected regions.
     * @param regionStartEndKeys parallel arrays of region start keys and region end keys
     */
    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
      List<byte[]> startKeys = new ArrayList<>();
      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];

        // if scan begins after this region, or ends before this region, then drop this region
        // in other words:
        // IF (scan begins before the end of this region
        // AND scan ends after the start of this region)
        // THEN include this region
        if (
          (isTableStartRow(startRow) || isTableEndRow(regionEndKey)
            || Bytes.compareTo(startRow, regionEndKey) < 0)
            && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
              || Bytes.compareTo(stopRow, regionStartKey) > 0)
        ) {
          startKeys.add(regionStartKey);
        }
      }

      int numRegions = startKeys.size();
      if (numHashFiles == 0) {
        numHashFiles = numRegions / 100;
      }
      if (numHashFiles == 0) {
        numHashFiles = 1;
      }
      if (numHashFiles > numRegions) {
        // can't partition within regions
        numHashFiles = numRegions;
      }

      // choose a subset of start keys to group regions into ranges
      // NOTE(review): if no region matched, numHashFiles is 0 here and the capacity below is -1,
      // which makes ArrayList throw IllegalArgumentException -- confirm whether that can happen.
      partitions = new ArrayList<>(numHashFiles - 1);
      // skip the first start key as it is not a partition between ranges.
      // the index is a long so that numRegions * i cannot overflow an int
      for (long i = 1; i < numHashFiles; i++) {
        int splitIndex = (int) (numRegions * i / numHashFiles);
        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
      }
    }

    /**
     * Write the partition keys to a SequenceFile at {@code path}, one key per record with a
     * {@link NullWritable} value. The writer is closed even if an append fails.
     */
    @SuppressWarnings("deprecation")
    void writePartitionFile(Configuration conf, Path path) throws IOException {
      FileSystem fs = path.getFileSystem(conf);
      try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
        ImmutableBytesWritable.class, NullWritable.class)) {
        for (ImmutableBytesWritable partition : partitions) {
          writer.append(partition, NullWritable.get());
        }
      }
    }

    /**
     * Load the partition keys from the SequenceFile at {@code path} into {@link #partitions}.
     * The reader is closed even if reading fails.
     * @throws IOException if the file cannot be read or the keys are not in natural order
     */
    @SuppressWarnings("deprecation")
    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
      throws IOException {
      partitions = new ArrayList<>();
      try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
        ImmutableBytesWritable key = new ImmutableBytesWritable();
        while (reader.next(key)) {
          // the reader reuses 'key', so store a copy of its bytes
          partitions.add(new ImmutableBytesWritable(key.copyBytes()));
        }
      }

      if (!Ordering.natural().isOrdered(partitions)) {
        throw new IOException("Partitions are not ordered!");
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("tableName=").append(tableName);
      if (families != null) {
        sb.append(", families=").append(families);
      }
      sb.append(", batchSize=").append(batchSize);
      sb.append(", numHashFiles=").append(numHashFiles);
      if (!isTableStartRow(startRow)) {
        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
      }
      if (scanBatch >= 0) {
        sb.append(", scanBatch=").append(scanBatch);
      }
      if (versions >= 0) {
        sb.append(", versions=").append(versions);
      }
      sb.append(", rawScan=").append(rawScan);
      sb.append(", hashAlgorithm=").append(hashAlgorithm);
      if (startTime != 0) {
        sb.append(", startTime=").append(startTime);
      }
      if (endTime != 0) {
        sb.append(", endTime=").append(endTime);
      }
      return sb.toString();
    }

    /**
     * Name of the data file with the given index: {@value HashTable#OUTPUT_DATA_FILE_PREFIX}
     * followed by the index zero-padded to five digits.
     */
    static String getDataFileName(int hashFileIndex) {
      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
    }

    /**
     * Open a TableHash.Reader starting at the first hash at or after the given key.
     */
    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
      throws IOException {
      return new Reader(conf, startKey);
    }

    /**
     * Sequential reader over the key/hash pairs stored in the hash data files, moving on to the
     * next file when the current one is exhausted.
     */
    public class Reader implements java.io.Closeable {
      private final Configuration conf;

      private int hashFileIndex;
      private MapFile.Reader mapFileReader;

      private boolean cachedNext;
      private ImmutableBytesWritable key;
      private ImmutableBytesWritable hash;

      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
        this.conf = conf;
        int partitionIndex = Collections.binarySearch(partitions, startKey);
        if (partitionIndex >= 0) {
          // if the key is equal to a partition, then go the file after that partition
          hashFileIndex = partitionIndex + 1;
        } else {
          // if the key is between partitions, then go to the file between those partitions
          hashFileIndex = -1 - partitionIndex;
        }
        openHashFile();

        // MapFile's don't make it easy to seek() so that the subsequent next() returns
        // the desired key/value pair. So we cache it for the first call of next().
        hash = new ImmutableBytesWritable();
        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
        if (key == null) {
          cachedNext = false;
          hash = null;
        } else {
          cachedNext = true;
        }
      }

      /**
       * Read the next key/hash pair. Returns true if such a pair exists and false when at the end
       * of the data.
       */
      public boolean next() throws IOException {
        if (cachedNext) {
          cachedNext = false;
          return true;
        }
        key = new ImmutableBytesWritable();
        hash = new ImmutableBytesWritable();
        while (true) {
          boolean hasNext = mapFileReader.next(key, hash);
          if (hasNext) {
            return true;
          }
          hashFileIndex++;
          if (hashFileIndex < TableHash.this.numHashFiles) {
            // openHashFile() closes the exhausted reader before opening the next file
            openHashFile();
          } else {
            key = null;
            hash = null;
            return false;
          }
        }
      }

      /**
       * Get the current key
       * @return the current key or null if there is no current key
       */
      public ImmutableBytesWritable getCurrentKey() {
        return key;
      }

      /**
       * Get the current hash
       * @return the current hash or null if there is no current hash
       */
      public ImmutableBytesWritable getCurrentHash() {
        return hash;
      }

      /** Close the current data file reader, if any, and open the one for hashFileIndex. */
      private void openHashFile() throws IOException {
        if (mapFileReader != null) {
          mapFileReader.close();
        }
        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
        mapFileReader = new MapFile.Reader(dataFile, conf);
      }

      @Override
      public void close() throws IOException {
        mapFileReader.close();
      }
    }
  }

  /** Returns true if {@code row} equals {@link HConstants#EMPTY_START_ROW}. */
  static boolean isTableStartRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
  }

  /** Returns true if {@code row} equals {@link HConstants#EMPTY_END_ROW}. */
  static boolean isTableEndRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
  }

  /**
   * Write the partitions file and configure the hashing job. Requires {@link #tableHash} and
   * {@link #destPath} to be set already (as done by command-line parsing in {@link #run}).
   * @param args not used by this method
   * @return the configured, not yet submitted job
   * @throws IOException if the partitions cannot be generated or the job cannot be set up
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
    generatePartitions(partitionsPath);

    Job job = Job.getInstance(getConf(),
      getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
    Configuration jobConf = job.getConfiguration();
    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
    jobConf.set(HASH_ALGORITHM_CONF_KEY, tableHash.hashAlgorithm);
    job.setJarByClass(HashTable.class);

    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    // use a TotalOrderPartitioner and reducers to group region output into hash files
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
    job.setReducerClass(Reducer.class); // identity reducer
    job.setNumReduceTasks(tableHash.numHashFiles);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

    return job;
  }

  /**
   * Fetch the table's region boundaries, select the partition keys and write them to
   * {@code partitionsPath}. The connection and region locator are closed even on failure.
   */
  private void generatePartitions(Path partitionsPath) throws IOException {
    Pair<byte[][], byte[][]> regionKeys;
    try (Connection connection = ConnectionFactory.createConnection(getConf());
      RegionLocator locator =
        connection.getRegionLocator(TableName.valueOf(tableHash.tableName))) {
      regionKeys = locator.getStartEndKeys();
    }

    tableHash.selectPartitions(regionKeys);
    LOG.info("Writing {} partition keys to {}", tableHash.partitions.size(), partitionsPath);

    tableHash.writePartitionFile(getConf(), partitionsPath);
  }

  /**
   * Accumulates a digest over the cells of one batch of results. Usage is
   * {@link #startBatch}, any number of {@link #hashResult} calls, then {@link #finishBatch};
   * calling them out of order throws {@link RuntimeException}.
   */
  static class ResultHasher {
    private final MessageDigest digest;

    private boolean batchStarted = false;
    private ImmutableBytesWritable batchStartKey;
    private ImmutableBytesWritable batchHash;
    private long batchSize = 0;
    boolean ignoreTimestamps;

    /**
     * @param algorithm name passed to {@link MessageDigest#getInstance(String)}
     * @throws IllegalArgumentException if the algorithm is not available
     */
    public ResultHasher(String algorithm) {
      try {
        digest = MessageDigest.getInstance(algorithm);
      } catch (NoSuchAlgorithmException e) {
        throw new IllegalArgumentException("Unsupported hash algorithm: " + algorithm, e);
      }
    }

    /** Begin a new batch whose start key is {@code row}; resets the size and the last hash. */
    public void startBatch(ImmutableBytesWritable row) {
      if (batchStarted) {
        throw new RuntimeException("Cannot start new batch without finishing existing one.");
      }
      batchStarted = true;
      batchSize = 0;
      batchStartKey = row;
      batchHash = null;
    }

    /**
     * Feed every cell of {@code result} into the digest: row, family, qualifier, the timestamp
     * (unless {@link #ignoreTimestamps} is set) and the value, in that order.
     */
    public void hashResult(Result result) {
      if (!batchStarted) {
        throw new RuntimeException("Cannot add to batch that has not been started.");
      }
      for (Cell cell : result.rawCells()) {
        int rowLength = cell.getRowLength();
        int familyLength = cell.getFamilyLength();
        int qualifierLength = cell.getQualifierLength();
        int valueLength = cell.getValueLength();
        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);

        if (!ignoreTimestamps) {
          // feed the 8 timestamp bytes, least significant byte first
          long ts = cell.getTimestamp();
          for (int i = 8; i > 0; i--) {
            digest.update((byte) ts);
            ts >>>= 8;
          }
        }
        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);

        // the 8 timestamp bytes are counted in the size whether or not they were hashed
        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
      }
    }

    /** End the current batch and capture its digest as the batch hash. */
    public void finishBatch() {
      if (!batchStarted) {
        throw new RuntimeException("Cannot finish batch that has not started.");
      }
      batchStarted = false;
      batchHash = new ImmutableBytesWritable(digest.digest());
    }

    public boolean isBatchStarted() {
      return batchStarted;
    }

    public ImmutableBytesWritable getBatchStartKey() {
      return batchStartKey;
    }

    public ImmutableBytesWritable getBatchHash() {
      return batchHash;
    }

    public long getBatchSize() {
      return batchSize;
    }
  }

  /**
   * Mapper that hashes scan results in batches and emits one (batch start key, batch hash) pair
   * per batch. A batch is only closed when a new row begins, so a row is never split.
   */
  public static class HashMapper
    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    private ResultHasher hasher;
    private long targetBatchSize;

    private ImmutableBytesWritable currentRow;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      targetBatchSize = conf.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
      hasher = new ResultHasher(conf.get(HASH_ALGORITHM_CONF_KEY, DEFAULT_HASH_ALGORITHM));
      hasher.ignoreTimestamps = conf.getBoolean(IGNORE_TIMESTAMPS, false);
      // the first batch of a split is keyed by the split's start row
      TableSplit split = (TableSplit) context.getInputSplit();
      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {

      if (currentRow == null || !currentRow.equals(key)) {
        currentRow = new ImmutableBytesWritable(key); // not immutable

        if (hasher.getBatchSize() >= targetBatchSize) {
          hasher.finishBatch();
          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
          hasher.startBatch(currentRow);
        }
      }

      hasher.hashResult(value);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      // flush the batch that is still open at the end of the split
      hasher.finishBatch();
      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
    }
  }

  /** Write the manifest under its temporary name in the destination directory. */
  private void writeTempManifestFile() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    tableHash.writePropertiesFile(fs, tempManifestPath);
  }

  /**
   * Publish the manifest by renaming the temporary file to its final name.
   * @throws IOException if the file system reports that the rename did not happen
   */
  private void completeManifest() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    // rename() signals failure through its return value; do not report success without a manifest
    if (!fs.rename(tempManifestPath, manifestPath)) {
      throw new IOException("Failed to rename " + tempManifestPath + " to " + manifestPath);
    }
  }

  private static final int NUM_ARGS = 2;

  /**
   * Print the usage text to stderr, preceded by {@code errorMsg} when it is non-empty.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" batchsize the target amount of bytes to hash in each batch");
    System.err.println(" rows are added to the batch until this size is reached");
    System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
    System.err.println(" numhashfiles the number of hash files to create");
    System.err.println(" if set to fewer than number of regions then");
    System.err.println(" the job will create this number of reducers");
    System.err.println(" (defaults to 1/100 of regions -- at least 1)");
    System.err.println(" startrow the start row");
    System.err.println(" stoprow the stop row");
    System.err.println(" starttime beginning of the time range (unixtime in millis)");
    System.err.println(" without endtime means from starttime to forever");
    System.err.println(" endtime end of the time range.");
    System.err.println(" Ignored if no starttime specified.");
    System.err.println(" scanbatch scanner batch size to support intra row scans");
    System.err.println(" versions number of cell versions to include");
    System.err.println(" rawScan performs a raw scan (false if omitted)");
    System.err.println(" families comma-separated list of families to include");
    System.err.println(" ignoreTimestamps if true, ignores cell timestamps");
    System.err.println(" when calculating hashes");
    System.err.println(" hashAlgorithm MessageDigest algorithm to use for batch hashes");
    System.err.println(" examples: MD5, SHA-256, SHA-384, SHA-512");
    System.err.println(" (defaults to " + DEFAULT_HASH_ALGORITHM + ")");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename Name of the table to hash");
    System.err.println(" outputpath Filesystem path to put the output data");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
    System.err.println(" $ hbase "
      + "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
      + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
      + " TestTable /hashes/testTable");
  }

  /**
   * Parse the command line into {@link #tableHash} and {@link #destPath}. The last two arguments
   * are the table name and the output path; everything before them must be a
   * {@code --name=value} option. Prints usage on any problem.
   * @return true if the arguments were valid, false if usage was printed
   */
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {

      tableHash.tableName = args[args.length - 2];
      destPath = new Path(args[args.length - 1]);

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.equals("--help")) {
          printUsage(null);
          return false;
        }

        final String batchSizeArgKey = "--batchsize=";
        if (cmd.startsWith(batchSizeArgKey)) {
          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
          continue;
        }

        final String numHashFilesArgKey = "--numhashfiles=";
        if (cmd.startsWith(numHashFilesArgKey)) {
          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
          continue;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String scanBatchArgKey = "--scanbatch=";
        if (cmd.startsWith(scanBatchArgKey)) {
          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String rawScanArgKey = "--rawScan=";
        if (cmd.startsWith(rawScanArgKey)) {
          tableHash.rawScan = Boolean.parseBoolean(cmd.substring(rawScanArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          tableHash.families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String ignoreTimestampsKey = "--ignoreTimestamps=";
        if (cmd.startsWith(ignoreTimestampsKey)) {
          tableHash.ignoreTimestamps =
            Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
          continue;
        }

        final String hashAlgorithmKey = "--hashAlgorithm=";
        if (cmd.startsWith(hashAlgorithmKey)) {
          tableHash.hashAlgorithm = cmd.substring(hashAlgorithmKey.length());
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
      // An end time of 0 means "not specified" (initScan() then scans up to LATEST_TIMESTAMP),
      // so the range is only checked when an end time was actually given. Checking it
      // unconditionally would reject every --starttime that comes without an --endtime.
      if (tableHash.endTime != 0 && tableHash.startTime >= tableHash.endTime) {
        printUsage("Invalid time range filter: starttime=" + tableHash.startTime + " >= endtime="
          + tableHash.endTime);
        return false;
      }

      // fail fast here rather than inside the mappers
      try {
        MessageDigest.getInstance(tableHash.hashAlgorithm);
      } catch (NoSuchAlgorithmException e) {
        printUsage("Unsupported hash algorithm: " + tableHash.hashAlgorithm);
        return false;
      }

    } catch (Exception e) {
      LOG.error("Failed to parse commandLine arguments", e);
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  /**
   * Parse the arguments, run the hashing job and publish the manifest. The manifest is written
   * under a temporary name before the job runs and only renamed once the job has succeeded.
   * @return 0 on success, 1 if the arguments were invalid or the job failed
   */
  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    writeTempManifestFile();
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    completeManifest();
    return 0;
  }

}