001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.Iterator;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.conf.Configured;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellBuilderFactory;
030import org.apache.hadoop.hbase.CellBuilderType;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Delete;
038import org.apache.hadoop.hbase.client.Mutation;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.mapreduce.Counters;
048import org.apache.hadoop.mapreduce.Job;
049import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
050import org.apache.hadoop.mapreduce.security.TokenCache;
051import org.apache.hadoop.util.GenericOptionsParser;
052import org.apache.hadoop.util.Tool;
053import org.apache.hadoop.util.ToolRunner;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
059
060@InterfaceAudience.Private
061public class SyncTable extends Configured implements Tool {
062
063  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);
064
065  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
066  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
067  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
068  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
069  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
070  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
071  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
072  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
073  static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps";
074
075  Path sourceHashDir;
076  String sourceTableName;
077  String targetTableName;
078
079  String sourceZkCluster;
080  String targetZkCluster;
081  boolean dryRun;
082  boolean doDeletes = true;
083  boolean doPuts = true;
084  boolean ignoreTimestamps;
085
086  Counters counters;
087
088  public SyncTable(Configuration conf) {
089    super(conf);
090  }
091
092  private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
093    Configuration peerConf =
094      HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
095    if ("kerberos".equalsIgnoreCase(peerConf.get("hbase.security.authentication"))) {
096      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
097    }
098  }
099
100  public Job createSubmittableJob(String[] args) throws IOException {
101    FileSystem fs = sourceHashDir.getFileSystem(getConf());
102    if (!fs.exists(sourceHashDir)) {
103      throw new IOException("Source hash dir not found: " + sourceHashDir);
104    }
105
106    Job job = Job.getInstance(getConf(),
107      getConf().get("mapreduce.job.name", "syncTable_" + sourceTableName + "-" + targetTableName));
108    Configuration jobConf = job.getConfiguration();
109    if ("kerberos".equalsIgnoreCase(jobConf.get("hadoop.security.authentication"))) {
110      TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { sourceHashDir },
111        getConf());
112    }
113
114    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
115    LOG.info("Read source hash manifest: " + tableHash);
116    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
117    if (!tableHash.tableName.equals(sourceTableName)) {
118      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
119        + tableHash.tableName + " but job is reading from: " + sourceTableName);
120    }
121    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
122      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
123        + " should be 1 more than the number of partition keys.  However, the manifest file "
124        + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
125        + " found in the partitions file is " + tableHash.partitions.size());
126    }
127
128    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
129    int dataSubdirCount = 0;
130    for (FileStatus file : fs.listStatus(dataDir)) {
131      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
132        dataSubdirCount++;
133      }
134    }
135
136    if (dataSubdirCount != tableHash.numHashFiles) {
137      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
138        + " should be 1 more than the number of partition keys.  However, the number of data dirs"
139        + " found is " + dataSubdirCount + " but the number of partition keys"
140        + " found in the partitions file is " + tableHash.partitions.size());
141    }
142
143    job.setJarByClass(HashTable.class);
144    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
145    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
146    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
147    if (sourceZkCluster != null) {
148      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
149      initCredentialsForHBase(sourceZkCluster, job);
150    }
151    if (targetZkCluster != null) {
152      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
153      initCredentialsForHBase(targetZkCluster, job);
154    }
155    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
156    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
157    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
158    jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps);
159
160    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class,
161      null, null, job);
162
163    job.setNumReduceTasks(0);
164
165    if (dryRun) {
166      job.setOutputFormatClass(NullOutputFormat.class);
167    } else {
168      // No reducers. Just write straight to table. Call initTableReducerJob
169      // because it sets up the TableOutputFormat.
170      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster,
171        null, null);
172
173      // would be nice to add an option for bulk load instead
174    }
175
176    // Obtain an authentication token, for the specified cluster, on behalf of the current user
177    if (sourceZkCluster != null) {
178      Configuration peerConf =
179        HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
180      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
181    }
182    return job;
183  }
184
185  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
186    Path sourceHashDir;
187
188    Connection sourceConnection;
189    Connection targetConnection;
190    Table sourceTable;
191    Table targetTable;
192    boolean dryRun;
193    boolean doDeletes = true;
194    boolean doPuts = true;
195    boolean ignoreTimestamp;
196
197    HashTable.TableHash sourceTableHash;
198    HashTable.TableHash.Reader sourceHashReader;
199    ImmutableBytesWritable currentSourceHash;
200    ImmutableBytesWritable nextSourceKey;
201    HashTable.ResultHasher targetHasher;
202
203    Throwable mapperException;
204
205    public static enum Counter {
206      BATCHES,
207      HASHES_MATCHED,
208      HASHES_NOT_MATCHED,
209      SOURCEMISSINGROWS,
210      SOURCEMISSINGCELLS,
211      TARGETMISSINGROWS,
212      TARGETMISSINGCELLS,
213      ROWSWITHDIFFS,
214      DIFFERENTCELLVALUES,
215      MATCHINGROWS,
216      MATCHINGCELLS,
217      EMPTY_BATCHES,
218      RANGESMATCHED,
219      RANGESNOTMATCHED
220    };
221
222    @Override
223    protected void setup(Context context) throws IOException {
224
225      Configuration conf = context.getConfiguration();
226      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
227      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
228      targetConnection =
229        openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, TableOutputFormat.OUTPUT_CONF_PREFIX);
230      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
231      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
232      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
233      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
234      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
235      ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false);
236
237      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
238      LOG.info("Read source hash manifest: " + sourceTableHash);
239      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
240
241      TableSplit split = (TableSplit) context.getInputSplit();
242      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
243
244      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
245      findNextKeyHashPair();
246
247      // create a hasher, but don't start it right away
248      // instead, find the first hash batch at or after the start row
249      // and skip any rows that come before. they will be caught by the previous task
250      targetHasher = new HashTable.ResultHasher();
251      targetHasher.ignoreTimestamps = ignoreTimestamp;
252    }
253
254    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
255      String configPrefix) throws IOException {
256      String zkCluster = conf.get(zkClusterConfKey);
257      Configuration clusterConf =
258        HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
259      return ConnectionFactory.createConnection(clusterConf);
260    }
261
262    private static Table openTable(Connection connection, Configuration conf,
263      String tableNameConfKey) throws IOException {
264      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
265    }
266
267    /**
268     * Attempt to read the next source key/hash pair. If there are no more, set nextSourceKey to
269     * null
270     */
271    private void findNextKeyHashPair() throws IOException {
272      boolean hasNext = sourceHashReader.next();
273      if (hasNext) {
274        nextSourceKey = sourceHashReader.getCurrentKey();
275      } else {
276        // no more keys - last hash goes to the end
277        nextSourceKey = null;
278      }
279    }
280
281    @Override
282    protected void map(ImmutableBytesWritable key, Result value, Context context)
283      throws IOException, InterruptedException {
284      try {
285        // first, finish any hash batches that end before the scanned row
286        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
287          moveToNextBatch(context);
288        }
289
290        // next, add the scanned row (as long as we've reached the first batch)
291        if (targetHasher.isBatchStarted()) {
292          targetHasher.hashResult(value);
293        }
294      } catch (Throwable t) {
295        mapperException = t;
296        Throwables.propagateIfInstanceOf(t, IOException.class);
297        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
298        Throwables.propagate(t);
299      }
300    }
301
302    /**
303     * If there is an open hash batch, complete it and sync if there are diffs. Start a new batch,
304     * and seek to read the
305     */
306    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
307      if (targetHasher.isBatchStarted()) {
308        finishBatchAndCompareHashes(context);
309      }
310      targetHasher.startBatch(nextSourceKey);
311      currentSourceHash = sourceHashReader.getCurrentHash();
312
313      findNextKeyHashPair();
314    }
315
316    /**
317     * Finish the currently open hash batch. Compare the target hash to the given source hash. If
318     * they do not match, then sync the covered key range.
319     */
320    private void finishBatchAndCompareHashes(Context context)
321      throws IOException, InterruptedException {
322      targetHasher.finishBatch();
323      context.getCounter(Counter.BATCHES).increment(1);
324      if (targetHasher.getBatchSize() == 0) {
325        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
326      }
327      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
328      if (targetHash.equals(currentSourceHash)) {
329        context.getCounter(Counter.HASHES_MATCHED).increment(1);
330      } else {
331        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
332
333        ImmutableBytesWritable stopRow = nextSourceKey == null
334          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
335          : nextSourceKey;
336
337        if (LOG.isDebugEnabled()) {
338          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey()) + " to "
339            + toHex(stopRow) + " sourceHash: " + toHex(currentSourceHash) + " targetHash: "
340            + toHex(targetHash));
341        }
342
343        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
344      }
345    }
346
347    private static String toHex(ImmutableBytesWritable bytes) {
348      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
349    }
350
351    private static final CellScanner EMPTY_CELL_SCANNER =
352      new CellScanner(Collections.<Result> emptyIterator());
353
354    /**
355     * Rescan the given range directly from the source and target tables. Count and log differences,
356     * and if this is not a dry run, output Puts and Deletes to make the target table match the
357     * source table for this range
358     */
359    private void syncRange(Context context, ImmutableBytesWritable startRow,
360      ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
361      Scan scan = sourceTableHash.initScan();
362      scan.setStartRow(startRow.copyBytes());
363      scan.setStopRow(stopRow.copyBytes());
364
365      ResultScanner sourceScanner = sourceTable.getScanner(scan);
366      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
367
368      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
369      CellScanner targetCells = new CellScanner(targetScanner.iterator());
370
371      boolean rangeMatched = true;
372      byte[] nextSourceRow = sourceCells.nextRow();
373      byte[] nextTargetRow = targetCells.nextRow();
374      while (nextSourceRow != null || nextTargetRow != null) {
375        boolean rowMatched;
376        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
377        if (rowComparison < 0) {
378          if (LOG.isDebugEnabled()) {
379            LOG.debug("Target missing row: " + Bytes.toString(nextSourceRow));
380          }
381          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
382
383          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
384          nextSourceRow = sourceCells.nextRow(); // advance only source to next row
385        } else if (rowComparison > 0) {
386          if (LOG.isDebugEnabled()) {
387            LOG.debug("Source missing row: " + Bytes.toString(nextTargetRow));
388          }
389          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
390
391          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
392          nextTargetRow = targetCells.nextRow(); // advance only target to next row
393        } else {
394          // current row is the same on both sides, compare cell by cell
395          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
396          nextSourceRow = sourceCells.nextRow();
397          nextTargetRow = targetCells.nextRow();
398        }
399
400        if (!rowMatched) {
401          rangeMatched = false;
402        }
403      }
404
405      sourceScanner.close();
406      targetScanner.close();
407
408      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
409        .increment(1);
410    }
411
412    private static class CellScanner {
413      private final Iterator<Result> results;
414
415      private byte[] currentRow;
416      private Result currentRowResult;
417      private int nextCellInRow;
418
419      private Result nextRowResult;
420
421      public CellScanner(Iterator<Result> results) {
422        this.results = results;
423      }
424
425      /**
426       * Advance to the next row and return its row key. Returns null iff there are no more rows.
427       */
428      public byte[] nextRow() {
429        if (nextRowResult == null) {
430          // no cached row - check scanner for more
431          while (results.hasNext()) {
432            nextRowResult = results.next();
433            Cell nextCell = nextRowResult.rawCells()[0];
434            if (
435              currentRow == null || !Bytes.equals(currentRow, 0, currentRow.length,
436                nextCell.getRowArray(), nextCell.getRowOffset(), nextCell.getRowLength())
437            ) {
438              // found next row
439              break;
440            } else {
441              // found another result from current row, keep scanning
442              nextRowResult = null;
443            }
444          }
445
446          if (nextRowResult == null) {
447            // end of data, no more rows
448            currentRowResult = null;
449            currentRow = null;
450            return null;
451          }
452        }
453
454        // advance to cached result for next row
455        currentRowResult = nextRowResult;
456        nextCellInRow = 0;
457        currentRow = currentRowResult.getRow();
458        nextRowResult = null;
459        return currentRow;
460      }
461
462      /**
463       * Returns the next Cell in the current row or null iff none remain.
464       */
465      public Cell nextCellInRow() {
466        if (currentRowResult == null) {
467          // nothing left in current row
468          return null;
469        }
470
471        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
472        nextCellInRow++;
473        if (nextCellInRow == currentRowResult.size()) {
474          if (results.hasNext()) {
475            Result result = results.next();
476            Cell cell = result.rawCells()[0];
477            if (
478              Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
479                cell.getRowOffset(), cell.getRowLength())
480            ) {
481              // result is part of current row
482              currentRowResult = result;
483              nextCellInRow = 0;
484            } else {
485              // result is part of next row, cache it
486              nextRowResult = result;
487              // current row is complete
488              currentRowResult = null;
489            }
490          } else {
491            // end of data
492            currentRowResult = null;
493          }
494        }
495        return nextCell;
496      }
497    }
498
499    private Cell checkAndResetTimestamp(Cell sourceCell) {
500      if (ignoreTimestamp) {
501        sourceCell =
502          CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(sourceCell.getType())
503            .setRow(sourceCell.getRowArray(), sourceCell.getRowOffset(), sourceCell.getRowLength())
504            .setFamily(sourceCell.getFamilyArray(), sourceCell.getFamilyOffset(),
505              sourceCell.getFamilyLength())
506            .setQualifier(sourceCell.getQualifierArray(), sourceCell.getQualifierOffset(),
507              sourceCell.getQualifierLength())
508            .setTimestamp(EnvironmentEdgeManager.currentTime()).setValue(sourceCell.getValueArray(),
509              sourceCell.getValueOffset(), sourceCell.getValueLength())
510            .build();
511      }
512      return sourceCell;
513    }
514
515    /**
516     * Compare the cells for the given row from the source and target tables. Count and log any
517     * differences. If not a dry run, output a Put and/or Delete needed to sync the target table to
518     * match the source table.
519     */
520    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
521      CellScanner targetCells) throws IOException, InterruptedException {
522      Put put = null;
523      Delete delete = null;
524      long matchingCells = 0;
525      boolean matchingRow = true;
526      Cell sourceCell = sourceCells.nextCellInRow();
527      Cell targetCell = targetCells.nextCellInRow();
528      while (sourceCell != null || targetCell != null) {
529
530        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
531        if (cellKeyComparison < 0) {
532          if (LOG.isDebugEnabled()) {
533            LOG.debug("Target missing cell: " + sourceCell);
534          }
535          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
536          matchingRow = false;
537
538          if (!dryRun && doPuts) {
539            if (put == null) {
540              put = new Put(rowKey);
541            }
542            sourceCell = checkAndResetTimestamp(sourceCell);
543            put.add(sourceCell);
544          }
545
546          sourceCell = sourceCells.nextCellInRow();
547        } else if (cellKeyComparison > 0) {
548          if (LOG.isDebugEnabled()) {
549            LOG.debug("Source missing cell: " + targetCell);
550          }
551          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
552          matchingRow = false;
553
554          if (!dryRun && doDeletes) {
555            if (delete == null) {
556              delete = new Delete(rowKey);
557            }
558            // add a tombstone to exactly match the target cell that is missing on the source
559            delete.addColumn(CellUtil.cloneFamily(targetCell), CellUtil.cloneQualifier(targetCell),
560              targetCell.getTimestamp());
561          }
562
563          targetCell = targetCells.nextCellInRow();
564        } else {
565          // the cell keys are equal, now check values
566          if (CellUtil.matchingValue(sourceCell, targetCell)) {
567            matchingCells++;
568          } else {
569            if (LOG.isDebugEnabled()) {
570              LOG.debug("Different values: ");
571              LOG.debug("  source cell: " + sourceCell + " value: "
572                + Bytes.toString(sourceCell.getValueArray(), sourceCell.getValueOffset(),
573                  sourceCell.getValueLength()));
574              LOG.debug("  target cell: " + targetCell + " value: "
575                + Bytes.toString(targetCell.getValueArray(), targetCell.getValueOffset(),
576                  targetCell.getValueLength()));
577            }
578            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
579            matchingRow = false;
580
581            if (!dryRun && doPuts) {
582              // overwrite target cell
583              if (put == null) {
584                put = new Put(rowKey);
585              }
586              sourceCell = checkAndResetTimestamp(sourceCell);
587              put.add(sourceCell);
588            }
589          }
590          sourceCell = sourceCells.nextCellInRow();
591          targetCell = targetCells.nextCellInRow();
592        }
593
594        if (!dryRun && sourceTableHash.scanBatch > 0) {
595          if (put != null && put.size() >= sourceTableHash.scanBatch) {
596            context.write(new ImmutableBytesWritable(rowKey), put);
597            put = null;
598          }
599          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
600            context.write(new ImmutableBytesWritable(rowKey), delete);
601            delete = null;
602          }
603        }
604      }
605
606      if (!dryRun) {
607        if (put != null) {
608          context.write(new ImmutableBytesWritable(rowKey), put);
609        }
610        if (delete != null) {
611          context.write(new ImmutableBytesWritable(rowKey), delete);
612        }
613      }
614
615      if (matchingCells > 0) {
616        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
617      }
618      if (matchingRow) {
619        context.getCounter(Counter.MATCHINGROWS).increment(1);
620        return true;
621      } else {
622        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
623        return false;
624      }
625    }
626
627    /**
628     * Compare row keys of the given Result objects. Nulls are after non-nulls
629     */
630    private static int compareRowKeys(byte[] r1, byte[] r2) {
631      if (r1 == null) {
632        return 1; // source missing row
633      } else if (r2 == null) {
634        return -1; // target missing row
635      } else {
636        // Sync on no META tables only. We can directly do what CellComparator is doing inside.
637        // Never the call going to MetaCellComparator.
638        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
639      }
640    }
641
642    /**
643     * Compare families, qualifiers, and timestamps of the given Cells. They are assumed to be of
644     * the same row. Nulls are after non-nulls.
645     */
646    private int compareCellKeysWithinRow(Cell c1, Cell c2) {
647      if (c1 == null) {
648        return 1; // source missing cell
649      }
650      if (c2 == null) {
651        return -1; // target missing cell
652      }
653
654      int result = CellComparator.getInstance().compareFamilies(c1, c2);
655      if (result != 0) {
656        return result;
657      }
658
659      result = CellComparator.getInstance().compareQualifiers(c1, c2);
660      if (result != 0) {
661        return result;
662      }
663
664      if (this.ignoreTimestamp) {
665        return 0;
666      } else {
667        // note timestamp comparison is inverted - more recent cells first
668        return CellComparator.getInstance().compareTimestamps(c1, c2);
669      }
670    }
671
672    @Override
673    protected void cleanup(Context context) throws IOException, InterruptedException {
674      if (mapperException == null) {
675        try {
676          finishRemainingHashRanges(context);
677        } catch (Throwable t) {
678          mapperException = t;
679        }
680      }
681
682      try {
683        sourceTable.close();
684        targetTable.close();
685        sourceConnection.close();
686        targetConnection.close();
687      } catch (Throwable t) {
688        if (mapperException == null) {
689          mapperException = t;
690        } else {
691          LOG.error("Suppressing exception from closing tables", t);
692        }
693      }
694
695      // propagate first exception
696      if (mapperException != null) {
697        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
698        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
699        Throwables.propagate(mapperException);
700      }
701    }
702
703    private void finishRemainingHashRanges(Context context)
704      throws IOException, InterruptedException {
705      TableSplit split = (TableSplit) context.getInputSplit();
706      byte[] splitEndRow = split.getEndRow();
707      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
708
709      // if there are more hash batches that begin before the end of this split move to them
710      while (
711        nextSourceKey != null && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)
712      ) {
713        moveToNextBatch(context);
714      }
715
716      if (targetHasher.isBatchStarted()) {
717        // need to complete the final open hash batch
718
719        if (
720          (nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
721            || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))
722        ) {
723          // the open hash range continues past the end of this region
724          // add a scan to complete the current hash range
725          Scan scan = sourceTableHash.initScan();
726          scan.setStartRow(splitEndRow);
727          if (nextSourceKey == null) {
728            scan.setStopRow(sourceTableHash.stopRow);
729          } else {
730            scan.setStopRow(nextSourceKey.copyBytes());
731          }
732
733          ResultScanner targetScanner = null;
734          try {
735            targetScanner = targetTable.getScanner(scan);
736            for (Result row : targetScanner) {
737              targetHasher.hashResult(row);
738            }
739          } finally {
740            if (targetScanner != null) {
741              targetScanner.close();
742            }
743          }
744        } // else current batch ends exactly at split end row
745
746        finishBatchAndCompareHashes(context);
747      }
748    }
749  }
750
751  private static final int NUM_ARGS = 3;
752
753  private static void printUsage(final String errorMsg) {
754    if (errorMsg != null && errorMsg.length() > 0) {
755      System.err.println("ERROR: " + errorMsg);
756      System.err.println();
757    }
758    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
759    System.err.println();
760    System.err.println("Options:");
761
762    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
763    System.err.println("                  (defaults to cluster in classpath's config)");
764    System.err.println(" targetzkcluster  ZK cluster key of the target table");
765    System.err.println("                  (defaults to cluster in classpath's config)");
766    System.err.println(" dryrun           if true, output counters but no writes");
767    System.err.println("                  (defaults to false)");
768    System.err.println(" doDeletes        if false, does not perform deletes");
769    System.err.println("                  (defaults to true)");
770    System.err.println(" doPuts           if false, does not perform puts");
771    System.err.println("                  (defaults to true)");
772    System.err.println(" ignoreTimestamps if true, ignores cells timestamps while comparing ");
773    System.err.println("                  cell values. Any missing cell on target then gets");
774    System.err.println("                  added with current time as timestamp ");
775    System.err.println("                  (defaults to false)");
776    System.err.println();
777    System.err.println("Args:");
778    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
779    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
780    System.err.println(" sourcetable      Name of the source table to sync from");
781    System.err.println(" targettable      Name of the target table to sync to");
782    System.err.println();
783    System.err.println("Examples:");
784    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
785    System.err.println(" to a local target cluster:");
786    System.err.println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
787      + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
788      + " hdfs://nn:9000/hashes/tableA tableA tableA");
789  }
790
791  private boolean doCommandLine(final String[] args) {
792    if (args.length < NUM_ARGS) {
793      printUsage(null);
794      return false;
795    }
796    try {
797      sourceHashDir = new Path(args[args.length - 3]);
798      sourceTableName = args[args.length - 2];
799      targetTableName = args[args.length - 1];
800
801      for (int i = 0; i < args.length - NUM_ARGS; i++) {
802        String cmd = args[i];
803        if (cmd.equals("-h") || cmd.startsWith("--h")) {
804          printUsage(null);
805          return false;
806        }
807
808        final String sourceZkClusterKey = "--sourcezkcluster=";
809        if (cmd.startsWith(sourceZkClusterKey)) {
810          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
811          continue;
812        }
813
814        final String targetZkClusterKey = "--targetzkcluster=";
815        if (cmd.startsWith(targetZkClusterKey)) {
816          targetZkCluster = cmd.substring(targetZkClusterKey.length());
817          continue;
818        }
819
820        final String dryRunKey = "--dryrun=";
821        if (cmd.startsWith(dryRunKey)) {
822          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
823          continue;
824        }
825
826        final String doDeletesKey = "--doDeletes=";
827        if (cmd.startsWith(doDeletesKey)) {
828          doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
829          continue;
830        }
831
832        final String doPutsKey = "--doPuts=";
833        if (cmd.startsWith(doPutsKey)) {
834          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
835          continue;
836        }
837
838        final String ignoreTimestampsKey = "--ignoreTimestamps=";
839        if (cmd.startsWith(ignoreTimestampsKey)) {
840          ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
841          continue;
842        }
843
844        printUsage("Invalid argument '" + cmd + "'");
845        return false;
846      }
847
848    } catch (Exception e) {
849      e.printStackTrace();
850      printUsage("Can't start because " + e.getMessage());
851      return false;
852    }
853    return true;
854  }
855
856  /**
857   * Main entry point.
858   */
859  public static void main(String[] args) throws Exception {
860    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
861    System.exit(ret);
862  }
863
864  @Override
865  public int run(String[] args) throws Exception {
866    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
867    if (!doCommandLine(otherArgs)) {
868      return 1;
869    }
870
871    Job job = createSubmittableJob(otherArgs);
872    if (!job.waitForCompletion(true)) {
873      LOG.info("Map-reduce job failed!");
874      return 1;
875    }
876    counters = job.getCounters();
877    return 0;
878  }
879
880}