/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;

/**
 * A map/reduce job that hashes batches of consecutive rows of a table into MapFile output,
 * one file per partition of the key space. The resulting hashes can later be compared against
 * another cluster's output (for example by SyncTable) to find key ranges that differ.
 */
@InterfaceAudience.Private
public class HashTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(HashTable.class);

  private static final int DEFAULT_BATCH_SIZE = 8000;

  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
  final static String PARTITIONS_FILE_NAME = "partitions";
  final static String MANIFEST_FILE_NAME = "manifest";
  final static String HASH_DATA_DIR = "hashes";
  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";

  TableHash tableHash = new TableHash();
  Path destPath;

  public HashTable(Configuration conf) {
    super(conf);
  }

  public static class TableHash {

    Path hashDir;

    String tableName;
    String families = null;
    long batchSize = DEFAULT_BATCH_SIZE;
    int numHashFiles = 0;
    byte[] startRow = HConstants.EMPTY_START_ROW;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    int scanBatch = 0;
    int versions = -1;
    long startTime = 0;
    long endTime = 0;

    List<ImmutableBytesWritable> partitions;

    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
      TableHash tableHash = new TableHash();
      FileSystem fs = hashDir.getFileSystem(conf);
      tableHash.hashDir = hashDir;
      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
      return tableHash;
    }

    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      p.setProperty("table", tableName);
      if (families != null) {
        p.setProperty("columnFamilies", families);
      }
      p.setProperty("targetBatchSize", Long.toString(batchSize));
      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
      if (!isTableStartRow(startRow)) {
        p.setProperty("startRowHex", Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        p.setProperty("scanBatch", Integer.toString(scanBatch));
      }
      if (versions >= 0) {
        p.setProperty("versions", Integer.toString(versions));
      }
      if (startTime != 0) {
        p.setProperty("startTimestamp", Long.toString(startTime));
      }
      if (endTime != 0) {
        p.setProperty("endTimestamp", Long.toString(endTime));
      }

      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
        p.store(osw, null);
      }
    }

    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      try (FSDataInputStream in = fs.open(path)) {
        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
          p.load(isr);
        }
      }
      tableName = p.getProperty("table");
      families = p.getProperty("columnFamilies");
      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));

      String startRowHex = p.getProperty("startRowHex");
      if (startRowHex != null) {
        startRow = Bytes.fromHex(startRowHex);
      }
      String stopRowHex = p.getProperty("stopRowHex");
      if (stopRowHex != null) {
        stopRow = Bytes.fromHex(stopRowHex);
      }

      String scanBatchString = p.getProperty("scanBatch");
      if (scanBatchString != null) {
        scanBatch = Integer.parseInt(scanBatchString);
      }

      String versionString = p.getProperty("versions");
      if (versionString != null) {
        versions = Integer.parseInt(versionString);
      }

      String startTimeString = p.getProperty("startTimestamp");
      if (startTimeString != null) {
        startTime = Long.parseLong(startTimeString);
      }

      String endTimeString = p.getProperty("endTimestamp");
      if (endTimeString != null) {
        endTime = Long.parseLong(endTimeString);
      }
    }

    Scan initScan() throws IOException {
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      if (startTime != 0 || endTime != 0) {
        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
      }
      if (scanBatch > 0) {
        scan.setBatch(scanBatch);
      }
      if (versions >= 0) {
        scan.setMaxVersions(versions);
      }
      if (!isTableStartRow(startRow)) {
        scan.setStartRow(startRow);
      }
      if (!isTableEndRow(stopRow)) {
        scan.setStopRow(stopRow);
      }
      if (families != null) {
        for (String fam : families.split(",")) {
          scan.addFamily(Bytes.toBytes(fam));
        }
      }
      return scan;
    }

    /**
     * Choose partitions between row ranges to hash to a single output file.
     * Selects region boundaries that fall within the scan range, and groups them
     * into the desired number of partitions.
     */
    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
      List<byte[]> startKeys = new ArrayList<>();
      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];

        // drop a region if the scan range does not overlap it; in other words:
        //   IF (scan begins before the end of this region
        //      AND scan ends after the start of this region)
        //   THEN include this region
        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
            || Bytes.compareTo(startRow, regionEndKey) < 0)
            && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
          startKeys.add(regionStartKey);
        }
      }

      int numRegions = startKeys.size();
      if (numHashFiles == 0) {
        numHashFiles = numRegions / 100;
      }
      if (numHashFiles == 0) {
        numHashFiles = 1;
      }
      if (numHashFiles > numRegions) {
        // can't partition within regions
        numHashFiles = numRegions;
      }

      // choose a subset of start keys to group regions into ranges
      partitions = new ArrayList<>(numHashFiles - 1);
      // skip the first start key as it is not a partition between ranges.
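      // e.g. with 20 included regions and 4 hash files, the chosen split points are
      // startKeys[5], startKeys[10] and startKeys[15], giving four roughly equal ranges.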
      for (long i = 1; i < numHashFiles; i++) {
        int splitIndex = (int) (numRegions * i / numHashFiles);
        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
      }
    }

    void writePartitionFile(Configuration conf, Path path) throws IOException {
      FileSystem fs = path.getFileSystem(conf);
      @SuppressWarnings("deprecation")
      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);

      for (int i = 0; i < partitions.size(); i++) {
        writer.append(partitions.get(i), NullWritable.get());
      }
      writer.close();
    }

    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
        throws IOException {
      @SuppressWarnings("deprecation")
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
      ImmutableBytesWritable key = new ImmutableBytesWritable();
      partitions = new ArrayList<>();
      while (reader.next(key)) {
        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
      }
      reader.close();

      if (!Ordering.natural().isOrdered(partitions)) {
        throw new IOException("Partitions are not ordered!");
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("tableName=").append(tableName);
      if (families != null) {
        sb.append(", families=").append(families);
      }
      sb.append(", batchSize=").append(batchSize);
      sb.append(", numHashFiles=").append(numHashFiles);
      if (!isTableStartRow(startRow)) {
        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
      }
      if (scanBatch >= 0) {
        sb.append(", scanBatch=").append(scanBatch);
      }
      if (versions >= 0) {
        sb.append(", versions=").append(versions);
      }
      if (startTime != 0) {
        sb.append(", startTime=").append(startTime);
      }
      if (endTime != 0) {
        sb.append(", endTime=").append(endTime);
      }
      return sb.toString();
    }

    static String getDataFileName(int hashFileIndex) {
      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
    }

    /**
     * Open a TableHash.Reader starting at the first hash at or after the given key.
     */
    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
        throws IOException {
      return new Reader(conf, startKey);
    }

    public class Reader implements java.io.Closeable {
      private final Configuration conf;

      private int hashFileIndex;
      private MapFile.Reader mapFileReader;

      private boolean cachedNext;
      private ImmutableBytesWritable key;
      private ImmutableBytesWritable hash;

      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
        this.conf = conf;
        int partitionIndex = Collections.binarySearch(partitions, startKey);
        if (partitionIndex >= 0) {
          // if the key is equal to a partition, then go to the file after that partition
          hashFileIndex = partitionIndex + 1;
        } else {
          // if the key is between partitions, then go to the file between those partitions
          hashFileIndex = -1 - partitionIndex;
        }
        openHashFile();

        // MapFile's don't make it easy to seek() so that the subsequent next() returns
        // the desired key/value pair. So we cache it for the first call of next().
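        // getClosest() positions at the first entry whose key is >= startKey and fills in its
        // hash; it returns null when startKey falls after the last entry of this hash file.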
        hash = new ImmutableBytesWritable();
        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
        if (key == null) {
          cachedNext = false;
          hash = null;
        } else {
          cachedNext = true;
        }
      }

      /**
       * Read the next key/hash pair.
       * Returns true if such a pair exists and false when at the end of the data.
       */
      public boolean next() throws IOException {
        if (cachedNext) {
          cachedNext = false;
          return true;
        }
        key = new ImmutableBytesWritable();
        hash = new ImmutableBytesWritable();
        while (true) {
          boolean hasNext = mapFileReader.next(key, hash);
          if (hasNext) {
            return true;
          }
          hashFileIndex++;
          if (hashFileIndex < TableHash.this.numHashFiles) {
            mapFileReader.close();
            openHashFile();
          } else {
            key = null;
            hash = null;
            return false;
          }
        }
      }

      /**
       * Get the current key
       * @return the current key or null if there is no current key
       */
      public ImmutableBytesWritable getCurrentKey() {
        return key;
      }

      /**
       * Get the current hash
       * @return the current hash or null if there is no current hash
       */
      public ImmutableBytesWritable getCurrentHash() {
        return hash;
      }

      private void openHashFile() throws IOException {
        if (mapFileReader != null) {
          mapFileReader.close();
        }
        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
        mapFileReader = new MapFile.Reader(dataFile, conf);
      }

      @Override
      public void close() throws IOException {
        mapFileReader.close();
      }
    }
  }

  static boolean isTableStartRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
  }

  static boolean isTableEndRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
  }

  public Job createSubmittableJob(String[] args) throws IOException {
    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
    generatePartitions(partitionsPath);

    Job job = Job.getInstance(getConf(),
        getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
    Configuration jobConf = job.getConfiguration();
    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
    job.setJarByClass(HashTable.class);

    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    // use a TotalOrderPartitioner and reducers to group region output into hash files
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
    job.setReducerClass(Reducer.class); // identity reducer
    job.setNumReduceTasks(tableHash.numHashFiles);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

    return job;
  }

  private void generatePartitions(Path partitionsPath) throws IOException {
    Pair<byte[][], byte[][]> regionKeys;
    try (Connection connection = ConnectionFactory.createConnection(getConf())) {
      regionKeys =
          connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
    }

    tableHash.selectPartitions(regionKeys);
    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);

    tableHash.writePartitionFile(getConf(), partitionsPath);
  }

  static class ResultHasher {
    private MessageDigest digest;

    private boolean batchStarted = false;
    private ImmutableBytesWritable batchStartKey;
    private ImmutableBytesWritable batchHash;
    private long batchSize = 0;

    public ResultHasher() {
      try {
        digest = MessageDigest.getInstance("MD5");
      } catch (NoSuchAlgorithmException e) {
        Throwables.propagate(e);
      }
    }

    public void startBatch(ImmutableBytesWritable row) {
      if (batchStarted) {
        throw new RuntimeException("Cannot start new batch without finishing existing one.");
      }
      batchStarted = true;
      batchSize = 0;
      batchStartKey = row;
      batchHash = null;
    }

    public void hashResult(Result result) {
      if (!batchStarted) {
        throw new RuntimeException("Cannot add to batch that has not been started.");
      }
      for (Cell cell : result.rawCells()) {
        int rowLength = cell.getRowLength();
        int familyLength = cell.getFamilyLength();
        int qualifierLength = cell.getQualifierLength();
        int valueLength = cell.getValueLength();
        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
        long ts = cell.getTimestamp();
        // fold the 8 bytes of the timestamp into the digest, least-significant byte first
        for (int i = 8; i > 0; i--) {
          digest.update((byte) ts);
          ts >>>= 8;
        }
        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);

        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
      }
    }

    public void finishBatch() {
      if (!batchStarted) {
        throw new RuntimeException("Cannot finish batch that has not started.");
      }
      batchStarted = false;
      batchHash = new ImmutableBytesWritable(digest.digest());
    }

    public boolean isBatchStarted() {
      return batchStarted;
    }

    public ImmutableBytesWritable getBatchStartKey() {
      return batchStartKey;
    }

    public ImmutableBytesWritable getBatchHash() {
      return batchHash;
    }

    public long getBatchSize() {
      return batchSize;
    }
  }

  public static class HashMapper
      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    private ResultHasher hasher;
    private long targetBatchSize;

    private ImmutableBytesWritable currentRow;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      targetBatchSize = context.getConfiguration()
          .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
      hasher = new ResultHasher();

      TableSplit split = (TableSplit) context.getInputSplit();
      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {

      if (currentRow == null || !currentRow.equals(key)) {
        currentRow = new ImmutableBytesWritable(key); // not immutable

        if (hasher.getBatchSize() >= targetBatchSize) {
          hasher.finishBatch();
          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
          hasher.startBatch(currentRow);
        }
      }

      hasher.hashResult(value);
    }

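    // emit whatever is still buffered at the end of the split, so the final (possibly
    // partial) batch is not lost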
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      hasher.finishBatch();
      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
    }
  }

  private void writeTempManifestFile() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    tableHash.writePropertiesFile(fs, tempManifestPath);
  }

  private void completeManifest() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    fs.rename(tempManifestPath, manifestPath);
  }

  private static final int NUM_ARGS = 2;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" batchsize     the target amount of bytes to hash in each batch");
    System.err.println("               rows are added to the batch until this size is reached");
    System.err.println("               (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
    System.err.println(" numhashfiles  the number of hash files to create");
    System.err.println("               if set to fewer than number of regions then");
    System.err.println("               the job will create this number of reducers");
    System.err.println("               (defaults to 1/100 of regions -- at least 1)");
    System.err.println(" startrow      the start row");
    System.err.println(" stoprow       the stop row");
    System.err.println(" starttime     beginning of the time range (unixtime in millis)");
    System.err.println("               without endtime means from starttime to forever");
    System.err.println(" endtime       end of the time range. Ignored if no starttime specified.");
    System.err.println(" scanbatch     scanner batch size to support intra row scans");
    System.err.println(" versions      number of cell versions to include");
    System.err.println(" families      comma-separated list of families to include");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename     Name of the table to hash");
    System.err.println(" outputpath    Filesystem path to put the output data");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
        + " TestTable /hashes/testTable");
  }

  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {

      tableHash.tableName = args[args.length - 2];
      destPath = new Path(args[args.length - 1]);

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String batchSizeArgKey = "--batchsize=";
        if (cmd.startsWith(batchSizeArgKey)) {
          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
          continue;
        }

        final String numHashFilesArgKey = "--numhashfiles=";
        if (cmd.startsWith(numHashFilesArgKey)) {
          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
          continue;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String scanBatchArgKey = "--scanbatch=";
        if (cmd.startsWith(scanBatchArgKey)) {
          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          tableHash.families = cmd.substring(familiesArgKey.length());
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
      if ((tableHash.startTime != 0 || tableHash.endTime != 0)
          && (tableHash.startTime >= tableHash.endTime)) {
        printUsage("Invalid time range filter: starttime="
            + tableHash.startTime + " >= endtime=" + tableHash.endTime);
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    writeTempManifestFile();
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    completeManifest();
    return 0;
  }

}