001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.io.InputStreamReader; 022import java.io.OutputStreamWriter; 023import java.security.MessageDigest; 024import java.security.NoSuchAlgorithmException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Properties; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.conf.Configured; 031import org.apache.hadoop.fs.FSDataInputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.Scan; 042import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.Pair; 045import org.apache.hadoop.io.MapFile; 046import org.apache.hadoop.io.NullWritable; 047import org.apache.hadoop.io.SequenceFile; 048import org.apache.hadoop.mapreduce.Job; 049import org.apache.hadoop.mapreduce.Reducer; 050import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 051import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; 052import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; 053import org.apache.hadoop.util.GenericOptionsParser; 054import org.apache.hadoop.util.Tool; 055import org.apache.hadoop.util.ToolRunner; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.base.Charsets; 061import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 062import org.apache.hbase.thirdparty.com.google.common.collect.Ordering; 063 064@InterfaceAudience.Private 065public class HashTable extends Configured implements Tool { 066 067 private static final Logger LOG = LoggerFactory.getLogger(HashTable.class); 068 069 private static final int DEFAULT_BATCH_SIZE = 8000; 070 071 private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size"; 072 final static String PARTITIONS_FILE_NAME = "partitions"; 073 final static String MANIFEST_FILE_NAME = "manifest"; 074 final static String HASH_DATA_DIR = "hashes"; 075 final static String OUTPUT_DATA_FILE_PREFIX = "part-r-"; 076 final static String IGNORE_TIMESTAMPS = "ignoreTimestamps"; 077 private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp"; 078 079 TableHash tableHash = new TableHash(); 080 Path destPath; 081 082 public HashTable(Configuration conf) { 083 super(conf); 084 } 085 086 public static class TableHash { 087 088 Path hashDir; 089 090 String tableName; 091 String families = null; 092 long batchSize = DEFAULT_BATCH_SIZE; 093 int numHashFiles = 0; 094 byte[] startRow = HConstants.EMPTY_START_ROW; 095 byte[] stopRow = HConstants.EMPTY_END_ROW; 096 int scanBatch = 0; 097 int versions = -1; 098 long startTime = 0; 099 long endTime = 0; 100 boolean ignoreTimestamps; 101 boolean rawScan; 102 103 List<ImmutableBytesWritable> partitions; 104 105 public static TableHash read(Configuration conf, Path hashDir) throws IOException { 106 TableHash tableHash = new TableHash(); 107 FileSystem fs = hashDir.getFileSystem(conf); 108 tableHash.hashDir = hashDir; 109 tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME)); 110 tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME)); 111 return tableHash; 112 } 113 114 void writePropertiesFile(FileSystem fs, Path path) throws IOException { 115 Properties p = new Properties(); 116 p.setProperty("table", tableName); 117 if (families != null) { 118 p.setProperty("columnFamilies", families); 119 } 120 p.setProperty("targetBatchSize", Long.toString(batchSize)); 121 p.setProperty("numHashFiles", Integer.toString(numHashFiles)); 122 if (!isTableStartRow(startRow)) { 123 p.setProperty("startRowHex", Bytes.toHex(startRow)); 124 } 125 if (!isTableEndRow(stopRow)) { 126 p.setProperty("stopRowHex", Bytes.toHex(stopRow)); 127 } 128 if (scanBatch > 0) { 129 p.setProperty("scanBatch", Integer.toString(scanBatch)); 130 } 131 if (versions >= 0) { 132 p.setProperty("versions", Integer.toString(versions)); 133 } 134 if (startTime != 0) { 135 p.setProperty("startTimestamp", Long.toString(startTime)); 136 } 137 if (endTime != 0) { 138 p.setProperty("endTimestamp", Long.toString(endTime)); 139 } 140 p.setProperty("rawScan", Boolean.toString(rawScan)); 141 142 try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) { 143 p.store(osw, null); 144 } 145 } 146 147 void readPropertiesFile(FileSystem fs, Path path) throws IOException { 148 Properties p = new Properties(); 149 try (FSDataInputStream in = fs.open(path)) { 150 try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) { 151 p.load(isr); 152 } 153 } 154 tableName = p.getProperty("table"); 155 families = p.getProperty("columnFamilies"); 156 batchSize = Long.parseLong(p.getProperty("targetBatchSize")); 157 numHashFiles = Integer.parseInt(p.getProperty("numHashFiles")); 158 159 String startRowHex = p.getProperty("startRowHex"); 160 if (startRowHex != null) { 161 startRow = Bytes.fromHex(startRowHex); 162 } 163 String stopRowHex = p.getProperty("stopRowHex"); 164 if (stopRowHex != null) { 165 stopRow = Bytes.fromHex(stopRowHex); 166 } 167 168 String scanBatchString = p.getProperty("scanBatch"); 169 if (scanBatchString != null) { 170 scanBatch = Integer.parseInt(scanBatchString); 171 } 172 173 String versionString = p.getProperty("versions"); 174 if (versionString != null) { 175 versions = Integer.parseInt(versionString); 176 } 177 178 String rawScanString = p.getProperty("rawScan"); 179 if (rawScanString != null) { 180 rawScan = Boolean.parseBoolean(rawScanString); 181 } 182 183 String startTimeString = p.getProperty("startTimestamp"); 184 if (startTimeString != null) { 185 startTime = Long.parseLong(startTimeString); 186 } 187 188 String endTimeString = p.getProperty("endTimestamp"); 189 if (endTimeString != null) { 190 endTime = Long.parseLong(endTimeString); 191 } 192 } 193 194 Scan initScan() throws IOException { 195 Scan scan = new Scan(); 196 scan.setCacheBlocks(false); 197 if (startTime != 0 || endTime != 0) { 198 scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); 199 } 200 if (scanBatch > 0) { 201 scan.setBatch(scanBatch); 202 } 203 if (versions >= 0) { 204 scan.readVersions(versions); 205 } 206 if (!isTableStartRow(startRow)) { 207 scan.withStartRow(startRow); 208 } 209 if (!isTableEndRow(stopRow)) { 210 scan.withStopRow(stopRow); 211 } 212 if (families != null) { 213 for (String fam : families.split(",")) { 214 scan.addFamily(Bytes.toBytes(fam)); 215 } 216 } 217 scan.setRaw(rawScan); 218 219 return scan; 220 } 221 222 /** 223 * Choose partitions between row ranges to hash to a single output file Selects region 224 * boundaries that fall within the scan range, and groups them into the desired number of 225 * partitions. 226 */ 227 void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) { 228 List<byte[]> startKeys = new ArrayList<>(); 229 for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) { 230 byte[] regionStartKey = regionStartEndKeys.getFirst()[i]; 231 byte[] regionEndKey = regionStartEndKeys.getSecond()[i]; 232 233 // if scan begins after this region, or starts before this region, then drop this region 234 // in other words: 235 // IF (scan begins before the end of this region 236 // AND scan ends before the start of this region) 237 // THEN include this region 238 if ( 239 (isTableStartRow(startRow) || isTableEndRow(regionEndKey) 240 || Bytes.compareTo(startRow, regionEndKey) < 0) 241 && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey) 242 || Bytes.compareTo(stopRow, regionStartKey) > 0) 243 ) { 244 startKeys.add(regionStartKey); 245 } 246 } 247 248 int numRegions = startKeys.size(); 249 if (numHashFiles == 0) { 250 numHashFiles = numRegions / 100; 251 } 252 if (numHashFiles == 0) { 253 numHashFiles = 1; 254 } 255 if (numHashFiles > numRegions) { 256 // can't partition within regions 257 numHashFiles = numRegions; 258 } 259 260 // choose a subset of start keys to group regions into ranges 261 partitions = new ArrayList<>(numHashFiles - 1); 262 // skip the first start key as it is not a partition between ranges. 263 for (long i = 1; i < numHashFiles; i++) { 264 int splitIndex = (int) (numRegions * i / numHashFiles); 265 partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex))); 266 } 267 } 268 269 void writePartitionFile(Configuration conf, Path path) throws IOException { 270 FileSystem fs = path.getFileSystem(conf); 271 @SuppressWarnings("deprecation") 272 SequenceFile.Writer writer = 273 SequenceFile.createWriter(fs, conf, path, ImmutableBytesWritable.class, NullWritable.class); 274 275 for (int i = 0; i < partitions.size(); i++) { 276 writer.append(partitions.get(i), NullWritable.get()); 277 } 278 writer.close(); 279 } 280 281 private void readPartitionFile(FileSystem fs, Configuration conf, Path path) 282 throws IOException { 283 @SuppressWarnings("deprecation") 284 SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); 285 ImmutableBytesWritable key = new ImmutableBytesWritable(); 286 partitions = new ArrayList<>(); 287 while (reader.next(key)) { 288 partitions.add(new ImmutableBytesWritable(key.copyBytes())); 289 } 290 reader.close(); 291 292 if (!Ordering.natural().isOrdered(partitions)) { 293 throw new IOException("Partitions are not ordered!"); 294 } 295 } 296 297 @Override 298 public String toString() { 299 StringBuilder sb = new StringBuilder(); 300 sb.append("tableName=").append(tableName); 301 if (families != null) { 302 sb.append(", families=").append(families); 303 } 304 sb.append(", batchSize=").append(batchSize); 305 sb.append(", numHashFiles=").append(numHashFiles); 306 if (!isTableStartRow(startRow)) { 307 sb.append(", startRowHex=").append(Bytes.toHex(startRow)); 308 } 309 if (!isTableEndRow(stopRow)) { 310 sb.append(", stopRowHex=").append(Bytes.toHex(stopRow)); 311 } 312 if (scanBatch >= 0) { 313 sb.append(", scanBatch=").append(scanBatch); 314 } 315 if (versions >= 0) { 316 sb.append(", versions=").append(versions); 317 } 318 sb.append(", rawScan=").append(rawScan); 319 if (startTime != 0) { 320 sb.append("startTime=").append(startTime); 321 } 322 if (endTime != 0) { 323 sb.append("endTime=").append(endTime); 324 } 325 return sb.toString(); 326 } 327 328 static String getDataFileName(int hashFileIndex) { 329 return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex); 330 } 331 332 /** 333 * Open a TableHash.Reader starting at the first hash at or after the given key. 334 */ 335 public Reader newReader(Configuration conf, ImmutableBytesWritable startKey) 336 throws IOException { 337 return new Reader(conf, startKey); 338 } 339 340 public class Reader implements java.io.Closeable { 341 private final Configuration conf; 342 343 private int hashFileIndex; 344 private MapFile.Reader mapFileReader; 345 346 private boolean cachedNext; 347 private ImmutableBytesWritable key; 348 private ImmutableBytesWritable hash; 349 350 Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException { 351 this.conf = conf; 352 int partitionIndex = Collections.binarySearch(partitions, startKey); 353 if (partitionIndex >= 0) { 354 // if the key is equal to a partition, then go the file after that partition 355 hashFileIndex = partitionIndex + 1; 356 } else { 357 // if the key is between partitions, then go to the file between those partitions 358 hashFileIndex = -1 - partitionIndex; 359 } 360 openHashFile(); 361 362 // MapFile's don't make it easy to seek() so that the subsequent next() returns 363 // the desired key/value pair. So we cache it for the first call of next(). 364 hash = new ImmutableBytesWritable(); 365 key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash); 366 if (key == null) { 367 cachedNext = false; 368 hash = null; 369 } else { 370 cachedNext = true; 371 } 372 } 373 374 /** 375 * Read the next key/hash pair. Returns true if such a pair exists and false when at the end 376 * of the data. 377 */ 378 public boolean next() throws IOException { 379 if (cachedNext) { 380 cachedNext = false; 381 return true; 382 } 383 key = new ImmutableBytesWritable(); 384 hash = new ImmutableBytesWritable(); 385 while (true) { 386 boolean hasNext = mapFileReader.next(key, hash); 387 if (hasNext) { 388 return true; 389 } 390 hashFileIndex++; 391 if (hashFileIndex < TableHash.this.numHashFiles) { 392 mapFileReader.close(); 393 openHashFile(); 394 } else { 395 key = null; 396 hash = null; 397 return false; 398 } 399 } 400 } 401 402 /** 403 * Get the current key 404 * @return the current key or null if there is no current key 405 */ 406 public ImmutableBytesWritable getCurrentKey() { 407 return key; 408 } 409 410 /** 411 * Get the current hash 412 * @return the current hash or null if there is no current hash 413 */ 414 public ImmutableBytesWritable getCurrentHash() { 415 return hash; 416 } 417 418 private void openHashFile() throws IOException { 419 if (mapFileReader != null) { 420 mapFileReader.close(); 421 } 422 Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR); 423 Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex)); 424 mapFileReader = new MapFile.Reader(dataFile, conf); 425 } 426 427 @Override 428 public void close() throws IOException { 429 mapFileReader.close(); 430 } 431 } 432 } 433 434 static boolean isTableStartRow(byte[] row) { 435 return Bytes.equals(HConstants.EMPTY_START_ROW, row); 436 } 437 438 static boolean isTableEndRow(byte[] row) { 439 return Bytes.equals(HConstants.EMPTY_END_ROW, row); 440 } 441 442 public Job createSubmittableJob(String[] args) throws IOException { 443 Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME); 444 generatePartitions(partitionsPath); 445 446 Job job = Job.getInstance(getConf(), 447 getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName)); 448 Configuration jobConf = job.getConfiguration(); 449 jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize); 450 jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps); 451 job.setJarByClass(HashTable.class); 452 453 TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(), 454 HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); 455 456 // use a TotalOrderPartitioner and reducers to group region output into hash files 457 job.setPartitionerClass(TotalOrderPartitioner.class); 458 TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath); 459 job.setReducerClass(Reducer.class); // identity reducer 460 job.setNumReduceTasks(tableHash.numHashFiles); 461 job.setOutputKeyClass(ImmutableBytesWritable.class); 462 job.setOutputValueClass(ImmutableBytesWritable.class); 463 job.setOutputFormatClass(MapFileOutputFormat.class); 464 FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR)); 465 466 return job; 467 } 468 469 private void generatePartitions(Path partitionsPath) throws IOException { 470 Connection connection = ConnectionFactory.createConnection(getConf()); 471 Pair<byte[][], byte[][]> regionKeys = 472 connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys(); 473 connection.close(); 474 475 tableHash.selectPartitions(regionKeys); 476 LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath); 477 478 tableHash.writePartitionFile(getConf(), partitionsPath); 479 } 480 481 static class ResultHasher { 482 private MessageDigest digest; 483 484 private boolean batchStarted = false; 485 private ImmutableBytesWritable batchStartKey; 486 private ImmutableBytesWritable batchHash; 487 private long batchSize = 0; 488 boolean ignoreTimestamps; 489 490 public ResultHasher() { 491 try { 492 digest = MessageDigest.getInstance("MD5"); 493 } catch (NoSuchAlgorithmException e) { 494 Throwables.propagate(e); 495 } 496 } 497 498 public void startBatch(ImmutableBytesWritable row) { 499 if (batchStarted) { 500 throw new RuntimeException("Cannot start new batch without finishing existing one."); 501 } 502 batchStarted = true; 503 batchSize = 0; 504 batchStartKey = row; 505 batchHash = null; 506 } 507 508 public void hashResult(Result result) { 509 if (!batchStarted) { 510 throw new RuntimeException("Cannot add to batch that has not been started."); 511 } 512 for (Cell cell : result.rawCells()) { 513 int rowLength = cell.getRowLength(); 514 int familyLength = cell.getFamilyLength(); 515 int qualifierLength = cell.getQualifierLength(); 516 int valueLength = cell.getValueLength(); 517 digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength); 518 digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength); 519 digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength); 520 521 if (!ignoreTimestamps) { 522 long ts = cell.getTimestamp(); 523 for (int i = 8; i > 0; i--) { 524 digest.update((byte) ts); 525 ts >>>= 8; 526 } 527 } 528 digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength); 529 530 batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength; 531 } 532 } 533 534 public void finishBatch() { 535 if (!batchStarted) { 536 throw new RuntimeException("Cannot finish batch that has not started."); 537 } 538 batchStarted = false; 539 batchHash = new ImmutableBytesWritable(digest.digest()); 540 } 541 542 public boolean isBatchStarted() { 543 return batchStarted; 544 } 545 546 public ImmutableBytesWritable getBatchStartKey() { 547 return batchStartKey; 548 } 549 550 public ImmutableBytesWritable getBatchHash() { 551 return batchHash; 552 } 553 554 public long getBatchSize() { 555 return batchSize; 556 } 557 } 558 559 public static class HashMapper 560 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { 561 562 private ResultHasher hasher; 563 private long targetBatchSize; 564 565 private ImmutableBytesWritable currentRow; 566 567 @Override 568 protected void setup(Context context) throws IOException, InterruptedException { 569 targetBatchSize = 570 context.getConfiguration().getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); 571 hasher = new ResultHasher(); 572 hasher.ignoreTimestamps = context.getConfiguration().getBoolean(IGNORE_TIMESTAMPS, false); 573 TableSplit split = (TableSplit) context.getInputSplit(); 574 hasher.startBatch(new ImmutableBytesWritable(split.getStartRow())); 575 } 576 577 @Override 578 protected void map(ImmutableBytesWritable key, Result value, Context context) 579 throws IOException, InterruptedException { 580 581 if (currentRow == null || !currentRow.equals(key)) { 582 currentRow = new ImmutableBytesWritable(key); // not immutable 583 584 if (hasher.getBatchSize() >= targetBatchSize) { 585 hasher.finishBatch(); 586 context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); 587 hasher.startBatch(currentRow); 588 } 589 } 590 591 hasher.hashResult(value); 592 } 593 594 @Override 595 protected void cleanup(Context context) throws IOException, InterruptedException { 596 hasher.finishBatch(); 597 context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); 598 } 599 } 600 601 private void writeTempManifestFile() throws IOException { 602 Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); 603 FileSystem fs = tempManifestPath.getFileSystem(getConf()); 604 tableHash.writePropertiesFile(fs, tempManifestPath); 605 } 606 607 private void completeManifest() throws IOException { 608 Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); 609 Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME); 610 FileSystem fs = tempManifestPath.getFileSystem(getConf()); 611 fs.rename(tempManifestPath, manifestPath); 612 } 613 614 private static final int NUM_ARGS = 2; 615 616 private static void printUsage(final String errorMsg) { 617 if (errorMsg != null && errorMsg.length() > 0) { 618 System.err.println("ERROR: " + errorMsg); 619 System.err.println(); 620 } 621 System.err.println("Usage: HashTable [options] <tablename> <outputpath>"); 622 System.err.println(); 623 System.err.println("Options:"); 624 System.err.println(" batchsize the target amount of bytes to hash in each batch"); 625 System.err.println(" rows are added to the batch until this size is reached"); 626 System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)"); 627 System.err.println(" numhashfiles the number of hash files to create"); 628 System.err.println(" if set to fewer than number of regions then"); 629 System.err.println(" the job will create this number of reducers"); 630 System.err.println(" (defaults to 1/100 of regions -- at least 1)"); 631 System.err.println(" startrow the start row"); 632 System.err.println(" stoprow the stop row"); 633 System.err.println(" starttime beginning of the time range (unixtime in millis)"); 634 System.err.println(" without endtime means from starttime to forever"); 635 System.err.println(" endtime end of the time range."); 636 System.err.println(" Ignored if no starttime specified."); 637 System.err.println(" scanbatch scanner batch size to support intra row scans"); 638 System.err.println(" versions number of cell versions to include"); 639 System.err.println(" rawScan performs a raw scan (false if omitted)"); 640 System.err.println(" families comma-separated list of families to include"); 641 System.err.println(" ignoreTimestamps if true, ignores cell timestamps"); 642 System.err.println(" when calculating hashes"); 643 System.err.println(); 644 System.err.println("Args:"); 645 System.err.println(" tablename Name of the table to hash"); 646 System.err.println(" outputpath Filesystem path to put the output data"); 647 System.err.println(); 648 System.err.println("Examples:"); 649 System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:"); 650 System.err.println(" $ hbase " 651 + "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50" 652 + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3" 653 + " TestTable /hashes/testTable"); 654 } 655 656 private boolean doCommandLine(final String[] args) { 657 if (args.length < NUM_ARGS) { 658 printUsage(null); 659 return false; 660 } 661 try { 662 663 tableHash.tableName = args[args.length - 2]; 664 destPath = new Path(args[args.length - 1]); 665 666 for (int i = 0; i < args.length - NUM_ARGS; i++) { 667 String cmd = args[i]; 668 if (cmd.equals("-h") || cmd.startsWith("--h")) { 669 printUsage(null); 670 return false; 671 } 672 673 final String batchSizeArgKey = "--batchsize="; 674 if (cmd.startsWith(batchSizeArgKey)) { 675 tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length())); 676 continue; 677 } 678 679 final String numHashFilesArgKey = "--numhashfiles="; 680 if (cmd.startsWith(numHashFilesArgKey)) { 681 tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length())); 682 continue; 683 } 684 685 final String startRowArgKey = "--startrow="; 686 if (cmd.startsWith(startRowArgKey)) { 687 tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length())); 688 continue; 689 } 690 691 final String stopRowArgKey = "--stoprow="; 692 if (cmd.startsWith(stopRowArgKey)) { 693 tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length())); 694 continue; 695 } 696 697 final String startTimeArgKey = "--starttime="; 698 if (cmd.startsWith(startTimeArgKey)) { 699 tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); 700 continue; 701 } 702 703 final String endTimeArgKey = "--endtime="; 704 if (cmd.startsWith(endTimeArgKey)) { 705 tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); 706 continue; 707 } 708 709 final String scanBatchArgKey = "--scanbatch="; 710 if (cmd.startsWith(scanBatchArgKey)) { 711 tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length())); 712 continue; 713 } 714 715 final String versionsArgKey = "--versions="; 716 if (cmd.startsWith(versionsArgKey)) { 717 tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); 718 continue; 719 } 720 721 final String rawScanArgKey = "--rawScan="; 722 if (cmd.startsWith(rawScanArgKey)) { 723 tableHash.rawScan = Boolean.parseBoolean(cmd.substring(rawScanArgKey.length())); 724 continue; 725 } 726 727 final String familiesArgKey = "--families="; 728 if (cmd.startsWith(familiesArgKey)) { 729 tableHash.families = cmd.substring(familiesArgKey.length()); 730 continue; 731 } 732 733 final String ignoreTimestampsKey = "--ignoreTimestamps="; 734 if (cmd.startsWith(ignoreTimestampsKey)) { 735 tableHash.ignoreTimestamps = 736 Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length())); 737 continue; 738 } 739 740 printUsage("Invalid argument '" + cmd + "'"); 741 return false; 742 } 743 if ( 744 (tableHash.startTime != 0 || tableHash.endTime != 0) 745 && (tableHash.startTime >= tableHash.endTime) 746 ) { 747 printUsage("Invalid time range filter: starttime=" + tableHash.startTime + " >= endtime=" 748 + tableHash.endTime); 749 return false; 750 } 751 752 } catch (Exception e) { 753 LOG.error("Failed to parse commandLine arguments", e); 754 printUsage("Can't start because " + e.getMessage()); 755 return false; 756 } 757 return true; 758 } 759 760 /** 761 * Main entry point. 762 */ 763 public static void main(String[] args) throws Exception { 764 int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args); 765 System.exit(ret); 766 } 767 768 @Override 769 public int run(String[] args) throws Exception { 770 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); 771 if (!doCommandLine(otherArgs)) { 772 return 1; 773 } 774 775 Job job = createSubmittableJob(otherArgs); 776 writeTempManifestFile(); 777 if (!job.waitForCompletion(true)) { 778 LOG.info("Map-reduce job failed!"); 779 return 1; 780 } 781 completeManifest(); 782 return 0; 783 } 784 785}