001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;
062
063@InterfaceAudience.Private
064public class HashTable extends Configured implements Tool {
065
066  private static final Logger LOG = LoggerFactory.getLogger(HashTable.class);
067
068  private static final int DEFAULT_BATCH_SIZE = 8000;
069
070  /**
071   * Default hash algorithm. Kept as MD5 so that manifests produced by older versions, which did not
072   * record an algorithm, remain readable.
073   */
074  static final String DEFAULT_HASH_ALGORITHM = "MD5";
075
076  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
077  final static String HASH_ALGORITHM_CONF_KEY = "hash.algorithm";
078  final static String PARTITIONS_FILE_NAME = "partitions";
079  final static String MANIFEST_FILE_NAME = "manifest";
080  final static String HASH_DATA_DIR = "hashes";
081  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
082  final static String IGNORE_TIMESTAMPS = "ignoreTimestamps";
083  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
084
085  TableHash tableHash = new TableHash();
086  Path destPath;
087
088  public HashTable(Configuration conf) {
089    super(conf);
090  }
091
092  public static class TableHash {
093
094    Path hashDir;
095
096    String tableName;
097    String families = null;
098    long batchSize = DEFAULT_BATCH_SIZE;
099    int numHashFiles = 0;
100    byte[] startRow = HConstants.EMPTY_START_ROW;
101    byte[] stopRow = HConstants.EMPTY_END_ROW;
102    int scanBatch = 0;
103    int versions = -1;
104    long startTime = 0;
105    long endTime = 0;
106    boolean ignoreTimestamps;
107    boolean rawScan;
108    String hashAlgorithm = DEFAULT_HASH_ALGORITHM;
109
110    List<ImmutableBytesWritable> partitions;
111
112    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
113      TableHash tableHash = new TableHash();
114      FileSystem fs = hashDir.getFileSystem(conf);
115      tableHash.hashDir = hashDir;
116      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
117      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
118      return tableHash;
119    }
120
121    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
122      Properties p = new Properties();
123      p.setProperty("table", tableName);
124      if (families != null) {
125        p.setProperty("columnFamilies", families);
126      }
127      p.setProperty("targetBatchSize", Long.toString(batchSize));
128      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
129      if (!isTableStartRow(startRow)) {
130        p.setProperty("startRowHex", Bytes.toHex(startRow));
131      }
132      if (!isTableEndRow(stopRow)) {
133        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
134      }
135      if (scanBatch > 0) {
136        p.setProperty("scanBatch", Integer.toString(scanBatch));
137      }
138      if (versions >= 0) {
139        p.setProperty("versions", Integer.toString(versions));
140      }
141      if (startTime != 0) {
142        p.setProperty("startTimestamp", Long.toString(startTime));
143      }
144      if (endTime != 0) {
145        p.setProperty("endTimestamp", Long.toString(endTime));
146      }
147      p.setProperty("rawScan", Boolean.toString(rawScan));
148      p.setProperty("hashAlgorithm", hashAlgorithm);
149
150      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
151        p.store(osw, null);
152      }
153    }
154
155    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
156      Properties p = new Properties();
157      try (FSDataInputStream in = fs.open(path)) {
158        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
159          p.load(isr);
160        }
161      }
162      tableName = p.getProperty("table");
163      families = p.getProperty("columnFamilies");
164      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
165      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
166
167      String startRowHex = p.getProperty("startRowHex");
168      if (startRowHex != null) {
169        startRow = Bytes.fromHex(startRowHex);
170      }
171      String stopRowHex = p.getProperty("stopRowHex");
172      if (stopRowHex != null) {
173        stopRow = Bytes.fromHex(stopRowHex);
174      }
175
176      String scanBatchString = p.getProperty("scanBatch");
177      if (scanBatchString != null) {
178        scanBatch = Integer.parseInt(scanBatchString);
179      }
180
181      String versionString = p.getProperty("versions");
182      if (versionString != null) {
183        versions = Integer.parseInt(versionString);
184      }
185
186      String rawScanString = p.getProperty("rawScan");
187      if (rawScanString != null) {
188        rawScan = Boolean.parseBoolean(rawScanString);
189      }
190
191      String startTimeString = p.getProperty("startTimestamp");
192      if (startTimeString != null) {
193        startTime = Long.parseLong(startTimeString);
194      }
195
196      String endTimeString = p.getProperty("endTimestamp");
197      if (endTimeString != null) {
198        endTime = Long.parseLong(endTimeString);
199      }
200
201      hashAlgorithm = p.getProperty("hashAlgorithm", DEFAULT_HASH_ALGORITHM);
202    }
203
204    Scan initScan() throws IOException {
205      Scan scan = new Scan();
206      scan.setCacheBlocks(false);
207      if (startTime != 0 || endTime != 0) {
208        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
209      }
210      if (scanBatch > 0) {
211        scan.setBatch(scanBatch);
212      }
213      if (versions >= 0) {
214        scan.readVersions(versions);
215      }
216      if (!isTableStartRow(startRow)) {
217        scan.withStartRow(startRow);
218      }
219      if (!isTableEndRow(stopRow)) {
220        scan.withStopRow(stopRow);
221      }
222      if (families != null) {
223        for (String fam : families.split(",")) {
224          scan.addFamily(Bytes.toBytes(fam));
225        }
226      }
227      scan.setRaw(rawScan);
228
229      return scan;
230    }
231
232    /**
233     * Choose partitions between row ranges to hash to a single output file Selects region
234     * boundaries that fall within the scan range, and groups them into the desired number of
235     * partitions.
236     */
237    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
238      List<byte[]> startKeys = new ArrayList<>();
239      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
240        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
241        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
242
243        // if scan begins after this region, or starts before this region, then drop this region
244        // in other words:
245        // IF (scan begins before the end of this region
246        // AND scan ends before the start of this region)
247        // THEN include this region
248        if (
249          (isTableStartRow(startRow) || isTableEndRow(regionEndKey)
250            || Bytes.compareTo(startRow, regionEndKey) < 0)
251            && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
252              || Bytes.compareTo(stopRow, regionStartKey) > 0)
253        ) {
254          startKeys.add(regionStartKey);
255        }
256      }
257
258      int numRegions = startKeys.size();
259      if (numHashFiles == 0) {
260        numHashFiles = numRegions / 100;
261      }
262      if (numHashFiles == 0) {
263        numHashFiles = 1;
264      }
265      if (numHashFiles > numRegions) {
266        // can't partition within regions
267        numHashFiles = numRegions;
268      }
269
270      // choose a subset of start keys to group regions into ranges
271      partitions = new ArrayList<>(numHashFiles - 1);
272      // skip the first start key as it is not a partition between ranges.
273      for (long i = 1; i < numHashFiles; i++) {
274        int splitIndex = (int) (numRegions * i / numHashFiles);
275        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
276      }
277    }
278
279    void writePartitionFile(Configuration conf, Path path) throws IOException {
280      FileSystem fs = path.getFileSystem(conf);
281      @SuppressWarnings("deprecation")
282      SequenceFile.Writer writer =
283        SequenceFile.createWriter(fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
284
285      for (int i = 0; i < partitions.size(); i++) {
286        writer.append(partitions.get(i), NullWritable.get());
287      }
288      writer.close();
289    }
290
291    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
292      throws IOException {
293      @SuppressWarnings("deprecation")
294      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
295      ImmutableBytesWritable key = new ImmutableBytesWritable();
296      partitions = new ArrayList<>();
297      while (reader.next(key)) {
298        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
299      }
300      reader.close();
301
302      if (!Ordering.natural().isOrdered(partitions)) {
303        throw new IOException("Partitions are not ordered!");
304      }
305    }
306
307    @Override
308    public String toString() {
309      StringBuilder sb = new StringBuilder();
310      sb.append("tableName=").append(tableName);
311      if (families != null) {
312        sb.append(", families=").append(families);
313      }
314      sb.append(", batchSize=").append(batchSize);
315      sb.append(", numHashFiles=").append(numHashFiles);
316      if (!isTableStartRow(startRow)) {
317        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
318      }
319      if (!isTableEndRow(stopRow)) {
320        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
321      }
322      if (scanBatch >= 0) {
323        sb.append(", scanBatch=").append(scanBatch);
324      }
325      if (versions >= 0) {
326        sb.append(", versions=").append(versions);
327      }
328      sb.append(", rawScan=").append(rawScan);
329      sb.append(", hashAlgorithm=").append(hashAlgorithm);
330      if (startTime != 0) {
331        sb.append("startTime=").append(startTime);
332      }
333      if (endTime != 0) {
334        sb.append("endTime=").append(endTime);
335      }
336      return sb.toString();
337    }
338
339    static String getDataFileName(int hashFileIndex) {
340      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
341    }
342
343    /**
344     * Open a TableHash.Reader starting at the first hash at or after the given key.
345     */
346    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
347      throws IOException {
348      return new Reader(conf, startKey);
349    }
350
351    public class Reader implements java.io.Closeable {
352      private final Configuration conf;
353
354      private int hashFileIndex;
355      private MapFile.Reader mapFileReader;
356
357      private boolean cachedNext;
358      private ImmutableBytesWritable key;
359      private ImmutableBytesWritable hash;
360
361      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
362        this.conf = conf;
363        int partitionIndex = Collections.binarySearch(partitions, startKey);
364        if (partitionIndex >= 0) {
365          // if the key is equal to a partition, then go the file after that partition
366          hashFileIndex = partitionIndex + 1;
367        } else {
368          // if the key is between partitions, then go to the file between those partitions
369          hashFileIndex = -1 - partitionIndex;
370        }
371        openHashFile();
372
373        // MapFile's don't make it easy to seek() so that the subsequent next() returns
374        // the desired key/value pair. So we cache it for the first call of next().
375        hash = new ImmutableBytesWritable();
376        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
377        if (key == null) {
378          cachedNext = false;
379          hash = null;
380        } else {
381          cachedNext = true;
382        }
383      }
384
385      /**
386       * Read the next key/hash pair. Returns true if such a pair exists and false when at the end
387       * of the data.
388       */
389      public boolean next() throws IOException {
390        if (cachedNext) {
391          cachedNext = false;
392          return true;
393        }
394        key = new ImmutableBytesWritable();
395        hash = new ImmutableBytesWritable();
396        while (true) {
397          boolean hasNext = mapFileReader.next(key, hash);
398          if (hasNext) {
399            return true;
400          }
401          hashFileIndex++;
402          if (hashFileIndex < TableHash.this.numHashFiles) {
403            mapFileReader.close();
404            openHashFile();
405          } else {
406            key = null;
407            hash = null;
408            return false;
409          }
410        }
411      }
412
413      /**
414       * Get the current key
415       * @return the current key or null if there is no current key
416       */
417      public ImmutableBytesWritable getCurrentKey() {
418        return key;
419      }
420
421      /**
422       * Get the current hash
423       * @return the current hash or null if there is no current hash
424       */
425      public ImmutableBytesWritable getCurrentHash() {
426        return hash;
427      }
428
429      private void openHashFile() throws IOException {
430        if (mapFileReader != null) {
431          mapFileReader.close();
432        }
433        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
434        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
435        mapFileReader = new MapFile.Reader(dataFile, conf);
436      }
437
438      @Override
439      public void close() throws IOException {
440        mapFileReader.close();
441      }
442    }
443  }
444
445  static boolean isTableStartRow(byte[] row) {
446    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
447  }
448
449  static boolean isTableEndRow(byte[] row) {
450    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
451  }
452
453  public Job createSubmittableJob(String[] args) throws IOException {
454    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
455    generatePartitions(partitionsPath);
456
457    Job job = Job.getInstance(getConf(),
458      getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
459    Configuration jobConf = job.getConfiguration();
460    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
461    jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
462    jobConf.set(HASH_ALGORITHM_CONF_KEY, tableHash.hashAlgorithm);
463    job.setJarByClass(HashTable.class);
464
465    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
466      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
467
468    // use a TotalOrderPartitioner and reducers to group region output into hash files
469    job.setPartitionerClass(TotalOrderPartitioner.class);
470    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
471    job.setReducerClass(Reducer.class); // identity reducer
472    job.setNumReduceTasks(tableHash.numHashFiles);
473    job.setOutputKeyClass(ImmutableBytesWritable.class);
474    job.setOutputValueClass(ImmutableBytesWritable.class);
475    job.setOutputFormatClass(MapFileOutputFormat.class);
476    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
477
478    return job;
479  }
480
481  private void generatePartitions(Path partitionsPath) throws IOException {
482    Connection connection = ConnectionFactory.createConnection(getConf());
483    Pair<byte[][], byte[][]> regionKeys =
484      connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
485    connection.close();
486
487    tableHash.selectPartitions(regionKeys);
488    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
489
490    tableHash.writePartitionFile(getConf(), partitionsPath);
491  }
492
493  static class ResultHasher {
494    private MessageDigest digest;
495
496    private boolean batchStarted = false;
497    private ImmutableBytesWritable batchStartKey;
498    private ImmutableBytesWritable batchHash;
499    private long batchSize = 0;
500    boolean ignoreTimestamps;
501
502    public ResultHasher(String algorithm) {
503      try {
504        digest = MessageDigest.getInstance(algorithm);
505      } catch (NoSuchAlgorithmException e) {
506        throw new IllegalArgumentException("Unsupported hash algorithm: " + algorithm, e);
507      }
508    }
509
510    public void startBatch(ImmutableBytesWritable row) {
511      if (batchStarted) {
512        throw new RuntimeException("Cannot start new batch without finishing existing one.");
513      }
514      batchStarted = true;
515      batchSize = 0;
516      batchStartKey = row;
517      batchHash = null;
518    }
519
520    public void hashResult(Result result) {
521      if (!batchStarted) {
522        throw new RuntimeException("Cannot add to batch that has not been started.");
523      }
524      for (Cell cell : result.rawCells()) {
525        int rowLength = cell.getRowLength();
526        int familyLength = cell.getFamilyLength();
527        int qualifierLength = cell.getQualifierLength();
528        int valueLength = cell.getValueLength();
529        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
530        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
531        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
532
533        if (!ignoreTimestamps) {
534          long ts = cell.getTimestamp();
535          for (int i = 8; i > 0; i--) {
536            digest.update((byte) ts);
537            ts >>>= 8;
538          }
539        }
540        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
541
542        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
543      }
544    }
545
546    public void finishBatch() {
547      if (!batchStarted) {
548        throw new RuntimeException("Cannot finish batch that has not started.");
549      }
550      batchStarted = false;
551      batchHash = new ImmutableBytesWritable(digest.digest());
552    }
553
554    public boolean isBatchStarted() {
555      return batchStarted;
556    }
557
558    public ImmutableBytesWritable getBatchStartKey() {
559      return batchStartKey;
560    }
561
562    public ImmutableBytesWritable getBatchHash() {
563      return batchHash;
564    }
565
566    public long getBatchSize() {
567      return batchSize;
568    }
569  }
570
571  public static class HashMapper
572    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
573
574    private ResultHasher hasher;
575    private long targetBatchSize;
576
577    private ImmutableBytesWritable currentRow;
578
579    @Override
580    protected void setup(Context context) throws IOException, InterruptedException {
581      Configuration conf = context.getConfiguration();
582      targetBatchSize = conf.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
583      hasher = new ResultHasher(conf.get(HASH_ALGORITHM_CONF_KEY, DEFAULT_HASH_ALGORITHM));
584      hasher.ignoreTimestamps = conf.getBoolean(IGNORE_TIMESTAMPS, false);
585      TableSplit split = (TableSplit) context.getInputSplit();
586      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
587    }
588
589    @Override
590    protected void map(ImmutableBytesWritable key, Result value, Context context)
591      throws IOException, InterruptedException {
592
593      if (currentRow == null || !currentRow.equals(key)) {
594        currentRow = new ImmutableBytesWritable(key); // not immutable
595
596        if (hasher.getBatchSize() >= targetBatchSize) {
597          hasher.finishBatch();
598          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
599          hasher.startBatch(currentRow);
600        }
601      }
602
603      hasher.hashResult(value);
604    }
605
606    @Override
607    protected void cleanup(Context context) throws IOException, InterruptedException {
608      hasher.finishBatch();
609      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
610    }
611  }
612
613  private void writeTempManifestFile() throws IOException {
614    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
615    FileSystem fs = tempManifestPath.getFileSystem(getConf());
616    tableHash.writePropertiesFile(fs, tempManifestPath);
617  }
618
619  private void completeManifest() throws IOException {
620    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
621    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
622    FileSystem fs = tempManifestPath.getFileSystem(getConf());
623    fs.rename(tempManifestPath, manifestPath);
624  }
625
626  private static final int NUM_ARGS = 2;
627
628  private static void printUsage(final String errorMsg) {
629    if (errorMsg != null && errorMsg.length() > 0) {
630      System.err.println("ERROR: " + errorMsg);
631      System.err.println();
632    }
633    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
634    System.err.println();
635    System.err.println("Options:");
636    System.err.println(" batchsize         the target amount of bytes to hash in each batch");
637    System.err.println("                   rows are added to the batch until this size is reached");
638    System.err.println("                   (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
639    System.err.println(" numhashfiles      the number of hash files to create");
640    System.err.println("                   if set to fewer than number of regions then");
641    System.err.println("                   the job will create this number of reducers");
642    System.err.println("                   (defaults to 1/100 of regions -- at least 1)");
643    System.err.println(" startrow          the start row");
644    System.err.println(" stoprow           the stop row");
645    System.err.println(" starttime         beginning of the time range (unixtime in millis)");
646    System.err.println("                   without endtime means from starttime to forever");
647    System.err.println(" endtime           end of the time range.");
648    System.err.println("                   Ignored if no starttime specified.");
649    System.err.println(" scanbatch         scanner batch size to support intra row scans");
650    System.err.println(" versions          number of cell versions to include");
651    System.err.println(" rawScan           performs a raw scan (false if omitted)");
652    System.err.println(" families          comma-separated list of families to include");
653    System.err.println(" ignoreTimestamps  if true, ignores cell timestamps");
654    System.err.println("                   when calculating hashes");
655    System.err.println(" hashAlgorithm     MessageDigest algorithm to use for batch hashes");
656    System.err.println("                   examples: MD5, SHA-256, SHA-384, SHA-512");
657    System.err.println("                   (defaults to " + DEFAULT_HASH_ALGORITHM + ")");
658    System.err.println();
659    System.err.println("Args:");
660    System.err.println(" tablename     Name of the table to hash");
661    System.err.println(" outputpath    Filesystem path to put the output data");
662    System.err.println();
663    System.err.println("Examples:");
664    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
665    System.err.println(" $ hbase "
666      + "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
667      + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
668      + " TestTable /hashes/testTable");
669  }
670
671  private boolean doCommandLine(final String[] args) {
672    if (args.length < NUM_ARGS) {
673      printUsage(null);
674      return false;
675    }
676    try {
677
678      tableHash.tableName = args[args.length - 2];
679      destPath = new Path(args[args.length - 1]);
680
681      for (int i = 0; i < args.length - NUM_ARGS; i++) {
682        String cmd = args[i];
683        if (cmd.equals("-h") || cmd.equals("--help")) {
684          printUsage(null);
685          return false;
686        }
687
688        final String batchSizeArgKey = "--batchsize=";
689        if (cmd.startsWith(batchSizeArgKey)) {
690          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
691          continue;
692        }
693
694        final String numHashFilesArgKey = "--numhashfiles=";
695        if (cmd.startsWith(numHashFilesArgKey)) {
696          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
697          continue;
698        }
699
700        final String startRowArgKey = "--startrow=";
701        if (cmd.startsWith(startRowArgKey)) {
702          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
703          continue;
704        }
705
706        final String stopRowArgKey = "--stoprow=";
707        if (cmd.startsWith(stopRowArgKey)) {
708          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
709          continue;
710        }
711
712        final String startTimeArgKey = "--starttime=";
713        if (cmd.startsWith(startTimeArgKey)) {
714          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
715          continue;
716        }
717
718        final String endTimeArgKey = "--endtime=";
719        if (cmd.startsWith(endTimeArgKey)) {
720          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
721          continue;
722        }
723
724        final String scanBatchArgKey = "--scanbatch=";
725        if (cmd.startsWith(scanBatchArgKey)) {
726          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
727          continue;
728        }
729
730        final String versionsArgKey = "--versions=";
731        if (cmd.startsWith(versionsArgKey)) {
732          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
733          continue;
734        }
735
736        final String rawScanArgKey = "--rawScan=";
737        if (cmd.startsWith(rawScanArgKey)) {
738          tableHash.rawScan = Boolean.parseBoolean(cmd.substring(rawScanArgKey.length()));
739          continue;
740        }
741
742        final String familiesArgKey = "--families=";
743        if (cmd.startsWith(familiesArgKey)) {
744          tableHash.families = cmd.substring(familiesArgKey.length());
745          continue;
746        }
747
748        final String ignoreTimestampsKey = "--ignoreTimestamps=";
749        if (cmd.startsWith(ignoreTimestampsKey)) {
750          tableHash.ignoreTimestamps =
751            Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
752          continue;
753        }
754
755        final String hashAlgorithmKey = "--hashAlgorithm=";
756        if (cmd.startsWith(hashAlgorithmKey)) {
757          tableHash.hashAlgorithm = cmd.substring(hashAlgorithmKey.length());
758          continue;
759        }
760
761        printUsage("Invalid argument '" + cmd + "'");
762        return false;
763      }
764      if (
765        (tableHash.startTime != 0 || tableHash.endTime != 0)
766          && (tableHash.startTime >= tableHash.endTime)
767      ) {
768        printUsage("Invalid time range filter: starttime=" + tableHash.startTime + " >=  endtime="
769          + tableHash.endTime);
770        return false;
771      }
772
773      try {
774        MessageDigest.getInstance(tableHash.hashAlgorithm);
775      } catch (NoSuchAlgorithmException e) {
776        printUsage("Unsupported hash algorithm: " + tableHash.hashAlgorithm);
777        return false;
778      }
779
780    } catch (Exception e) {
781      LOG.error("Failed to parse commandLine arguments", e);
782      printUsage("Can't start because " + e.getMessage());
783      return false;
784    }
785    return true;
786  }
787
788  /**
789   * Main entry point.
790   */
791  public static void main(String[] args) throws Exception {
792    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
793    System.exit(ret);
794  }
795
796  @Override
797  public int run(String[] args) throws Exception {
798    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
799    if (!doCommandLine(otherArgs)) {
800      return 1;
801    }
802
803    Job job = createSubmittableJob(otherArgs);
804    writeTempManifestFile();
805    if (!job.waitForCompletion(true)) {
806      LOG.info("Map-reduce job failed!");
807      return 1;
808    }
809    completeManifest();
810    return 0;
811  }
812
813}