
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;

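/**
 * A MapReduce job that scans a table and writes hashes of batches of its cell data to
 * MapFiles (one file per reducer), together with a manifest describing how the hashes
 * were computed.  The output can later be read back with
 * {@link TableHash#read(Configuration, Path)}.
 */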
public class HashTable extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(HashTable.class);

  private static final int DEFAULT_BATCH_SIZE = 8000;

  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
  final static String PARTITIONS_FILE_NAME = "partitions";
  final static String MANIFEST_FILE_NAME = "manifest";
  final static String HASH_DATA_DIR = "hashes";
  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";

  TableHash tableHash = new TableHash();
  Path destPath;

  public HashTable(Configuration conf) {
    super(conf);
  }

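  /**
   * Holds the parameters of a table hash (table name, families, row and time ranges,
   * batch size, number of hash files) along with the selected partition keys, and knows
   * how to read and write them as a manifest and partition file under a hash directory.
   */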
  public static class TableHash {

    Path hashDir;

    String tableName;
    String families = null;
    long batchSize = DEFAULT_BATCH_SIZE;
    int numHashFiles = 0;
    byte[] startRow = HConstants.EMPTY_START_ROW;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    int scanBatch = 0;
    int versions = -1;
    long startTime = 0;
    long endTime = 0;

    List<ImmutableBytesWritable> partitions;

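    /**
     * Reads a previously written table hash (manifest and partition file) from the
     * given directory.
     */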
    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
      TableHash tableHash = new TableHash();
      FileSystem fs = hashDir.getFileSystem(conf);
      tableHash.hashDir = hashDir;
      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
      return tableHash;
    }

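    /**
     * Writes the hash parameters to the given path as a Java properties file.
     */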
    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      p.setProperty("table", tableName);
      if (families != null) {
        p.setProperty("columnFamilies", families);
      }
      p.setProperty("targetBatchSize", Long.toString(batchSize));
      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
      if (!isTableStartRow(startRow)) {
        p.setProperty("startRowHex", Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        p.setProperty("scanBatch", Integer.toString(scanBatch));
      }
      if (versions >= 0) {
        p.setProperty("versions", Integer.toString(versions));
      }
      if (startTime != 0) {
        p.setProperty("startTimestamp", Long.toString(startTime));
      }
      if (endTime != 0) {
        p.setProperty("endTimestamp", Long.toString(endTime));
      }

      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
        p.store(osw, null);
      }
    }

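    /**
     * Reads the hash parameters back from a properties file written by
     * {@link #writePropertiesFile(FileSystem, Path)}.
     */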
    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      try (FSDataInputStream in = fs.open(path)) {
        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
          p.load(isr);
        }
      }
      tableName = p.getProperty("table");
      families = p.getProperty("columnFamilies");
      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));

      String startRowHex = p.getProperty("startRowHex");
      if (startRowHex != null) {
        startRow = Bytes.fromHex(startRowHex);
      }
      String stopRowHex = p.getProperty("stopRowHex");
      if (stopRowHex != null) {
        stopRow = Bytes.fromHex(stopRowHex);
      }

      String scanBatchString = p.getProperty("scanBatch");
      if (scanBatchString != null) {
        scanBatch = Integer.parseInt(scanBatchString);
      }

      String versionString = p.getProperty("versions");
      if (versionString != null) {
        versions = Integer.parseInt(versionString);
      }

      String startTimeString = p.getProperty("startTimestamp");
      if (startTimeString != null) {
        startTime = Long.parseLong(startTimeString);
      }

      String endTimeString = p.getProperty("endTimestamp");
      if (endTimeString != null) {
        endTime = Long.parseLong(endTimeString);
      }
    }

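    /**
     * Builds the Scan used to read the table, applying the configured time range,
     * scanner batch, versions, row range and column families.
     */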
    Scan initScan() throws IOException {
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      if (startTime != 0 || endTime != 0) {
        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
      }
      if (scanBatch > 0) {
        scan.setBatch(scanBatch);
      }
      if (versions >= 0) {
        scan.setMaxVersions(versions);
      }
      if (!isTableStartRow(startRow)) {
        scan.setStartRow(startRow);
      }
      if (!isTableEndRow(stopRow)) {
        scan.setStopRow(stopRow);
      }
      if (families != null) {
        for (String fam : families.split(",")) {
          scan.addFamily(Bytes.toBytes(fam));
        }
      }
      return scan;
    }

    /**
     * Choose the partition boundaries between the row ranges that will each be hashed
     * to a single output file.  Selects region boundaries that fall within the scan
     * range, and groups them into the desired number of partitions.
     */
    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
      List<byte[]> startKeys = new ArrayList<byte[]>();
      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];

        // skip regions that do not overlap the scan range; in other words:
        //   IF (scan begins before the end of this region
        //      AND scan ends after the start of this region)
        //   THEN include this region
        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
            || Bytes.compareTo(startRow, regionEndKey) < 0)
          && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
          startKeys.add(regionStartKey);
        }
      }

      int numRegions = startKeys.size();
      if (numHashFiles == 0) {
        numHashFiles = numRegions / 100;
      }
      if (numHashFiles == 0) {
        numHashFiles = 1;
      }
      if (numHashFiles > numRegions) {
        // can't partition within regions
        numHashFiles = numRegions;
      }

      // choose a subset of start keys to group regions into ranges
      partitions = new ArrayList<ImmutableBytesWritable>(numHashFiles - 1);
      // skip the first start key as it is not a partition between ranges.
      for (long i = 1; i < numHashFiles; i++) {
        int splitIndex = (int) (numRegions * i / numHashFiles);
        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
      }
    }

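    /**
     * Writes the selected partition keys to a SequenceFile for use by the
     * {@link TotalOrderPartitioner}.
     */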
    void writePartitionFile(Configuration conf, Path path) throws IOException {
      FileSystem fs = path.getFileSystem(conf);
      @SuppressWarnings("deprecation")
      SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);

      for (int i = 0; i < partitions.size(); i++) {
        writer.append(partitions.get(i), NullWritable.get());
      }
      writer.close();
    }

    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
         throws IOException {
      @SuppressWarnings("deprecation")
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
      ImmutableBytesWritable key = new ImmutableBytesWritable();
      partitions = new ArrayList<ImmutableBytesWritable>();
      while (reader.next(key)) {
        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
      }
      reader.close();

      if (!Ordering.natural().isOrdered(partitions)) {
        throw new IOException("Partitions are not ordered!");
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("tableName=").append(tableName);
      if (families != null) {
        sb.append(", families=").append(families);
      }
      sb.append(", batchSize=").append(batchSize);
      sb.append(", numHashFiles=").append(numHashFiles);
      if (!isTableStartRow(startRow)) {
        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
      }
      if (scanBatch >= 0) {
        sb.append(", scanBatch=").append(scanBatch);
      }
      if (versions >= 0) {
        sb.append(", versions=").append(versions);
      }
      if (startTime != 0) {
        sb.append(", startTime=").append(startTime);
      }
      if (endTime != 0) {
        sb.append(", endTime=").append(endTime);
      }
      return sb.toString();
    }

    static String getDataFileName(int hashFileIndex) {
      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
    }

    /**
     * Open a TableHash.Reader starting at the first hash at or after the given key.
     * @throws IOException if the hash data cannot be opened or read
     */
    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
        throws IOException {
      return new Reader(conf, startKey);
    }
325     
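    /**
     * Iterates over the key/hash pairs stored in the hash MapFiles, starting at the
     * file covering the requested key and rolling over to later files as needed.
     */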
    public class Reader implements java.io.Closeable {
      private final Configuration conf;

      private int hashFileIndex;
      private MapFile.Reader mapFileReader;

      private boolean cachedNext;
      private ImmutableBytesWritable key;
      private ImmutableBytesWritable hash;

      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
        this.conf = conf;
        int partitionIndex = Collections.binarySearch(partitions, startKey);
        if (partitionIndex >= 0) {
          // if the key is equal to a partition, then go to the file after that partition
          hashFileIndex = partitionIndex + 1;
        } else {
          // if the key is between partitions, then go to the file between those partitions
          hashFileIndex = -1 - partitionIndex;
        }
        openHashFile();

        // MapFiles don't make it easy to seek() so that the subsequent next() returns
        // the desired key/value pair.  So we cache it for the first call of next().
        hash = new ImmutableBytesWritable();
        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
        if (key == null) {
          cachedNext = false;
          hash = null;
        } else {
          cachedNext = true;
        }
      }

      /**
       * Read the next key/hash pair.
       * Returns true if such a pair exists and false when at the end of the data.
       */
      public boolean next() throws IOException {
        if (cachedNext) {
          cachedNext = false;
          return true;
        }
        key = new ImmutableBytesWritable();
        hash = new ImmutableBytesWritable();
        while (true) {
          boolean hasNext = mapFileReader.next(key, hash);
          if (hasNext) {
            return true;
          }
          hashFileIndex++;
          if (hashFileIndex < TableHash.this.numHashFiles) {
            mapFileReader.close();
            openHashFile();
          } else {
            key = null;
            hash = null;
            return false;
          }
        }
      }

      /**
       * Get the current key
       * @return the current key or null if there is no current key
       */
      public ImmutableBytesWritable getCurrentKey() {
        return key;
      }

      /**
       * Get the current hash
       * @return the current hash or null if there is no current hash
       */
      public ImmutableBytesWritable getCurrentHash() {
        return hash;
      }

      private void openHashFile() throws IOException {
        if (mapFileReader != null) {
          mapFileReader.close();
        }
        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
        mapFileReader = new MapFile.Reader(dataFile, conf);
      }

      @Override
      public void close() throws IOException {
        mapFileReader.close();
      }
    }
  }

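  // The empty byte array marks the open-ended start/end of the table.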
  static boolean isTableStartRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
  }

  static boolean isTableEndRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
  }

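  /**
   * Creates and configures the MapReduce job: writes the partition file, sets up
   * {@link HashMapper} over the table scan, and uses a {@link TotalOrderPartitioner}
   * with an identity reducer so each reducer writes one hash MapFile.
   */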
  public Job createSubmittableJob(String[] args) throws IOException {
    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
    generatePartitions(partitionsPath);

    Job job = Job.getInstance(getConf(),
          getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
    Configuration jobConf = job.getConfiguration();
    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
    job.setJarByClass(HashTable.class);

    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    // use a TotalOrderPartitioner and reducers to group region output into hash files
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
    job.setReducerClass(Reducer.class);  // identity reducer
    job.setNumReduceTasks(tableHash.numHashFiles);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

    return job;
  }

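  /**
   * Looks up the table's region boundaries, selects the partition keys, and writes them
   * to the partition file.
   */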
  private void generatePartitions(Path partitionsPath) throws IOException {
    Connection connection = ConnectionFactory.createConnection(getConf());
    Pair<byte[][], byte[][]> regionKeys
      = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
    connection.close();

    tableHash.selectPartitions(regionKeys);
    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);

    tableHash.writePartitionFile(getConf(), partitionsPath);
  }

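  /**
   * Accumulates an MD5 digest over the cells of one batch of rows.  A batch is opened
   * with {@link #startBatch(ImmutableBytesWritable)}, fed results via
   * {@link #hashResult(Result)}, and closed with {@link #finishBatch()}, which makes the
   * digest available from {@link #getBatchHash()}.
   */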
  static class ResultHasher {
    private MessageDigest digest;

    private boolean batchStarted = false;
    private ImmutableBytesWritable batchStartKey;
    private ImmutableBytesWritable batchHash;
    private long batchSize = 0;

    public ResultHasher() {
      try {
        digest = MessageDigest.getInstance("MD5");
      } catch (NoSuchAlgorithmException e) {
        Throwables.propagate(e);
      }
    }

    public void startBatch(ImmutableBytesWritable row) {
      if (batchStarted) {
        throw new RuntimeException("Cannot start new batch without finishing existing one.");
      }
      batchStarted = true;
      batchSize = 0;
      batchStartKey = row;
      batchHash = null;
    }

    public void hashResult(Result result) {
      if (!batchStarted) {
        throw new RuntimeException("Cannot add to batch that has not been started.");
      }
      for (Cell cell : result.rawCells()) {
        int rowLength = cell.getRowLength();
        int familyLength = cell.getFamilyLength();
        int qualifierLength = cell.getQualifierLength();
        int valueLength = cell.getValueLength();
        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
        long ts = cell.getTimestamp();
        for (int i = 8; i > 0; i--) {
          digest.update((byte) ts);
          ts >>>= 8;
        }
        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);

        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
      }
    }

    public void finishBatch() {
      if (!batchStarted) {
        throw new RuntimeException("Cannot finish batch that has not started.");
      }
      batchStarted = false;
      batchHash = new ImmutableBytesWritable(digest.digest());
    }

    public boolean isBatchStarted() {
      return batchStarted;
    }

    public ImmutableBytesWritable getBatchStartKey() {
      return batchStartKey;
    }

    public ImmutableBytesWritable getBatchHash() {
      return batchHash;
    }

    public long getBatchSize() {
      return batchSize;
    }
  }

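  /**
   * Mapper that hashes the scanned cells into batches of approximately the configured
   * target size and emits one (batch start row, batch hash) pair per batch.
   */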
  public static class HashMapper
    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    private ResultHasher hasher;
    private long targetBatchSize;

    private ImmutableBytesWritable currentRow;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      targetBatchSize = context.getConfiguration()
          .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
      hasher = new ResultHasher();

      TableSplit split = (TableSplit) context.getInputSplit();
      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {

      if (currentRow == null || !currentRow.equals(key)) {
        currentRow = new ImmutableBytesWritable(key); // keep our own wrapper; the key writable is reused by the framework

        if (hasher.getBatchSize() >= targetBatchSize) {
          hasher.finishBatch();
          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
          hasher.startBatch(currentRow);
        }
      }

      hasher.hashResult(value);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      hasher.finishBatch();
      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
    }
  }

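  /**
   * Writes the hash parameters to a temporary manifest before the job runs; see
   * {@link #completeManifest()}.
   */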
  private void writeTempManifestFile() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    tableHash.writePropertiesFile(fs, tempManifestPath);
  }

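  /**
   * Renames the temporary manifest to its final name, marking the hash output as complete.
   */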
  private void completeManifest() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    fs.rename(tempManifestPath, manifestPath);
  }

  private static final int NUM_ARGS = 2;
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" batchsize     the target number of bytes to hash in each batch");
    System.err.println("               rows are added to the batch until this size is reached");
    System.err.println("               (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
    System.err.println(" numhashfiles  the number of hash files to create");
    System.err.println("               if set to fewer than the number of regions, then");
    System.err.println("               the job will create this number of reducers");
    System.err.println("               (defaults to 1/100 of regions -- at least 1)");
    System.err.println(" startrow      the start row");
    System.err.println(" stoprow       the stop row");
    System.err.println(" starttime     beginning of the time range (unixtime in millis)");
    System.err.println("               without endtime means from starttime to forever");
    System.err.println(" endtime       end of the time range.  Ignored if no starttime specified.");
    System.err.println(" scanbatch     scanner batch size to support intra row scans");
    System.err.println(" versions      number of cell versions to include");
    System.err.println(" families      comma-separated list of families to include");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename     Name of the table to hash");
    System.err.println(" outputpath    Filesystem path to put the output data");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
        + " TestTable /hashes/testTable");
  }

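  /**
   * Parses the command-line options into {@link #tableHash} and {@link #destPath}.
   * @return false (after printing usage) if the arguments are missing or invalid
   */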
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {

      tableHash.tableName = args[args.length - 2];
      destPath = new Path(args[args.length - 1]);

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String batchSizeArgKey = "--batchsize=";
        if (cmd.startsWith(batchSizeArgKey)) {
          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
          continue;
        }

        final String numHashFilesArgKey = "--numhashfiles=";
        if (cmd.startsWith(numHashFilesArgKey)) {
          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
          continue;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String scanBatchArgKey = "--scanbatch=";
        if (cmd.startsWith(scanBatchArgKey)) {
          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          tableHash.families = cmd.substring(familiesArgKey.length());
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
      if ((tableHash.startTime != 0 || tableHash.endTime != 0)
          && (tableHash.startTime >= tableHash.endTime)) {
        printUsage("Invalid time range filter: starttime="
            + tableHash.startTime + " >= endtime=" + tableHash.endTime);
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    writeTempManifestFile();
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    completeManifest();
    return 0;
  }

}