1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
63
64 public class HashTable extends Configured implements Tool {
65
66 private static final Log LOG = LogFactory.getLog(HashTable.class);
67
68 private static final int DEFAULT_BATCH_SIZE = 8000;
69
70 private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
71 final static String PARTITIONS_FILE_NAME = "partitions";
72 final static String MANIFEST_FILE_NAME = "manifest";
73 final static String HASH_DATA_DIR = "hashes";
74 final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
75 private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
76
77 TableHash tableHash = new TableHash();
78 Path destPath;
79
80 public HashTable(Configuration conf) {
81 super(conf);
82 }
83
84 public static class TableHash {
85
86 Path hashDir;
87
88 String tableName;
89 String families = null;
90 long batchSize = DEFAULT_BATCH_SIZE;
91 int numHashFiles = 0;
92 byte[] startRow = HConstants.EMPTY_START_ROW;
93 byte[] stopRow = HConstants.EMPTY_END_ROW;
94 int scanBatch = 0;
95 int versions = -1;
96 long startTime = 0;
97 long endTime = 0;
98
99 List<ImmutableBytesWritable> partitions;
100
101 public static TableHash read(Configuration conf, Path hashDir) throws IOException {
102 TableHash tableHash = new TableHash();
103 FileSystem fs = hashDir.getFileSystem(conf);
104 tableHash.hashDir = hashDir;
105 tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
106 tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
107 return tableHash;
108 }
109
110 void writePropertiesFile(FileSystem fs, Path path) throws IOException {
111 Properties p = new Properties();
112 p.setProperty("table", tableName);
113 if (families != null) {
114 p.setProperty("columnFamilies", families);
115 }
116 p.setProperty("targetBatchSize", Long.toString(batchSize));
117 p.setProperty("numHashFiles", Integer.toString(numHashFiles));
118 if (!isTableStartRow(startRow)) {
119 p.setProperty("startRowHex", Bytes.toHex(startRow));
120 }
121 if (!isTableEndRow(stopRow)) {
122 p.setProperty("stopRowHex", Bytes.toHex(stopRow));
123 }
124 if (scanBatch > 0) {
125 p.setProperty("scanBatch", Integer.toString(scanBatch));
126 }
127 if (versions >= 0) {
128 p.setProperty("versions", Integer.toString(versions));
129 }
130 if (startTime != 0) {
131 p.setProperty("startTimestamp", Long.toString(startTime));
132 }
133 if (endTime != 0) {
134 p.setProperty("endTimestamp", Long.toString(endTime));
135 }
136
137 try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
138 p.store(osw, null);
139 }
140 }
141
142 void readPropertiesFile(FileSystem fs, Path path) throws IOException {
143 Properties p = new Properties();
144 try (FSDataInputStream in = fs.open(path)) {
145 try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
146 p.load(isr);
147 }
148 }
149 tableName = p.getProperty("table");
150 families = p.getProperty("columnFamilies");
151 batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
152 numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
153
154 String startRowHex = p.getProperty("startRowHex");
155 if (startRowHex != null) {
156 startRow = Bytes.fromHex(startRowHex);
157 }
158 String stopRowHex = p.getProperty("stopRowHex");
159 if (stopRowHex != null) {
160 stopRow = Bytes.fromHex(stopRowHex);
161 }
162
163 String scanBatchString = p.getProperty("scanBatch");
164 if (scanBatchString != null) {
165 scanBatch = Integer.parseInt(scanBatchString);
166 }
167
168 String versionString = p.getProperty("versions");
169 if (versionString != null) {
170 versions = Integer.parseInt(versionString);
171 }
172
173 String startTimeString = p.getProperty("startTimestamp");
174 if (startTimeString != null) {
175 startTime = Long.parseLong(startTimeString);
176 }
177
178 String endTimeString = p.getProperty("endTimestamp");
179 if (endTimeString != null) {
180 endTime = Long.parseLong(endTimeString);
181 }
182 }
183
184 Scan initScan() throws IOException {
185 Scan scan = new Scan();
186 scan.setCacheBlocks(false);
187 if (startTime != 0 || endTime != 0) {
188 scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
189 }
190 if (scanBatch > 0) {
191 scan.setBatch(scanBatch);
192 }
193 if (versions >= 0) {
194 scan.setMaxVersions(versions);
195 }
196 if (!isTableStartRow(startRow)) {
197 scan.setStartRow(startRow);
198 }
199 if (!isTableEndRow(stopRow)) {
200 scan.setStopRow(stopRow);
201 }
202 if(families != null) {
203 for(String fam : families.split(",")) {
204 scan.addFamily(Bytes.toBytes(fam));
205 }
206 }
207 return scan;
208 }
209
210
211
212
213
214
215 void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
216 List<byte[]> startKeys = new ArrayList<byte[]>();
217 for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
218 byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
219 byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
220
221
222
223
224
225
226 if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
227 || Bytes.compareTo(startRow, regionEndKey) < 0)
228 && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
229 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
230 startKeys.add(regionStartKey);
231 }
232 }
233
234 int numRegions = startKeys.size();
235 if (numHashFiles == 0) {
236 numHashFiles = numRegions / 100;
237 }
238 if (numHashFiles == 0) {
239 numHashFiles = 1;
240 }
241 if (numHashFiles > numRegions) {
242
243 numHashFiles = numRegions;
244 }
245
246
247 partitions = new ArrayList<ImmutableBytesWritable>(numHashFiles - 1);
248
249 for (long i = 1; i < numHashFiles; i++) {
250 int splitIndex = (int) (numRegions * i / numHashFiles);
251 partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
252 }
253 }
254
255 void writePartitionFile(Configuration conf, Path path) throws IOException {
256 FileSystem fs = path.getFileSystem(conf);
257 @SuppressWarnings("deprecation")
258 SequenceFile.Writer writer = SequenceFile.createWriter(
259 fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
260
261 for (int i = 0; i < partitions.size(); i++) {
262 writer.append(partitions.get(i), NullWritable.get());
263 }
264 writer.close();
265 }
266
267 private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
268 throws IOException {
269 @SuppressWarnings("deprecation")
270 SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
271 ImmutableBytesWritable key = new ImmutableBytesWritable();
272 partitions = new ArrayList<ImmutableBytesWritable>();
273 while (reader.next(key)) {
274 partitions.add(new ImmutableBytesWritable(key.copyBytes()));
275 }
276 reader.close();
277
278 if (!Ordering.natural().isOrdered(partitions)) {
279 throw new IOException("Partitions are not ordered!");
280 }
281 }
282
283 @Override
284 public String toString() {
285 StringBuilder sb = new StringBuilder();
286 sb.append("tableName=").append(tableName);
287 if (families != null) {
288 sb.append(", families=").append(families);
289 }
290 sb.append(", batchSize=").append(batchSize);
291 sb.append(", numHashFiles=").append(numHashFiles);
292 if (!isTableStartRow(startRow)) {
293 sb.append(", startRowHex=").append(Bytes.toHex(startRow));
294 }
295 if (!isTableEndRow(stopRow)) {
296 sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
297 }
298 if (scanBatch >= 0) {
299 sb.append(", scanBatch=").append(scanBatch);
300 }
301 if (versions >= 0) {
302 sb.append(", versions=").append(versions);
303 }
304 if (startTime != 0) {
305 sb.append("startTime=").append(startTime);
306 }
307 if (endTime != 0) {
308 sb.append("endTime=").append(endTime);
309 }
310 return sb.toString();
311 }
312
313 static String getDataFileName(int hashFileIndex) {
314 return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
315 }
316
317
318
319
320
321 public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
322 throws IOException {
323 return new Reader(conf, startKey);
324 }
325
326 public class Reader implements java.io.Closeable {
327 private final Configuration conf;
328
329 private int hashFileIndex;
330 private MapFile.Reader mapFileReader;
331
332 private boolean cachedNext;
333 private ImmutableBytesWritable key;
334 private ImmutableBytesWritable hash;
335
336 Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
337 this.conf = conf;
338 int partitionIndex = Collections.binarySearch(partitions, startKey);
339 if (partitionIndex >= 0) {
340
341 hashFileIndex = partitionIndex+1;
342 } else {
343
344 hashFileIndex = -1-partitionIndex;
345 }
346 openHashFile();
347
348
349
350 hash = new ImmutableBytesWritable();
351 key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
352 if (key == null) {
353 cachedNext = false;
354 hash = null;
355 } else {
356 cachedNext = true;
357 }
358 }
359
360
361
362
363
364 public boolean next() throws IOException {
365 if (cachedNext) {
366 cachedNext = false;
367 return true;
368 }
369 key = new ImmutableBytesWritable();
370 hash = new ImmutableBytesWritable();
371 while (true) {
372 boolean hasNext = mapFileReader.next(key, hash);
373 if (hasNext) {
374 return true;
375 }
376 hashFileIndex++;
377 if (hashFileIndex < TableHash.this.numHashFiles) {
378 mapFileReader.close();
379 openHashFile();
380 } else {
381 key = null;
382 hash = null;
383 return false;
384 }
385 }
386 }
387
388
389
390
391
392 public ImmutableBytesWritable getCurrentKey() {
393 return key;
394 }
395
396
397
398
399
400 public ImmutableBytesWritable getCurrentHash() {
401 return hash;
402 }
403
404 private void openHashFile() throws IOException {
405 if (mapFileReader != null) {
406 mapFileReader.close();
407 }
408 Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
409 Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
410 mapFileReader = new MapFile.Reader(dataFile, conf);
411 }
412
413 @Override
414 public void close() throws IOException {
415 mapFileReader.close();
416 }
417 }
418 }
419
420 static boolean isTableStartRow(byte[] row) {
421 return Bytes.equals(HConstants.EMPTY_START_ROW, row);
422 }
423
424 static boolean isTableEndRow(byte[] row) {
425 return Bytes.equals(HConstants.EMPTY_END_ROW, row);
426 }
427
428 public Job createSubmittableJob(String[] args) throws IOException {
429 Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
430 generatePartitions(partitionsPath);
431
432 Job job = Job.getInstance(getConf(),
433 getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
434 Configuration jobConf = job.getConfiguration();
435 jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
436 job.setJarByClass(HashTable.class);
437
438 TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
439 HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
440
441
442 job.setPartitionerClass(TotalOrderPartitioner.class);
443 TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
444 job.setReducerClass(Reducer.class);
445 job.setNumReduceTasks(tableHash.numHashFiles);
446 job.setOutputKeyClass(ImmutableBytesWritable.class);
447 job.setOutputValueClass(ImmutableBytesWritable.class);
448 job.setOutputFormatClass(MapFileOutputFormat.class);
449 FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
450
451 return job;
452 }
453
454 private void generatePartitions(Path partitionsPath) throws IOException {
455 Connection connection = ConnectionFactory.createConnection(getConf());
456 Pair<byte[][], byte[][]> regionKeys
457 = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
458 connection.close();
459
460 tableHash.selectPartitions(regionKeys);
461 LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
462
463 tableHash.writePartitionFile(getConf(), partitionsPath);
464 }
465
466 static class ResultHasher {
467 private MessageDigest digest;
468
469 private boolean batchStarted = false;
470 private ImmutableBytesWritable batchStartKey;
471 private ImmutableBytesWritable batchHash;
472 private long batchSize = 0;
473
474
475 public ResultHasher() {
476 try {
477 digest = MessageDigest.getInstance("MD5");
478 } catch (NoSuchAlgorithmException e) {
479 Throwables.propagate(e);
480 }
481 }
482
483 public void startBatch(ImmutableBytesWritable row) {
484 if (batchStarted) {
485 throw new RuntimeException("Cannot start new batch without finishing existing one.");
486 }
487 batchStarted = true;
488 batchSize = 0;
489 batchStartKey = row;
490 batchHash = null;
491 }
492
493 public void hashResult(Result result) {
494 if (!batchStarted) {
495 throw new RuntimeException("Cannot add to batch that has not been started.");
496 }
497 for (Cell cell : result.rawCells()) {
498 int rowLength = cell.getRowLength();
499 int familyLength = cell.getFamilyLength();
500 int qualifierLength = cell.getQualifierLength();
501 int valueLength = cell.getValueLength();
502 digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
503 digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
504 digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
505 long ts = cell.getTimestamp();
506 for (int i = 8; i > 0; i--) {
507 digest.update((byte) ts);
508 ts >>>= 8;
509 }
510 digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
511
512 batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
513 }
514 }
515
516 public void finishBatch() {
517 if (!batchStarted) {
518 throw new RuntimeException("Cannot finish batch that has not started.");
519 }
520 batchStarted = false;
521 batchHash = new ImmutableBytesWritable(digest.digest());
522 }
523
524 public boolean isBatchStarted() {
525 return batchStarted;
526 }
527
528 public ImmutableBytesWritable getBatchStartKey() {
529 return batchStartKey;
530 }
531
532 public ImmutableBytesWritable getBatchHash() {
533 return batchHash;
534 }
535
536 public long getBatchSize() {
537 return batchSize;
538 }
539 }
540
541 public static class HashMapper
542 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
543
544 private ResultHasher hasher;
545 private long targetBatchSize;
546
547 private ImmutableBytesWritable currentRow;
548
549 @Override
550 protected void setup(Context context) throws IOException, InterruptedException {
551 targetBatchSize = context.getConfiguration()
552 .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
553 hasher = new ResultHasher();
554
555 TableSplit split = (TableSplit) context.getInputSplit();
556 hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
557 }
558
559 @Override
560 protected void map(ImmutableBytesWritable key, Result value, Context context)
561 throws IOException, InterruptedException {
562
563 if (currentRow == null || !currentRow.equals(key)) {
564 currentRow = new ImmutableBytesWritable(key);
565
566 if (hasher.getBatchSize() >= targetBatchSize) {
567 hasher.finishBatch();
568 context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
569 hasher.startBatch(currentRow);
570 }
571 }
572
573 hasher.hashResult(value);
574 }
575
576 @Override
577 protected void cleanup(Context context) throws IOException, InterruptedException {
578 hasher.finishBatch();
579 context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
580 }
581 }
582
583 private void writeTempManifestFile() throws IOException {
584 Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
585 FileSystem fs = tempManifestPath.getFileSystem(getConf());
586 tableHash.writePropertiesFile(fs, tempManifestPath);
587 }
588
589 private void completeManifest() throws IOException {
590 Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
591 Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
592 FileSystem fs = tempManifestPath.getFileSystem(getConf());
593 fs.rename(tempManifestPath, manifestPath);
594 }
595
596 private static final int NUM_ARGS = 2;
597 private static void printUsage(final String errorMsg) {
598 if (errorMsg != null && errorMsg.length() > 0) {
599 System.err.println("ERROR: " + errorMsg);
600 System.err.println();
601 }
602 System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
603 System.err.println();
604 System.err.println("Options:");
605 System.err.println(" batchsize the target amount of bytes to hash in each batch");
606 System.err.println(" rows are added to the batch until this size is reached");
607 System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
608 System.err.println(" numhashfiles the number of hash files to create");
609 System.err.println(" if set to fewer than number of regions then");
610 System.err.println(" the job will create this number of reducers");
611 System.err.println(" (defaults to 1/100 of regions -- at least 1)");
612 System.err.println(" startrow the start row");
613 System.err.println(" stoprow the stop row");
614 System.err.println(" starttime beginning of the time range (unixtime in millis)");
615 System.err.println(" without endtime means from starttime to forever");
616 System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
617 System.err.println(" scanbatch scanner batch size to support intra row scans");
618 System.err.println(" versions number of cell versions to include");
619 System.err.println(" families comma-separated list of families to include");
620 System.err.println();
621 System.err.println("Args:");
622 System.err.println(" tablename Name of the table to hash");
623 System.err.println(" outputpath Filesystem path to put the output data");
624 System.err.println();
625 System.err.println("Examples:");
626 System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
627 System.err.println(" $ bin/hbase " +
628 "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
629 + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
630 + " TestTable /hashes/testTable");
631 }
632
633 private boolean doCommandLine(final String[] args) {
634 if (args.length < NUM_ARGS) {
635 printUsage(null);
636 return false;
637 }
638 try {
639
640 tableHash.tableName = args[args.length-2];
641 destPath = new Path(args[args.length-1]);
642
643 for (int i = 0; i < args.length - NUM_ARGS; i++) {
644 String cmd = args[i];
645 if (cmd.equals("-h") || cmd.startsWith("--h")) {
646 printUsage(null);
647 return false;
648 }
649
650 final String batchSizeArgKey = "--batchsize=";
651 if (cmd.startsWith(batchSizeArgKey)) {
652 tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
653 continue;
654 }
655
656 final String numHashFilesArgKey = "--numhashfiles=";
657 if (cmd.startsWith(numHashFilesArgKey)) {
658 tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
659 continue;
660 }
661
662 final String startRowArgKey = "--startrow=";
663 if (cmd.startsWith(startRowArgKey)) {
664 tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
665 continue;
666 }
667
668 final String stopRowArgKey = "--stoprow=";
669 if (cmd.startsWith(stopRowArgKey)) {
670 tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
671 continue;
672 }
673
674 final String startTimeArgKey = "--starttime=";
675 if (cmd.startsWith(startTimeArgKey)) {
676 tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
677 continue;
678 }
679
680 final String endTimeArgKey = "--endtime=";
681 if (cmd.startsWith(endTimeArgKey)) {
682 tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
683 continue;
684 }
685
686 final String scanBatchArgKey = "--scanbatch=";
687 if (cmd.startsWith(scanBatchArgKey)) {
688 tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
689 continue;
690 }
691
692 final String versionsArgKey = "--versions=";
693 if (cmd.startsWith(versionsArgKey)) {
694 tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
695 continue;
696 }
697
698 final String familiesArgKey = "--families=";
699 if (cmd.startsWith(familiesArgKey)) {
700 tableHash.families = cmd.substring(familiesArgKey.length());
701 continue;
702 }
703
704 printUsage("Invalid argument '" + cmd + "'");
705 return false;
706 }
707 if ((tableHash.startTime != 0 || tableHash.endTime != 0)
708 && (tableHash.startTime >= tableHash.endTime)) {
709 printUsage("Invalid time range filter: starttime="
710 + tableHash.startTime + " >= endtime=" + tableHash.endTime);
711 return false;
712 }
713
714 } catch (Exception e) {
715 e.printStackTrace();
716 printUsage("Can't start because " + e.getMessage());
717 return false;
718 }
719 return true;
720 }
721
722
723
724
725 public static void main(String[] args) throws Exception {
726 int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
727 System.exit(ret);
728 }
729
730 @Override
731 public int run(String[] args) throws Exception {
732 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
733 if (!doCommandLine(otherArgs)) {
734 return 1;
735 }
736
737 Job job = createSubmittableJob(otherArgs);
738 writeTempManifestFile();
739 if (!job.waitForCompletion(true)) {
740 LOG.info("Map-reduce job failed!");
741 return 1;
742 }
743 completeManifest();
744 return 0;
745 }
746
747 }