/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;

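/**
 * Computes hashes of batches of rows of a table and writes them to MapFiles, typically so that a
 * source table and a copy can later be compared batch by batch (for example by the companion
 * {@code SyncTable} job). The output directory holds a "manifest" properties file describing how
 * the hashes were computed, a "partitions" SequenceFile of the row key boundaries between hash
 * files, and a "hashes" directory of MapFiles named "part-r-NNNNN".
 *
 * <p>A minimal invocation sketch; the table name, time window and paths are illustrative only
 * (they mirror the example printed by {@code printUsage}):</p>
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50 \
 *     --starttime=1265875194289 --endtime=1265878794289 TestTable /hashes/testTable
 * </pre>
 */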
@InterfaceAudience.Private
public class HashTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(HashTable.class);

  private static final int DEFAULT_BATCH_SIZE = 8000;

  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
  final static String PARTITIONS_FILE_NAME = "partitions";
  final static String MANIFEST_FILE_NAME = "manifest";
  final static String HASH_DATA_DIR = "hashes";
  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";

  TableHash tableHash = new TableHash();
  Path destPath;

  public HashTable(Configuration conf) {
    super(conf);
  }

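  /**
   * Holds the parameters of a hashing run (table, column families, row and time ranges, batch
   * size and number of hash files) along with the selected partition boundaries. The parameters
   * are persisted to the manifest file and the partitions to the partitions file, and both can be
   * read back with {@link #read(Configuration, Path)}.
   */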
  public static class TableHash {

    Path hashDir;

    String tableName;
    String families = null;
    long batchSize = DEFAULT_BATCH_SIZE;
    int numHashFiles = 0;
    byte[] startRow = HConstants.EMPTY_START_ROW;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    int scanBatch = 0;
    int versions = -1;
    long startTime = 0;
    long endTime = 0;

    List<ImmutableBytesWritable> partitions;

    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
      TableHash tableHash = new TableHash();
      FileSystem fs = hashDir.getFileSystem(conf);
      tableHash.hashDir = hashDir;
      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
      return tableHash;
    }

    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      p.setProperty("table", tableName);
      if (families != null) {
        p.setProperty("columnFamilies", families);
      }
      p.setProperty("targetBatchSize", Long.toString(batchSize));
      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
      if (!isTableStartRow(startRow)) {
        p.setProperty("startRowHex", Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
      }
      if (scanBatch > 0) {
        p.setProperty("scanBatch", Integer.toString(scanBatch));
      }
      if (versions >= 0) {
        p.setProperty("versions", Integer.toString(versions));
      }
      if (startTime != 0) {
        p.setProperty("startTimestamp", Long.toString(startTime));
      }
      if (endTime != 0) {
        p.setProperty("endTimestamp", Long.toString(endTime));
      }

      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
        p.store(osw, null);
      }
    }

    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
      Properties p = new Properties();
      try (FSDataInputStream in = fs.open(path)) {
        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
          p.load(isr);
        }
      }
      tableName = p.getProperty("table");
      families = p.getProperty("columnFamilies");
      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));

      String startRowHex = p.getProperty("startRowHex");
      if (startRowHex != null) {
        startRow = Bytes.fromHex(startRowHex);
      }
      String stopRowHex = p.getProperty("stopRowHex");
      if (stopRowHex != null) {
        stopRow = Bytes.fromHex(stopRowHex);
      }

      String scanBatchString = p.getProperty("scanBatch");
      if (scanBatchString != null) {
        scanBatch = Integer.parseInt(scanBatchString);
      }

      String versionString = p.getProperty("versions");
      if (versionString != null) {
        versions = Integer.parseInt(versionString);
      }

      String startTimeString = p.getProperty("startTimestamp");
      if (startTimeString != null) {
        startTime = Long.parseLong(startTimeString);
      }

      String endTimeString = p.getProperty("endTimestamp");
      if (endTimeString != null) {
        endTime = Long.parseLong(endTimeString);
      }
    }

    Scan initScan() throws IOException {
      Scan scan = new Scan();
      scan.setCacheBlocks(false);
      if (startTime != 0 || endTime != 0) {
        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
      }
      if (scanBatch > 0) {
        scan.setBatch(scanBatch);
      }
      if (versions >= 0) {
        scan.setMaxVersions(versions);
      }
      if (!isTableStartRow(startRow)) {
        scan.setStartRow(startRow);
      }
      if (!isTableEndRow(stopRow)) {
        scan.setStopRow(stopRow);
      }
      if (families != null) {
        for (String fam : families.split(",")) {
          scan.addFamily(Bytes.toBytes(fam));
        }
      }
      return scan;
    }

    /**
     * Choose partitions between row ranges to hash to a single output file.
     * Selects region boundaries that fall within the scan range, and groups them
     * into the desired number of partitions.
     */
    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
      List<byte[]> startKeys = new ArrayList<>();
      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];

        // drop this region if the scan begins after this region ends, or ends before this
        // region starts; in other words:
        //   IF (scan begins before the end of this region
        //      AND scan ends after the start of this region)
        //   THEN include this region
        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
            || Bytes.compareTo(startRow, regionEndKey) < 0)
          && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
          startKeys.add(regionStartKey);
        }
      }

      int numRegions = startKeys.size();
      if (numHashFiles == 0) {
        numHashFiles = numRegions / 100;
      }
      if (numHashFiles == 0) {
        numHashFiles = 1;
      }
      if (numHashFiles > numRegions) {
        // can't partition within regions
        numHashFiles = numRegions;
      }

      // choose a subset of start keys to group regions into ranges
      partitions = new ArrayList<>(numHashFiles - 1);
      // skip the first start key as it is not a partition between ranges.
      for (long i = 1; i < numHashFiles; i++) {
        int splitIndex = (int) (numRegions * i / numHashFiles);
        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
      }
    }

    void writePartitionFile(Configuration conf, Path path) throws IOException {
      FileSystem fs = path.getFileSystem(conf);
      @SuppressWarnings("deprecation")
      SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);

      for (int i = 0; i < partitions.size(); i++) {
        writer.append(partitions.get(i), NullWritable.get());
      }
      writer.close();
    }

    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
         throws IOException {
      @SuppressWarnings("deprecation")
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
      ImmutableBytesWritable key = new ImmutableBytesWritable();
      partitions = new ArrayList<>();
      while (reader.next(key)) {
        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
      }
      reader.close();

      if (!Ordering.natural().isOrdered(partitions)) {
        throw new IOException("Partitions are not ordered!");
      }
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("tableName=").append(tableName);
      if (families != null) {
        sb.append(", families=").append(families);
      }
      sb.append(", batchSize=").append(batchSize);
      sb.append(", numHashFiles=").append(numHashFiles);
      if (!isTableStartRow(startRow)) {
        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
      }
      if (!isTableEndRow(stopRow)) {
        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
      }
      if (scanBatch >= 0) {
        sb.append(", scanBatch=").append(scanBatch);
      }
      if (versions >= 0) {
        sb.append(", versions=").append(versions);
      }
      if (startTime != 0) {
        sb.append(", startTime=").append(startTime);
      }
      if (endTime != 0) {
        sb.append(", endTime=").append(endTime);
      }
      return sb.toString();
    }

    static String getDataFileName(int hashFileIndex) {
      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
    }

    /**
     * Open a TableHash.Reader starting at the first hash at or after the given key.
     * @throws IOException
     */
    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
        throws IOException {
      return new Reader(conf, startKey);
    }

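    /**
     * Iterates over the previously written key/hash pairs in row key order, starting at the hash
     * file covering the requested start key and moving on to the following hash files as each one
     * is exhausted.
     */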
    public class Reader implements java.io.Closeable {
      private final Configuration conf;

      private int hashFileIndex;
      private MapFile.Reader mapFileReader;

      private boolean cachedNext;
      private ImmutableBytesWritable key;
      private ImmutableBytesWritable hash;

      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
        this.conf = conf;
        int partitionIndex = Collections.binarySearch(partitions, startKey);
        if (partitionIndex >= 0) {
          // if the key is equal to a partition, then go to the file after that partition
          hashFileIndex = partitionIndex + 1;
        } else {
          // if the key is between partitions, then go to the file between those partitions
          hashFileIndex = -1 - partitionIndex;
        }
        openHashFile();

        // MapFiles don't make it easy to seek() so that the subsequent next() returns
        // the desired key/value pair.  So we cache it for the first call of next().
        hash = new ImmutableBytesWritable();
        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
        if (key == null) {
          cachedNext = false;
          hash = null;
        } else {
          cachedNext = true;
        }
      }

      /**
       * Read the next key/hash pair.
       * Returns true if such a pair exists and false when at the end of the data.
       */
      public boolean next() throws IOException {
        if (cachedNext) {
          cachedNext = false;
          return true;
        }
        key = new ImmutableBytesWritable();
        hash = new ImmutableBytesWritable();
        while (true) {
          boolean hasNext = mapFileReader.next(key, hash);
          if (hasNext) {
            return true;
          }
          hashFileIndex++;
          if (hashFileIndex < TableHash.this.numHashFiles) {
            mapFileReader.close();
            openHashFile();
          } else {
            key = null;
            hash = null;
            return false;
          }
        }
      }

      /**
       * Get the current key
       * @return the current key or null if there is no current key
       */
      public ImmutableBytesWritable getCurrentKey() {
        return key;
      }

      /**
       * Get the current hash
       * @return the current hash or null if there is no current hash
       */
      public ImmutableBytesWritable getCurrentHash() {
        return hash;
      }

      private void openHashFile() throws IOException {
        if (mapFileReader != null) {
          mapFileReader.close();
        }
        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
        mapFileReader = new MapFile.Reader(dataFile, conf);
      }

      @Override
      public void close() throws IOException {
        mapFileReader.close();
      }
    }
  }

  static boolean isTableStartRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
  }

  static boolean isTableEndRow(byte[] row) {
    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
  }

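  /**
   * Sets up the hashing job: writes the partition file, scans the table with {@link HashMapper},
   * and uses a {@link TotalOrderPartitioner} over the selected partitions so that each (identity)
   * reducer writes one sorted MapFile of (batch start key, batch hash) entries under the "hashes"
   * subdirectory of the output path.
   */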
  public Job createSubmittableJob(String[] args) throws IOException {
    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
    generatePartitions(partitionsPath);

    Job job = Job.getInstance(getConf(),
          getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
    Configuration jobConf = job.getConfiguration();
    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
    job.setJarByClass(HashTable.class);

    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    // use a TotalOrderPartitioner and reducers to group region output into hash files
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
    job.setReducerClass(Reducer.class);  // identity reducer
    job.setNumReduceTasks(tableHash.numHashFiles);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

    return job;
  }

  private void generatePartitions(Path partitionsPath) throws IOException {
    Pair<byte[][], byte[][]> regionKeys;
    // close the connection even if fetching the region boundaries fails
    try (Connection connection = ConnectionFactory.createConnection(getConf())) {
      regionKeys =
        connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
    }

    tableHash.selectPartitions(regionKeys);
    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);

    tableHash.writePartitionFile(getConf(), partitionsPath);
  }

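  /**
   * Accumulates an MD5 digest over the cells of consecutive rows. A batch records the row key it
   * started at, the total number of key/value bytes hashed so far, and (once finished) the
   * resulting hash, which is what gets written to the hash files.
   */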
  static class ResultHasher {
    private MessageDigest digest;

    private boolean batchStarted = false;
    private ImmutableBytesWritable batchStartKey;
    private ImmutableBytesWritable batchHash;
    private long batchSize = 0;

    public ResultHasher() {
      try {
        digest = MessageDigest.getInstance("MD5");
      } catch (NoSuchAlgorithmException e) {
        Throwables.propagate(e);
      }
    }

    public void startBatch(ImmutableBytesWritable row) {
      if (batchStarted) {
        throw new RuntimeException("Cannot start new batch without finishing existing one.");
      }
      batchStarted = true;
      batchSize = 0;
      batchStartKey = row;
      batchHash = null;
    }

    public void hashResult(Result result) {
      if (!batchStarted) {
        throw new RuntimeException("Cannot add to batch that has not been started.");
      }
      for (Cell cell : result.rawCells()) {
        int rowLength = cell.getRowLength();
        int familyLength = cell.getFamilyLength();
        int qualifierLength = cell.getQualifierLength();
        int valueLength = cell.getValueLength();
        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
        long ts = cell.getTimestamp();
        for (int i = 8; i > 0; i--) {
          digest.update((byte) ts);
          ts >>>= 8;
        }
        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);

        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
      }
    }

    public void finishBatch() {
      if (!batchStarted) {
        throw new RuntimeException("Cannot finish batch that has not started.");
      }
      batchStarted = false;
      batchHash = new ImmutableBytesWritable(digest.digest());
    }

    public boolean isBatchStarted() {
      return batchStarted;
    }

    public ImmutableBytesWritable getBatchStartKey() {
      return batchStartKey;
    }

    public ImmutableBytesWritable getBatchHash() {
      return batchHash;
    }

    public long getBatchSize() {
      return batchSize;
    }
  }

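  /**
   * Feeds every cell of every scanned row into a {@link ResultHasher} and emits a
   * (batch start key, batch hash) pair each time the accumulated batch reaches the configured
   * target size. Batches only break on row boundaries, so a single large row can push a batch
   * past the target.
   */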
  public static class HashMapper
    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    private ResultHasher hasher;
    private long targetBatchSize;

    private ImmutableBytesWritable currentRow;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      targetBatchSize = context.getConfiguration()
          .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
      hasher = new ResultHasher();

      TableSplit split = (TableSplit) context.getInputSplit();
      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {

      if (currentRow == null || !currentRow.equals(key)) {
        currentRow = new ImmutableBytesWritable(key); // the key writable is reused; keep a copy

        if (hasher.getBatchSize() >= targetBatchSize) {
          hasher.finishBatch();
          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
          hasher.startBatch(currentRow);
        }
      }

      hasher.hashResult(value);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      hasher.finishBatch();
      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
    }
  }

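  /**
   * Writes the manifest under a temporary name; {@link #completeManifest()} renames it to its
   * final name only after the job has completed successfully, so readers never see a manifest
   * for partially written hash data.
   */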
  private void writeTempManifestFile() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    tableHash.writePropertiesFile(fs, tempManifestPath);
  }

  private void completeManifest() throws IOException {
    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
    FileSystem fs = tempManifestPath.getFileSystem(getConf());
    fs.rename(tempManifestPath, manifestPath);
  }

  private static final int NUM_ARGS = 2;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" batchsize     the target number of bytes to hash in each batch");
    System.err.println("               rows are added to the batch until this size is reached");
    System.err.println("               (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
    System.err.println(" numhashfiles  the number of hash files to create");
    System.err.println("               if set to fewer than the number of regions then");
    System.err.println("               the job will create this number of reducers");
    System.err.println("               (defaults to 1/100 of regions -- at least 1)");
    System.err.println(" startrow      the start row");
    System.err.println(" stoprow       the stop row");
    System.err.println(" starttime     beginning of the time range (unixtime in millis)");
    System.err.println("               without endtime means from starttime to forever");
    System.err.println(" endtime       end of the time range.  Ignored if no starttime specified.");
    System.err.println(" scanbatch     scanner batch size to support intra row scans");
    System.err.println(" versions      number of cell versions to include");
    System.err.println(" families      comma-separated list of families to include");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename     Name of the table to hash");
    System.err.println(" outputpath    Filesystem path to put the output data");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
        + " TestTable /hashes/testTable");
  }

  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {

      tableHash.tableName = args[args.length-2];
      destPath = new Path(args[args.length-1]);

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String batchSizeArgKey = "--batchsize=";
        if (cmd.startsWith(batchSizeArgKey)) {
          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
          continue;
        }

        final String numHashFilesArgKey = "--numhashfiles=";
        if (cmd.startsWith(numHashFilesArgKey)) {
          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
          continue;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String scanBatchArgKey = "--scanbatch=";
        if (cmd.startsWith(scanBatchArgKey)) {
          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          tableHash.families = cmd.substring(familiesArgKey.length());
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
      if ((tableHash.startTime != 0 || tableHash.endTime != 0)
          && (tableHash.startTime >= tableHash.endTime)) {
        printUsage("Invalid time range filter: starttime="
            + tableHash.startTime + " >=  endtime=" + tableHash.endTime);
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    writeTempManifestFile();
    if (!job.waitForCompletion(true)) {
      LOG.error("Map-reduce job failed!");
      return 1;
    }
    completeManifest();
    return 0;
  }

}