001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.Iterator;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.conf.Configured;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Delete;
036import org.apache.hadoop.hbase.client.Mutation;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.ResultScanner;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.mapreduce.Counters;
045import org.apache.hadoop.mapreduce.Job;
046import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
047import org.apache.hadoop.util.GenericOptionsParser;
048import org.apache.hadoop.util.Tool;
049import org.apache.hadoop.util.ToolRunner;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
055
056@InterfaceAudience.Private
057public class SyncTable extends Configured implements Tool {
058
059  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);
060
061  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
062  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
063  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
064  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
065  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
066  static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
067
068  Path sourceHashDir;
069  String sourceTableName;
070  String targetTableName;
071
072  String sourceZkCluster;
073  String targetZkCluster;
074  boolean dryRun;
075
076  Counters counters;
077
078  public SyncTable(Configuration conf) {
079    super(conf);
080  }
081
082  public Job createSubmittableJob(String[] args) throws IOException {
083    FileSystem fs = sourceHashDir.getFileSystem(getConf());
084    if (!fs.exists(sourceHashDir)) {
085      throw new IOException("Source hash dir not found: " + sourceHashDir);
086    }
087
088    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
089    LOG.info("Read source hash manifest: " + tableHash);
090    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
091    if (!tableHash.tableName.equals(sourceTableName)) {
092      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
093          + tableHash.tableName + " but job is reading from: " + sourceTableName);
094    }
095    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
096      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
097          + " should be 1 more than the number of partition keys.  However, the manifest file "
098          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
099          + " found in the partitions file is " + tableHash.partitions.size());
100    }
101
102    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
103    int dataSubdirCount = 0;
104    for (FileStatus file : fs.listStatus(dataDir)) {
105      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
106        dataSubdirCount++;
107      }
108    }
109
110    if (dataSubdirCount != tableHash.numHashFiles) {
111      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
112          + " should be 1 more than the number of partition keys.  However, the number of data dirs"
113          + " found is " + dataSubdirCount + " but the number of partition keys"
114          + " found in the partitions file is " + tableHash.partitions.size());
115    }
116
117    Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
118        "syncTable_" + sourceTableName + "-" + targetTableName));
119    Configuration jobConf = job.getConfiguration();
120    job.setJarByClass(HashTable.class);
121    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
122    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
123    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
124    if (sourceZkCluster != null) {
125      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
126    }
127    if (targetZkCluster != null) {
128      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
129    }
130    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
131
132    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
133        SyncMapper.class, null, null, job);
134
135    job.setNumReduceTasks(0);
136
137    if (dryRun) {
138      job.setOutputFormatClass(NullOutputFormat.class);
139    } else {
140      // No reducers.  Just write straight to table.  Call initTableReducerJob
141      // because it sets up the TableOutputFormat.
142      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
143          targetZkCluster, null, null);
144
145      // would be nice to add an option for bulk load instead
146    }
147
148    // Obtain an authentication token, for the specified cluster, on behalf of the current user
149    if (sourceZkCluster != null) {
150      Configuration peerConf =
151          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
152      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
153    }
154    return job;
155  }
156
157  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
158    Path sourceHashDir;
159
160    Connection sourceConnection;
161    Connection targetConnection;
162    Table sourceTable;
163    Table targetTable;
164    boolean dryRun;
165
166    HashTable.TableHash sourceTableHash;
167    HashTable.TableHash.Reader sourceHashReader;
168    ImmutableBytesWritable currentSourceHash;
169    ImmutableBytesWritable nextSourceKey;
170    HashTable.ResultHasher targetHasher;
171
172    Throwable mapperException;
173
174    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
175      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
176      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
177
178    @Override
179    protected void setup(Context context) throws IOException {
180
181      Configuration conf = context.getConfiguration();
182      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
183      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
184      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
185          TableOutputFormat.OUTPUT_CONF_PREFIX);
186      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
187      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
188      dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
189
190      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
191      LOG.info("Read source hash manifest: " + sourceTableHash);
192      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
193
194      TableSplit split = (TableSplit) context.getInputSplit();
195      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
196
197      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
198      findNextKeyHashPair();
199
200      // create a hasher, but don't start it right away
201      // instead, find the first hash batch at or after the start row
202      // and skip any rows that come before.  they will be caught by the previous task
203      targetHasher = new HashTable.ResultHasher();
204    }
205
206    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
207                                             String configPrefix)
208      throws IOException {
209        String zkCluster = conf.get(zkClusterConfKey);
210        Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
211            zkCluster, configPrefix);
212        return ConnectionFactory.createConnection(clusterConf);
213    }
214
215    private static Table openTable(Connection connection, Configuration conf,
216        String tableNameConfKey) throws IOException {
217      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
218    }
219
220    /**
221     * Attempt to read the next source key/hash pair.
222     * If there are no more, set nextSourceKey to null
223     */
224    private void findNextKeyHashPair() throws IOException {
225      boolean hasNext = sourceHashReader.next();
226      if (hasNext) {
227        nextSourceKey = sourceHashReader.getCurrentKey();
228      } else {
229        // no more keys - last hash goes to the end
230        nextSourceKey = null;
231      }
232    }
233
234    @Override
235    protected void map(ImmutableBytesWritable key, Result value, Context context)
236        throws IOException, InterruptedException {
237      try {
238        // first, finish any hash batches that end before the scanned row
239        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
240          moveToNextBatch(context);
241        }
242
243        // next, add the scanned row (as long as we've reached the first batch)
244        if (targetHasher.isBatchStarted()) {
245          targetHasher.hashResult(value);
246        }
247      } catch (Throwable t) {
248        mapperException = t;
249        Throwables.propagateIfInstanceOf(t, IOException.class);
250        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
251        Throwables.propagate(t);
252      }
253    }
254
255    /**
256     * If there is an open hash batch, complete it and sync if there are diffs.
257     * Start a new batch, and seek to read the
258     */
259    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
260      if (targetHasher.isBatchStarted()) {
261        finishBatchAndCompareHashes(context);
262      }
263      targetHasher.startBatch(nextSourceKey);
264      currentSourceHash = sourceHashReader.getCurrentHash();
265
266      findNextKeyHashPair();
267    }
268
269    /**
270     * Finish the currently open hash batch.
271     * Compare the target hash to the given source hash.
272     * If they do not match, then sync the covered key range.
273     */
274    private void finishBatchAndCompareHashes(Context context)
275        throws IOException, InterruptedException {
276      targetHasher.finishBatch();
277      context.getCounter(Counter.BATCHES).increment(1);
278      if (targetHasher.getBatchSize() == 0) {
279        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
280      }
281      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
282      if (targetHash.equals(currentSourceHash)) {
283        context.getCounter(Counter.HASHES_MATCHED).increment(1);
284      } else {
285        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
286
287        ImmutableBytesWritable stopRow = nextSourceKey == null
288                                          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
289                                          : nextSourceKey;
290
291        if (LOG.isDebugEnabled()) {
292          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey())
293              + " to " + toHex(stopRow)
294              + " sourceHash: " + toHex(currentSourceHash)
295              + " targetHash: " + toHex(targetHash));
296        }
297
298        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
299      }
300    }
301    private static String toHex(ImmutableBytesWritable bytes) {
302      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
303    }
304
305    private static final CellScanner EMPTY_CELL_SCANNER
306      = new CellScanner(Collections.<Result>emptyIterator());
307
308    /**
309     * Rescan the given range directly from the source and target tables.
310     * Count and log differences, and if this is not a dry run, output Puts and Deletes
311     * to make the target table match the source table for this range
312     */
313    private void syncRange(Context context, ImmutableBytesWritable startRow,
314        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
315      Scan scan = sourceTableHash.initScan();
316      scan.setStartRow(startRow.copyBytes());
317      scan.setStopRow(stopRow.copyBytes());
318
319      ResultScanner sourceScanner = sourceTable.getScanner(scan);
320      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
321
322      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
323      CellScanner targetCells = new CellScanner(targetScanner.iterator());
324
325      boolean rangeMatched = true;
326      byte[] nextSourceRow = sourceCells.nextRow();
327      byte[] nextTargetRow = targetCells.nextRow();
328      while(nextSourceRow != null || nextTargetRow != null) {
329        boolean rowMatched;
330        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
331        if (rowComparison < 0) {
332          if (LOG.isInfoEnabled()) {
333            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
334          }
335          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
336
337          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
338          nextSourceRow = sourceCells.nextRow();  // advance only source to next row
339        } else if (rowComparison > 0) {
340          if (LOG.isInfoEnabled()) {
341            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
342          }
343          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
344
345          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
346          nextTargetRow = targetCells.nextRow();  // advance only target to next row
347        } else {
348          // current row is the same on both sides, compare cell by cell
349          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
350          nextSourceRow = sourceCells.nextRow();
351          nextTargetRow = targetCells.nextRow();
352        }
353
354        if (!rowMatched) {
355          rangeMatched = false;
356        }
357      }
358
359      sourceScanner.close();
360      targetScanner.close();
361
362      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
363        .increment(1);
364    }
365
366    private static class CellScanner {
367      private final Iterator<Result> results;
368
369      private byte[] currentRow;
370      private Result currentRowResult;
371      private int nextCellInRow;
372
373      private Result nextRowResult;
374
375      public CellScanner(Iterator<Result> results) {
376        this.results = results;
377      }
378
379      /**
380       * Advance to the next row and return its row key.
381       * Returns null iff there are no more rows.
382       */
383      public byte[] nextRow() {
384        if (nextRowResult == null) {
385          // no cached row - check scanner for more
386          while (results.hasNext()) {
387            nextRowResult = results.next();
388            Cell nextCell = nextRowResult.rawCells()[0];
389            if (currentRow == null
390                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
391                nextCell.getRowOffset(), nextCell.getRowLength())) {
392              // found next row
393              break;
394            } else {
395              // found another result from current row, keep scanning
396              nextRowResult = null;
397            }
398          }
399
400          if (nextRowResult == null) {
401            // end of data, no more rows
402            currentRowResult = null;
403            currentRow = null;
404            return null;
405          }
406        }
407
408        // advance to cached result for next row
409        currentRowResult = nextRowResult;
410        nextCellInRow = 0;
411        currentRow = currentRowResult.getRow();
412        nextRowResult = null;
413        return currentRow;
414      }
415
416      /**
417       * Returns the next Cell in the current row or null iff none remain.
418       */
419      public Cell nextCellInRow() {
420        if (currentRowResult == null) {
421          // nothing left in current row
422          return null;
423        }
424
425        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
426        nextCellInRow++;
427        if (nextCellInRow == currentRowResult.size()) {
428          if (results.hasNext()) {
429            Result result = results.next();
430            Cell cell = result.rawCells()[0];
431            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
432                cell.getRowOffset(), cell.getRowLength())) {
433              // result is part of current row
434              currentRowResult = result;
435              nextCellInRow = 0;
436            } else {
437              // result is part of next row, cache it
438              nextRowResult = result;
439              // current row is complete
440              currentRowResult = null;
441            }
442          } else {
443            // end of data
444            currentRowResult = null;
445          }
446        }
447        return nextCell;
448      }
449    }
450
451    /**
452     * Compare the cells for the given row from the source and target tables.
453     * Count and log any differences.
454     * If not a dry run, output a Put and/or Delete needed to sync the target table
455     * to match the source table.
456     */
457    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
458        CellScanner targetCells) throws IOException, InterruptedException {
459      Put put = null;
460      Delete delete = null;
461      long matchingCells = 0;
462      boolean matchingRow = true;
463      Cell sourceCell = sourceCells.nextCellInRow();
464      Cell targetCell = targetCells.nextCellInRow();
465      while (sourceCell != null || targetCell != null) {
466
467        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
468        if (cellKeyComparison < 0) {
469          if (LOG.isDebugEnabled()) {
470            LOG.debug("Target missing cell: " + sourceCell);
471          }
472          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
473          matchingRow = false;
474
475          if (!dryRun) {
476            if (put == null) {
477              put = new Put(rowKey);
478            }
479            put.add(sourceCell);
480          }
481
482          sourceCell = sourceCells.nextCellInRow();
483        } else if (cellKeyComparison > 0) {
484          if (LOG.isDebugEnabled()) {
485            LOG.debug("Source missing cell: " + targetCell);
486          }
487          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
488          matchingRow = false;
489
490          if (!dryRun) {
491            if (delete == null) {
492              delete = new Delete(rowKey);
493            }
494            // add a tombstone to exactly match the target cell that is missing on the source
495            delete.addColumn(CellUtil.cloneFamily(targetCell),
496                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
497          }
498
499          targetCell = targetCells.nextCellInRow();
500        } else {
501          // the cell keys are equal, now check values
502          if (CellUtil.matchingValue(sourceCell, targetCell)) {
503            matchingCells++;
504          } else {
505            if (LOG.isDebugEnabled()) {
506              LOG.debug("Different values: ");
507              LOG.debug("  source cell: " + sourceCell
508                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
509                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
510              LOG.debug("  target cell: " + targetCell
511                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
512                      targetCell.getValueOffset(), targetCell.getValueLength()));
513            }
514            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
515            matchingRow = false;
516
517            if (!dryRun) {
518              // overwrite target cell
519              if (put == null) {
520                put = new Put(rowKey);
521              }
522              put.add(sourceCell);
523            }
524          }
525          sourceCell = sourceCells.nextCellInRow();
526          targetCell = targetCells.nextCellInRow();
527        }
528
529        if (!dryRun && sourceTableHash.scanBatch > 0) {
530          if (put != null && put.size() >= sourceTableHash.scanBatch) {
531            context.write(new ImmutableBytesWritable(rowKey), put);
532            put = null;
533          }
534          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
535            context.write(new ImmutableBytesWritable(rowKey), delete);
536            delete = null;
537          }
538        }
539      }
540
541      if (!dryRun) {
542        if (put != null) {
543          context.write(new ImmutableBytesWritable(rowKey), put);
544        }
545        if (delete != null) {
546          context.write(new ImmutableBytesWritable(rowKey), delete);
547        }
548      }
549
550      if (matchingCells > 0) {
551        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
552      }
553      if (matchingRow) {
554        context.getCounter(Counter.MATCHINGROWS).increment(1);
555        return true;
556      } else {
557        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
558        return false;
559      }
560    }
561
562    /**
563     * Compare row keys of the given Result objects.
564     * Nulls are after non-nulls
565     */
566    private static int compareRowKeys(byte[] r1, byte[] r2) {
567      if (r1 == null) {
568        return 1;  // source missing row
569      } else if (r2 == null) {
570        return -1; // target missing row
571      } else {
572        // Sync on no META tables only. We can directly do what CellComparator is doing inside.
573        // Never the call going to MetaCellComparator.
574        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
575      }
576    }
577
578    /**
579     * Compare families, qualifiers, and timestamps of the given Cells.
580     * They are assumed to be of the same row.
581     * Nulls are after non-nulls.
582     */
583     private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
584      if (c1 == null) {
585        return 1; // source missing cell
586      }
587      if (c2 == null) {
588        return -1; // target missing cell
589      }
590
591      int result = CellComparator.getInstance().compareFamilies(c1, c2);
592      if (result != 0) {
593        return result;
594      }
595
596      result = CellComparator.getInstance().compareQualifiers(c1, c2);
597      if (result != 0) {
598        return result;
599      }
600
601      // note timestamp comparison is inverted - more recent cells first
602      return CellComparator.getInstance().compareTimestamps(c1, c2);
603    }
604
605    @Override
606    protected void cleanup(Context context)
607        throws IOException, InterruptedException {
608      if (mapperException == null) {
609        try {
610          finishRemainingHashRanges(context);
611        } catch (Throwable t) {
612          mapperException = t;
613        }
614      }
615
616      try {
617        sourceTable.close();
618        targetTable.close();
619        sourceConnection.close();
620        targetConnection.close();
621      } catch (Throwable t) {
622        if (mapperException == null) {
623          mapperException = t;
624        } else {
625          LOG.error("Suppressing exception from closing tables", t);
626        }
627      }
628
629      // propagate first exception
630      if (mapperException != null) {
631        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
632        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
633        Throwables.propagate(mapperException);
634      }
635    }
636
637    private void finishRemainingHashRanges(Context context) throws IOException,
638        InterruptedException {
639      TableSplit split = (TableSplit) context.getInputSplit();
640      byte[] splitEndRow = split.getEndRow();
641      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
642
643      // if there are more hash batches that begin before the end of this split move to them
644      while (nextSourceKey != null
645          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
646        moveToNextBatch(context);
647      }
648
649      if (targetHasher.isBatchStarted()) {
650        // need to complete the final open hash batch
651
652        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
653              || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
654          // the open hash range continues past the end of this region
655          // add a scan to complete the current hash range
656          Scan scan = sourceTableHash.initScan();
657          scan.setStartRow(splitEndRow);
658          if (nextSourceKey == null) {
659            scan.setStopRow(sourceTableHash.stopRow);
660          } else {
661            scan.setStopRow(nextSourceKey.copyBytes());
662          }
663
664          ResultScanner targetScanner = null;
665          try {
666            targetScanner = targetTable.getScanner(scan);
667            for (Result row : targetScanner) {
668              targetHasher.hashResult(row);
669            }
670          } finally {
671            if (targetScanner != null) {
672              targetScanner.close();
673            }
674          }
675        } // else current batch ends exactly at split end row
676
677        finishBatchAndCompareHashes(context);
678      }
679    }
680  }
681
682  private static final int NUM_ARGS = 3;
683  private static void printUsage(final String errorMsg) {
684    if (errorMsg != null && errorMsg.length() > 0) {
685      System.err.println("ERROR: " + errorMsg);
686      System.err.println();
687    }
688    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
689    System.err.println();
690    System.err.println("Options:");
691
692    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
693    System.err.println("                  (defaults to cluster in classpath's config)");
694    System.err.println(" targetzkcluster  ZK cluster key of the target table");
695    System.err.println("                  (defaults to cluster in classpath's config)");
696    System.err.println(" dryrun           if true, output counters but no writes");
697    System.err.println("                  (defaults to false)");
698    System.err.println();
699    System.err.println("Args:");
700    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
701    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
702    System.err.println(" sourcetable      Name of the source table to sync from");
703    System.err.println(" targettable      Name of the target table to sync to");
704    System.err.println();
705    System.err.println("Examples:");
706    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
707    System.err.println(" to a local target cluster:");
708    System.err.println(" $ hbase " +
709        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
710        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
711        + " hdfs://nn:9000/hashes/tableA tableA tableA");
712  }
713
714  private boolean doCommandLine(final String[] args) {
715    if (args.length < NUM_ARGS) {
716      printUsage(null);
717      return false;
718    }
719    try {
720      sourceHashDir = new Path(args[args.length - 3]);
721      sourceTableName = args[args.length - 2];
722      targetTableName = args[args.length - 1];
723
724      for (int i = 0; i < args.length - NUM_ARGS; i++) {
725        String cmd = args[i];
726        if (cmd.equals("-h") || cmd.startsWith("--h")) {
727          printUsage(null);
728          return false;
729        }
730
731        final String sourceZkClusterKey = "--sourcezkcluster=";
732        if (cmd.startsWith(sourceZkClusterKey)) {
733          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
734          continue;
735        }
736
737        final String targetZkClusterKey = "--targetzkcluster=";
738        if (cmd.startsWith(targetZkClusterKey)) {
739          targetZkCluster = cmd.substring(targetZkClusterKey.length());
740          continue;
741        }
742
743        final String dryRunKey = "--dryrun=";
744        if (cmd.startsWith(dryRunKey)) {
745          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
746          continue;
747        }
748
749        printUsage("Invalid argument '" + cmd + "'");
750        return false;
751      }
752
753
754    } catch (Exception e) {
755      e.printStackTrace();
756      printUsage("Can't start because " + e.getMessage());
757      return false;
758    }
759    return true;
760  }
761
762  /**
763   * Main entry point.
764   */
765  public static void main(String[] args) throws Exception {
766    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
767    System.exit(ret);
768  }
769
770  @Override
771  public int run(String[] args) throws Exception {
772    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
773    if (!doCommandLine(otherArgs)) {
774      return 1;
775    }
776
777    Job job = createSubmittableJob(otherArgs);
778    if (!job.waitForCompletion(true)) {
779      LOG.info("Map-reduce job failed!");
780      return 1;
781    }
782    counters = job.getCounters();
783    return 0;
784  }
785
786}