/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;

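/**
 * A MapReduce tool that scans a table and emits one hash per batch of rows. The hashes are
 * written to a set of MapFiles under the output path, together with a "partitions" file that
 * records the key ranges covered by each file and a "manifest" that records the scan settings.
 * The output can later be compared against another copy of the table (for example by the
 * companion SyncTable job) to find row ranges that differ.
 *
 * <p>Example invocation, mirroring the usage text printed by this tool (the table name, paths
 * and timestamps are illustrative):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50 \
 *     --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3 \
 *     TestTable /hashes/testTable
 * </pre>
 */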
@InterfaceAudience.Private
public class HashTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(HashTable.class);

  private static final int DEFAULT_BATCH_SIZE = 8000;

  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
  final static String PARTITIONS_FILE_NAME = "partitions";
  final static String MANIFEST_FILE_NAME = "manifest";
  final static String HASH_DATA_DIR = "hashes";
  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
  final static String IGNORE_TIMESTAMPS = "ignoreTimestamps";
  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";

  TableHash tableHash = new TableHash();
  Path destPath;

  public HashTable(Configuration conf) {
    super(conf);
  }

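  /**
   * Describes a single hashing run: the scan parameters (table, column families, row range,
   * time range, versions), the target batch size, the number of hash output files, and the
   * partition boundaries between those files. Persisted to the manifest and partitions files
   * under the hash directory so the hashes can later be re-read with the same settings.
   */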
  public static class TableHash {

    Path hashDir;

    String tableName;
    String families = null;
    long batchSize = DEFAULT_BATCH_SIZE;
    int numHashFiles = 0;
    byte[] startRow = HConstants.EMPTY_START_ROW;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    int scanBatch = 0;
    int versions = -1;
    long startTime = 0;
    long endTime = 0;
    boolean ignoreTimestamps;

    List<ImmutableBytesWritable> partitions;

    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
      TableHash tableHash = new TableHash();
      FileSystem fs = hashDir.getFileSystem(conf);
      tableHash.hashDir = hashDir;
      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
      return tableHash;
    }

    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      p.setProperty("table", tableName);
      if (families != null) {
        p.setProperty("columnFamilies", families);
      }
      p.setProperty("targetBatchSize", Long.toString(batchSize));
      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
      if (!isTableStartRow(startRow)) {
        p.setProperty("startRowHex", Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        p.setProperty("scanBatch", Integer.toString(scanBatch));
      }
      if (versions >= 0) {
        p.setProperty("versions", Integer.toString(versions));
      }
      if (startTime != 0) {
        p.setProperty("startTimestamp", Long.toString(startTime));
      }
      if (endTime != 0) {
        p.setProperty("endTimestamp", Long.toString(endTime));
      }

      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
        p.store(osw, null);
      }
    }

    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      try (FSDataInputStream in = fs.open(path)) {
        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
          p.load(isr);
        }
      }
      tableName = p.getProperty("table");
      families = p.getProperty("columnFamilies");
      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));

      String startRowHex = p.getProperty("startRowHex");
      if (startRowHex != null) {
        startRow = Bytes.fromHex(startRowHex);
      }
      String stopRowHex = p.getProperty("stopRowHex");
      if (stopRowHex != null) {
        stopRow = Bytes.fromHex(stopRowHex);
      }

      String scanBatchString = p.getProperty("scanBatch");
      if (scanBatchString != null) {
        scanBatch = Integer.parseInt(scanBatchString);
      }

      String versionString = p.getProperty("versions");
      if (versionString != null) {
        versions = Integer.parseInt(versionString);
      }

      String startTimeString = p.getProperty("startTimestamp");
      if (startTimeString != null) {
        startTime = Long.parseLong(startTimeString);
      }

      String endTimeString = p.getProperty("endTimestamp");
      if (endTimeString != null) {
        endTime = Long.parseLong(endTimeString);
      }
    }

    Scan initScan() throws IOException {
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      if (startTime != 0 || endTime != 0) {
        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
      }
      if (scanBatch > 0) {
        scan.setBatch(scanBatch);
      }
      if (versions >= 0) {
        scan.setMaxVersions(versions);
      }
      if (!isTableStartRow(startRow)) {
        scan.setStartRow(startRow);
      }
      if (!isTableEndRow(stopRow)) {
        scan.setStopRow(stopRow);
      }
      if (families != null) {
        for (String fam : families.split(",")) {
          scan.addFamily(Bytes.toBytes(fam));
        }
      }
      return scan;
    }

    /**
     * Choose partition boundaries so that each hash output file covers a contiguous range
     * of rows. Selects the region boundaries that fall within the scan range and groups
     * them into the desired number of partitions. For example, with 300 in-range regions
     * and numHashFiles=3, the two partition keys are the start keys of the regions at
     * indices 100 and 200, so each hash file covers 100 regions.
     */
    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
      List<byte[]> startKeys = new ArrayList<>();
      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];

        // drop this region if the scan begins at or after its end key,
        // or ends at or before its start key; in other words:
        //   IF (scan begins before the end of this region
        //      AND scan ends after the start of this region)
        //   THEN include this region
        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
            || Bytes.compareTo(startRow, regionEndKey) < 0)
          && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
          startKeys.add(regionStartKey);
        }
      }

      int numRegions = startKeys.size();
      if (numHashFiles == 0) {
        numHashFiles = numRegions / 100;
      }
      if (numHashFiles == 0) {
        numHashFiles = 1;
      }
      if (numHashFiles > numRegions) {
        // can't partition within regions
        numHashFiles = numRegions;
      }

      // choose a subset of start keys to group regions into ranges
      partitions = new ArrayList<>(numHashFiles - 1);
      // skip the first start key as it is not a partition between ranges.
      for (long i = 1; i < numHashFiles; i++) {
        int splitIndex = (int) (numRegions * i / numHashFiles);
        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
      }
    }

    void writePartitionFile(Configuration conf, Path path) throws IOException {
      FileSystem fs = path.getFileSystem(conf);
      @SuppressWarnings("deprecation")
      SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);

      for (int i = 0; i < partitions.size(); i++) {
        writer.append(partitions.get(i), NullWritable.get());
      }
      writer.close();
    }

    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
         throws IOException {
      @SuppressWarnings("deprecation")
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
      ImmutableBytesWritable key = new ImmutableBytesWritable();
      partitions = new ArrayList<>();
      while (reader.next(key)) {
        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
      }
      reader.close();

      if (!Ordering.natural().isOrdered(partitions)) {
        throw new IOException("Partitions are not ordered!");
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("tableName=").append(tableName);
      if (families != null) {
        sb.append(", families=").append(families);
      }
      sb.append(", batchSize=").append(batchSize);
      sb.append(", numHashFiles=").append(numHashFiles);
      if (!isTableStartRow(startRow)) {
        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        sb.append(", scanBatch=").append(scanBatch);
      }
      if (versions >= 0) {
        sb.append(", versions=").append(versions);
      }
      if (startTime != 0) {
        sb.append(", startTime=").append(startTime);
      }
      if (endTime != 0) {
        sb.append(", endTime=").append(endTime);
      }
      return sb.toString();
    }

    static String getDataFileName(int hashFileIndex) {
      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
    }

    /**
     * Open a TableHash.Reader starting at the first hash at or after the given key.
     * @throws IOException if the hash data files cannot be opened or read
     */
    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
        throws IOException {
      return new Reader(conf, startKey);
    }

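    /**
     * Iterates over the key/hash pairs stored in the hash MapFiles, starting at the first
     * entry at or after a given key and moving on to the next hash file when the current one
     * is exhausted.
     *
     * <p>Typical usage (a sketch; {@code conf}, {@code hashDir} and {@code startKey} are
     * assumed to be supplied by the caller):
     *
     * <pre>
     * TableHash tableHash = TableHash.read(conf, hashDir);
     * try (TableHash.Reader reader = tableHash.newReader(conf, startKey)) {
     *   while (reader.next()) {
     *     ImmutableBytesWritable key = reader.getCurrentKey();
     *     ImmutableBytesWritable hash = reader.getCurrentHash();
     *     // compare 'hash' against a freshly computed hash of the same key range
     *   }
     * }
     * </pre>
     */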
    public class Reader implements java.io.Closeable {
      private final Configuration conf;

      private int hashFileIndex;
      private MapFile.Reader mapFileReader;

      private boolean cachedNext;
      private ImmutableBytesWritable key;
      private ImmutableBytesWritable hash;

      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
        this.conf = conf;
        int partitionIndex = Collections.binarySearch(partitions, startKey);
        if (partitionIndex >= 0) {
          // if the key is equal to a partition, then go to the file after that partition
          hashFileIndex = partitionIndex + 1;
        } else {
          // if the key is between partitions, then go to the file between those partitions
          hashFileIndex = -1 - partitionIndex;
        }
        openHashFile();

        // MapFiles don't make it easy to seek() so that the subsequent next() returns
        // the desired key/value pair.  So we cache it for the first call of next().
        hash = new ImmutableBytesWritable();
        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
        if (key == null) {
          cachedNext = false;
          hash = null;
        } else {
          cachedNext = true;
        }
      }

      /**
       * Read the next key/hash pair.
       * Returns true if such a pair exists and false when at the end of the data.
       */
      public boolean next() throws IOException {
        if (cachedNext) {
          cachedNext = false;
          return true;
        }
        key = new ImmutableBytesWritable();
        hash = new ImmutableBytesWritable();
        while (true) {
          boolean hasNext = mapFileReader.next(key, hash);
          if (hasNext) {
            return true;
          }
          hashFileIndex++;
          if (hashFileIndex < TableHash.this.numHashFiles) {
            mapFileReader.close();
            openHashFile();
          } else {
            key = null;
            hash = null;
            return false;
          }
        }
      }

      /**
       * Get the current key
       * @return the current key or null if there is no current key
       */
      public ImmutableBytesWritable getCurrentKey() {
        return key;
      }

      /**
       * Get the current hash
       * @return the current hash or null if there is no current hash
       */
      public ImmutableBytesWritable getCurrentHash() {
        return hash;
      }

      private void openHashFile() throws IOException {
        if (mapFileReader != null) {
          mapFileReader.close();
        }
        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
        mapFileReader = new MapFile.Reader(dataFile, conf);
      }

      @Override
      public void close() throws IOException {
        mapFileReader.close();
      }
    }
  }

  static boolean isTableStartRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
  }

  static boolean isTableEndRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
  }

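  /**
   * Sets up the hashing job: writes the partitions file, configures a table mapper that emits
   * (batch start key, batch hash) pairs, partitions them with a {@link TotalOrderPartitioner}
   * over the same partition keys, and writes each reducer's output as a MapFile under the hash
   * data directory.
   */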
  public Job createSubmittableJob(String[] args) throws IOException {
    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
    generatePartitions(partitionsPath);

    Job job = Job.getInstance(getConf(),
          getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
    Configuration jobConf = job.getConfiguration();
    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
    job.setJarByClass(HashTable.class);

    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    // use a TotalOrderPartitioner and reducers to group region output into hash files
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
    job.setReducerClass(Reducer.class);  // identity reducer
    job.setNumReduceTasks(tableHash.numHashFiles);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

    return job;
  }

  private void generatePartitions(Path partitionsPath) throws IOException {
    Pair<byte[][], byte[][]> regionKeys;
    // close the connection even if fetching the region start/end keys fails
    try (Connection connection = ConnectionFactory.createConnection(getConf())) {
      regionKeys =
        connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
    }

    tableHash.selectPartitions(regionKeys);
    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);

    tableHash.writePartitionFile(getConf(), partitionsPath);
  }

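  /**
   * Accumulates an MD5 digest over the cells of consecutive rows. A batch is opened with
   * {@link #startBatch(ImmutableBytesWritable)}, fed rows through {@link #hashResult(Result)},
   * and closed with {@link #finishBatch()}, after which the batch's start key and hash are
   * available. Cell timestamps are excluded from the digest when {@code ignoreTimestamps} is
   * set.
   */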
  static class ResultHasher {
    private MessageDigest digest;

    private boolean batchStarted = false;
    private ImmutableBytesWritable batchStartKey;
    private ImmutableBytesWritable batchHash;
    private long batchSize = 0;
    boolean ignoreTimestamps;

    public ResultHasher() {
      try {
        digest = MessageDigest.getInstance("MD5");
      } catch (NoSuchAlgorithmException e) {
        Throwables.propagate(e);
      }
    }

    public void startBatch(ImmutableBytesWritable row) {
      if (batchStarted) {
        throw new RuntimeException("Cannot start new batch without finishing existing one.");
      }
      batchStarted = true;
      batchSize = 0;
      batchStartKey = row;
      batchHash = null;
    }

    public void hashResult(Result result) {
      if (!batchStarted) {
        throw new RuntimeException("Cannot add to batch that has not been started.");
      }
      for (Cell cell : result.rawCells()) {
        int rowLength = cell.getRowLength();
        int familyLength = cell.getFamilyLength();
        int qualifierLength = cell.getQualifierLength();
        int valueLength = cell.getValueLength();
        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);

        if (!ignoreTimestamps) {
          long ts = cell.getTimestamp();
          for (int i = 8; i > 0; i--) {
            digest.update((byte) ts);
            ts >>>= 8;
          }
        }
        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);

        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
      }
    }

    public void finishBatch() {
      if (!batchStarted) {
        throw new RuntimeException("Cannot finish batch that has not started.");
      }
      batchStarted = false;
      batchHash = new ImmutableBytesWritable(digest.digest());
    }

    public boolean isBatchStarted() {
      return batchStarted;
    }

    public ImmutableBytesWritable getBatchStartKey() {
      return batchStartKey;
    }

    public ImmutableBytesWritable getBatchHash() {
      return batchHash;
    }

    public long getBatchSize() {
      return batchSize;
    }
  }

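  /**
   * Mapper that hashes table rows into batches. Rows are added to the current batch until the
   * accumulated cell size reaches the configured target batch size; a batch is only closed on
   * a row boundary, so it may overshoot the target by up to one row. Emits one
   * (batch start key, batch hash) pair per batch.
   */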
  public static class HashMapper
    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    private ResultHasher hasher;
    private long targetBatchSize;

    private ImmutableBytesWritable currentRow;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      targetBatchSize = context.getConfiguration()
          .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
      hasher = new ResultHasher();
      hasher.ignoreTimestamps = context.getConfiguration()
          .getBoolean(IGNORE_TIMESTAMPS, false);
      TableSplit split = (TableSplit) context.getInputSplit();
      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {

      if (currentRow == null || !currentRow.equals(key)) {
        // copy the key; the writable passed in is reused by the framework despite its name
        currentRow = new ImmutableBytesWritable(key);

        if (hasher.getBatchSize() >= targetBatchSize) {
          hasher.finishBatch();
          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
          hasher.startBatch(currentRow);
        }
      }

      hasher.hashResult(value);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      hasher.finishBatch();
      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
    }
  }

  private void writeTempManifestFile() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    tableHash.writePropertiesFile(fs, tempManifestPath);
  }

  private void completeManifest() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    fs.rename(tempManifestPath, manifestPath);
  }

  private static final int NUM_ARGS = 2;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" batchsize         the target number of bytes to hash in each batch");
    System.err.println("                   rows are added to the batch until this size is reached");
    System.err.println("                   (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
    System.err.println(" numhashfiles      the number of hash files to create");
    System.err.println("                   if set to fewer than the number of regions then");
    System.err.println("                   the job will create this number of reducers");
    System.err.println("                   (defaults to 1/100 of regions -- at least 1)");
    System.err.println(" startrow          the start row (in hex)");
    System.err.println(" stoprow           the stop row (in hex)");
    System.err.println(" starttime         beginning of the time range (unixtime in millis)");
    System.err.println("                   without endtime means from starttime to forever");
    System.err.println(" endtime           end of the time range.");
    System.err.println("                   Ignored if no starttime specified.");
    System.err.println(" scanbatch         scanner batch size to support intra-row scans");
    System.err.println(" versions          number of cell versions to include");
    System.err.println(" families          comma-separated list of families to include");
    System.err.println(" ignoreTimestamps  if true, ignores cell timestamps");
    System.err.println("                   when calculating hashes");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename     Name of the table to hash");
    System.err.println(" outputpath    Filesystem path to put the output data");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To hash 'TestTable' in 32kB batches for a 1-hour window into 50 files:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
        + " TestTable /hashes/testTable");
  }

  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {

      tableHash.tableName = args[args.length - 2];
      destPath = new Path(args[args.length - 1]);

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String batchSizeArgKey = "--batchsize=";
        if (cmd.startsWith(batchSizeArgKey)) {
          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
          continue;
        }

        final String numHashFilesArgKey = "--numhashfiles=";
        if (cmd.startsWith(numHashFilesArgKey)) {
          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
          continue;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String scanBatchArgKey = "--scanbatch=";
        if (cmd.startsWith(scanBatchArgKey)) {
          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          tableHash.families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String ignoreTimestampsKey = "--ignoreTimestamps=";
        if (cmd.startsWith(ignoreTimestampsKey)) {
          tableHash.ignoreTimestamps =
            Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
      // a missing endtime means "from starttime to forever", so only validate when both are set
      if (tableHash.startTime != 0 && tableHash.endTime != 0
          && tableHash.startTime >= tableHash.endTime) {
        printUsage("Invalid time range filter: starttime="
            + tableHash.startTime + " >= endtime=" + tableHash.endTime);
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    writeTempManifestFile();
    if (!job.waitForCompletion(true)) {
      LOG.error("Map-reduce job failed!");
      return 1;
    }
    completeManifest();
    return 0;
  }

}