/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;
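/**
 * A MapReduce tool that scans a table and, for consecutive "batches" of rows, writes an MD5 hash
 * of the cells in each batch. The hashes are stored as {@link MapFile}s under
 * <code>&lt;outputpath&gt;/hashes</code>, together with a <code>manifest</code> properties file
 * describing the scan parameters and a <code>partitions</code> file holding the row-key
 * boundaries between hash files. The output is typically consumed by the companion
 * {@code SyncTable} job to compare tables.
 * <p>
 * Example invocation, mirroring the example in {@code printUsage} (timestamps and paths are
 * illustrative only):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50 \
 *     --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3 \
 *     TestTable /hashes/testTable
 * </pre>
 */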
@InterfaceAudience.Private
public class HashTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(HashTable.class);

  private static final int DEFAULT_BATCH_SIZE = 8000;

  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
  final static String PARTITIONS_FILE_NAME = "partitions";
  final static String MANIFEST_FILE_NAME = "manifest";
  final static String HASH_DATA_DIR = "hashes";
  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
  final static String IGNORE_TIMESTAMPS = "ignoreTimestamps";
  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";

  TableHash tableHash = new TableHash();
  Path destPath;

  public HashTable(Configuration conf) {
    super(conf);
  }

  public static class TableHash {

    Path hashDir;

    String tableName;
    String families = null;
    long batchSize = DEFAULT_BATCH_SIZE;
    int numHashFiles = 0;
    byte[] startRow = HConstants.EMPTY_START_ROW;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    int scanBatch = 0;
    int versions = -1;
    long startTime = 0;
    long endTime = 0;
    boolean ignoreTimestamps;

    List<ImmutableBytesWritable> partitions;

    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
      TableHash tableHash = new TableHash();
      FileSystem fs = hashDir.getFileSystem(conf);
      tableHash.hashDir = hashDir;
      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
      return tableHash;
    }

    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      p.setProperty("table", tableName);
      if (families != null) {
        p.setProperty("columnFamilies", families);
      }
      p.setProperty("targetBatchSize", Long.toString(batchSize));
      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
      if (!isTableStartRow(startRow)) {
        p.setProperty("startRowHex", Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        p.setProperty("scanBatch", Integer.toString(scanBatch));
      }
      if (versions >= 0) {
        p.setProperty("versions", Integer.toString(versions));
      }
      if (startTime != 0) {
        p.setProperty("startTimestamp", Long.toString(startTime));
      }
      if (endTime != 0) {
        p.setProperty("endTimestamp", Long.toString(endTime));
      }

      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
        p.store(osw, null);
      }
    }

    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      try (FSDataInputStream in = fs.open(path)) {
        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
          p.load(isr);
        }
      }
      tableName = p.getProperty("table");
      families = p.getProperty("columnFamilies");
      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));

      String startRowHex = p.getProperty("startRowHex");
      if (startRowHex != null) {
        startRow = Bytes.fromHex(startRowHex);
      }
      String stopRowHex = p.getProperty("stopRowHex");
      if (stopRowHex != null) {
        stopRow = Bytes.fromHex(stopRowHex);
      }

      String scanBatchString = p.getProperty("scanBatch");
      if (scanBatchString != null) {
        scanBatch = Integer.parseInt(scanBatchString);
      }

      String versionString = p.getProperty("versions");
      if (versionString != null) {
        versions = Integer.parseInt(versionString);
      }

      String startTimeString = p.getProperty("startTimestamp");
      if (startTimeString != null) {
        startTime = Long.parseLong(startTimeString);
      }

      String endTimeString = p.getProperty("endTimestamp");
      if (endTimeString != null) {
        endTime = Long.parseLong(endTimeString);
      }
    }
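    /**
     * Build the Scan covering the configured row range, time range, column families, maximum
     * versions and scanner batch size. Options that were not set are left at their defaults;
     * block caching is disabled since this is a one-pass scan.
     */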
    Scan initScan() throws IOException {
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      if (startTime != 0 || endTime != 0) {
        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
      }
      if (scanBatch > 0) {
        scan.setBatch(scanBatch);
      }
      if (versions >= 0) {
        scan.setMaxVersions(versions);
      }
      if (!isTableStartRow(startRow)) {
        scan.setStartRow(startRow);
      }
      if (!isTableEndRow(stopRow)) {
        scan.setStopRow(stopRow);
      }
      if (families != null) {
        for (String fam : families.split(",")) {
          scan.addFamily(Bytes.toBytes(fam));
        }
      }
      return scan;
    }

    /**
     * Choose partitions between row ranges to hash to a single output file.
     * Selects region boundaries that fall within the scan range, and groups them
     * into the desired number of partitions.
     */
    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
      List<byte[]> startKeys = new ArrayList<>();
      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];

        // skip this region if the scan begins at or after the end of this region,
        // or stops at or before the start of this region; in other words:
        //   IF (scan begins before the end of this region
        //      AND scan ends after the start of this region)
        //   THEN include this region
        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
            || Bytes.compareTo(startRow, regionEndKey) < 0)
            && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
          startKeys.add(regionStartKey);
        }
      }

      int numRegions = startKeys.size();
      if (numHashFiles == 0) {
        numHashFiles = numRegions / 100;
      }
      if (numHashFiles == 0) {
        numHashFiles = 1;
      }
      if (numHashFiles > numRegions) {
        // can't partition within regions
        numHashFiles = numRegions;
      }

      // choose a subset of start keys to group regions into ranges
      partitions = new ArrayList<>(numHashFiles - 1);
      // skip the first start key as it is not a partition between ranges.
      for (long i = 1; i < numHashFiles; i++) {
        int splitIndex = (int) (numRegions * i / numHashFiles);
        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
      }
    }
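    /*
     * The partitions file holds the (numHashFiles - 1) boundary keys chosen above, in sorted
     * order, and doubles as the split-points file for the job's TotalOrderPartitioner. For
     * example, with 1,000 regions and the default of one hash file per 100 regions, 10 hash
     * files are produced and every 100th region start key becomes one of the 9 boundaries.
     */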
    void writePartitionFile(Configuration conf, Path path) throws IOException {
      FileSystem fs = path.getFileSystem(conf);
      @SuppressWarnings("deprecation")
      SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);

      for (int i = 0; i < partitions.size(); i++) {
        writer.append(partitions.get(i), NullWritable.get());
      }
      writer.close();
    }

    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
        throws IOException {
      @SuppressWarnings("deprecation")
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
      ImmutableBytesWritable key = new ImmutableBytesWritable();
      partitions = new ArrayList<>();
      while (reader.next(key)) {
        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
      }
      reader.close();

      if (!Ordering.natural().isOrdered(partitions)) {
        throw new IOException("Partitions are not ordered!");
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("tableName=").append(tableName);
      if (families != null) {
        sb.append(", families=").append(families);
      }
      sb.append(", batchSize=").append(batchSize);
      sb.append(", numHashFiles=").append(numHashFiles);
      if (!isTableStartRow(startRow)) {
        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        sb.append(", scanBatch=").append(scanBatch);
      }
      if (versions >= 0) {
        sb.append(", versions=").append(versions);
      }
      if (startTime != 0) {
        sb.append(", startTime=").append(startTime);
      }
      if (endTime != 0) {
        sb.append(", endTime=").append(endTime);
      }
      return sb.toString();
    }

    static String getDataFileName(int hashFileIndex) {
      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
    }

    /**
     * Open a TableHash.Reader starting at the first hash at or after the given key.
     */
    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
        throws IOException {
      return new Reader(conf, startKey);
    }
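    /**
     * Iterates over the (row key, hash) pairs stored in the hash MapFiles, in key order. The
     * reader starts in the hash file whose partition range contains the given start key,
     * positions itself on the first entry at or after that key, and rolls over to the following
     * hash files as each one is exhausted.
     */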
    public class Reader implements java.io.Closeable {
      private final Configuration conf;

      private int hashFileIndex;
      private MapFile.Reader mapFileReader;

      private boolean cachedNext;
      private ImmutableBytesWritable key;
      private ImmutableBytesWritable hash;

      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
        this.conf = conf;
        int partitionIndex = Collections.binarySearch(partitions, startKey);
        if (partitionIndex >= 0) {
          // if the key is equal to a partition, then go to the file after that partition
          hashFileIndex = partitionIndex + 1;
        } else {
          // if the key is between partitions, then go to the file between those partitions
          hashFileIndex = -1 - partitionIndex;
        }
        openHashFile();

        // MapFile's don't make it easy to seek() so that the subsequent next() returns
        // the desired key/value pair.  So we cache it for the first call of next().
        hash = new ImmutableBytesWritable();
        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
        if (key == null) {
          cachedNext = false;
          hash = null;
        } else {
          cachedNext = true;
        }
      }

      /**
       * Read the next key/hash pair.
       * Returns true if such a pair exists and false when at the end of the data.
       */
      public boolean next() throws IOException {
        if (cachedNext) {
          cachedNext = false;
          return true;
        }
        key = new ImmutableBytesWritable();
        hash = new ImmutableBytesWritable();
        while (true) {
          boolean hasNext = mapFileReader.next(key, hash);
          if (hasNext) {
            return true;
          }
          hashFileIndex++;
          if (hashFileIndex < TableHash.this.numHashFiles) {
            mapFileReader.close();
            openHashFile();
          } else {
            key = null;
            hash = null;
            return false;
          }
        }
      }

      /**
       * Get the current key
       * @return the current key or null if there is no current key
       */
      public ImmutableBytesWritable getCurrentKey() {
        return key;
      }

      /**
       * Get the current hash
       * @return the current hash or null if there is no current hash
       */
      public ImmutableBytesWritable getCurrentHash() {
        return hash;
      }

      private void openHashFile() throws IOException {
        if (mapFileReader != null) {
          mapFileReader.close();
        }
        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
        mapFileReader = new MapFile.Reader(dataFile, conf);
      }

      @Override
      public void close() throws IOException {
        mapFileReader.close();
      }
    }
  }

  static boolean isTableStartRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
  }

  static boolean isTableEndRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
  }
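  /**
   * Create the hashing job: a {@link HashMapper} scan over the source table, a
   * {@link TotalOrderPartitioner} fed by the partition file written in
   * {@link #generatePartitions(Path)}, and one identity reducer per hash file writing a
   * {@link MapFile} under <code>&lt;outputpath&gt;/hashes</code>.
   */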
  public Job createSubmittableJob(String[] args) throws IOException {
    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
    generatePartitions(partitionsPath);

    Job job = Job.getInstance(getConf(),
        getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
    Configuration jobConf = job.getConfiguration();
    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
    job.setJarByClass(HashTable.class);

    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    // use a TotalOrderPartitioner and reducers to group region output into hash files
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
    job.setReducerClass(Reducer.class);  // identity reducer
    job.setNumReduceTasks(tableHash.numHashFiles);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

    return job;
  }

  private void generatePartitions(Path partitionsPath) throws IOException {
    Connection connection = ConnectionFactory.createConnection(getConf());
    Pair<byte[][], byte[][]> regionKeys
        = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
    connection.close();

    tableHash.selectPartitions(regionKeys);
    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);

    tableHash.writePartitionFile(getConf(), partitionsPath);
  }

  static class ResultHasher {
    private MessageDigest digest;

    private boolean batchStarted = false;
    private ImmutableBytesWritable batchStartKey;
    private ImmutableBytesWritable batchHash;
    private long batchSize = 0;
    boolean ignoreTimestamps;

    public ResultHasher() {
      try {
        digest = MessageDigest.getInstance("MD5");
      } catch (NoSuchAlgorithmException e) {
        Throwables.propagate(e);
      }
    }

    public void startBatch(ImmutableBytesWritable row) {
      if (batchStarted) {
        throw new RuntimeException("Cannot start new batch without finishing existing one.");
      }
      batchStarted = true;
      batchSize = 0;
      batchStartKey = row;
      batchHash = null;
    }

    public void hashResult(Result result) {
      if (!batchStarted) {
        throw new RuntimeException("Cannot add to batch that has not been started.");
      }
      for (Cell cell : result.rawCells()) {
        int rowLength = cell.getRowLength();
        int familyLength = cell.getFamilyLength();
        int qualifierLength = cell.getQualifierLength();
        int valueLength = cell.getValueLength();
        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);

        if (!ignoreTimestamps) {
          long ts = cell.getTimestamp();
          for (int i = 8; i > 0; i--) {
            digest.update((byte) ts);
            ts >>>= 8;
          }
        }
        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);

        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
      }
    }

    public void finishBatch() {
      if (!batchStarted) {
        throw new RuntimeException("Cannot finish batch that has not started.");
      }
      batchStarted = false;
      batchHash = new ImmutableBytesWritable(digest.digest());
    }

    public boolean isBatchStarted() {
      return batchStarted;
    }

    public ImmutableBytesWritable getBatchStartKey() {
      return batchStartKey;
    }

    public ImmutableBytesWritable getBatchHash() {
      return batchHash;
    }

    public long getBatchSize() {
      return batchSize;
    }
  }
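  /**
   * Mapper that feeds every scanned {@link Result} into a {@link ResultHasher}. Once the
   * accumulated batch size reaches the configured target, the batch is closed at the next row
   * boundary and one (batch start row, hash) pair is emitted, so a single row never spans two
   * hashes.
   */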
  public static class HashMapper
      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    private ResultHasher hasher;
    private long targetBatchSize;

    private ImmutableBytesWritable currentRow;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      targetBatchSize = context.getConfiguration()
          .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
      hasher = new ResultHasher();
      hasher.ignoreTimestamps = context.getConfiguration().getBoolean(IGNORE_TIMESTAMPS, false);
      TableSplit split = (TableSplit) context.getInputSplit();
      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {

      if (currentRow == null || !currentRow.equals(key)) {
        currentRow = new ImmutableBytesWritable(key); // not immutable: the key is reused, so copy it

        if (hasher.getBatchSize() >= targetBatchSize) {
          hasher.finishBatch();
          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
          hasher.startBatch(currentRow);
        }
      }

      hasher.hashResult(value);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      hasher.finishBatch();
      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
    }
  }
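  /*
   * The manifest is written in two steps: the scan parameters go to "manifest.tmp" before the
   * job is submitted, and the file is only renamed to "manifest" once the job has completed
   * successfully, so a partially written hash directory is never mistaken for a finished one.
   */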
  private void writeTempManifestFile() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    tableHash.writePropertiesFile(fs, tempManifestPath);
  }

  private void completeManifest() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    fs.rename(tempManifestPath, manifestPath);
  }

  private static final int NUM_ARGS = 2;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" batchsize         the target number of bytes to hash in each batch");
    System.err.println("                   rows are added to the batch until this size is reached");
    System.err.println("                   (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
    System.err.println(" numhashfiles      the number of hash files to create");
    System.err.println("                   if set to fewer than the number of regions then");
    System.err.println("                   the job will create this number of reducers");
    System.err.println("                   (defaults to 1/100 of regions -- at least 1)");
    System.err.println(" startrow          the start row");
    System.err.println(" stoprow           the stop row");
    System.err.println(" starttime         beginning of the time range (unixtime in millis)");
    System.err.println("                   without endtime means from starttime to forever");
    System.err.println(" endtime           end of the time range.");
    System.err.println("                   Ignored if no starttime specified.");
    System.err.println(" scanbatch         scanner batch size to support intra row scans");
    System.err.println(" versions          number of cell versions to include");
    System.err.println(" families          comma-separated list of families to include");
    System.err.println(" ignoreTimestamps  if true, ignores cell timestamps");
    System.err.println("                   when calculating hashes");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename     Name of the table to hash");
    System.err.println(" outputpath    Filesystem path to put the output data");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
        + " TestTable /hashes/testTable");
  }
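  /**
   * Parse the command-line arguments into {@link #tableHash} and {@link #destPath}. Prints usage
   * and returns false if required arguments are missing or an option is invalid.
   */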
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {

      tableHash.tableName = args[args.length - 2];
      destPath = new Path(args[args.length - 1]);

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String batchSizeArgKey = "--batchsize=";
        if (cmd.startsWith(batchSizeArgKey)) {
          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
          continue;
        }

        final String numHashFilesArgKey = "--numhashfiles=";
        if (cmd.startsWith(numHashFilesArgKey)) {
          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
          continue;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String scanBatchArgKey = "--scanbatch=";
        if (cmd.startsWith(scanBatchArgKey)) {
          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          tableHash.families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String ignoreTimestampsKey = "--ignoreTimestamps=";
        if (cmd.startsWith(ignoreTimestampsKey)) {
          tableHash.ignoreTimestamps =
              Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
      if ((tableHash.startTime != 0 || tableHash.endTime != 0)
          && (tableHash.startTime >= tableHash.endTime)) {
        printUsage("Invalid time range filter: starttime="
            + tableHash.startTime + " >= endtime=" + tableHash.endTime);
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    writeTempManifestFile();
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    completeManifest();
    return 0;
  }
}